# conx - a neural network library
#
# Copyright (c) Douglas S. Blank <doug.blank@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301 USA
"""
The network module contains the code for the Network class.
"""
import collections
import operator
import inspect
from functools import reduce
import signal
import string
import numbers
import random
import pickle
import base64
import json
import html
import copy
import sys
import io
import os
import re
from typing import Any
import PIL
import numpy as np
import matplotlib.pyplot as plt
import keras
from keras.callbacks import Callback, History
import keras.backend as K
from .utils import *
from .layers import Layer, make_layer
from .dataset import Dataset, VirtualDataset
import conx.networks
#------------------------------------------------------------------------
def as_sum(item):
    """
    Given a result loss measure, return a value.
    """
    # numpy arrays know how to sum themselves:
    if isinstance(item, np.ndarray):
        return item.sum()
    dims = shape(item)
    if len(dims) == 1 and dims[0] != 0:
        # flat, non-empty sequence:
        return sum(item)
    if isinstance(item, (float, int)):
        # already a scalar:
        return item
    # nested structure: sum the recursive sums of each element
    return sum(as_sum(element) for element in item)
class ReportCallback(Callback):
    """
    Keras callback that appends each epoch's logs to the network's
    history, optionally prints a console progress report, and
    optionally snapshots the weights every `record` epochs.
    """
    def __init__(self, network, verbose, report_rate, mpl_backend, record):
        # mpl_backend is the matplotlib backend string
        super().__init__()
        self.network = network
        self.verbose = verbose
        self.report_rate = report_rate
        self.mpl_backend = mpl_backend
        self.in_console = network.in_console(mpl_backend)
        self.record = record

    def on_epoch_end(self, epoch, logs=None):
        net = self.network
        net.history.append(logs)
        net.epoch_count += 1
        # Print a report every report_rate epochs when on a console:
        should_report = (self.verbose > 0 and
                         self.in_console and
                         (epoch + 1) % self.report_rate == 0)
        if should_report:
            net.report_epoch(net.epoch_count, logs)
        # Snapshot weights every `record` epochs (0 disables recording):
        if self.record != 0 and (epoch + 1) % self.record == 0:
            net.weight_history[net.epoch_count] = net.get_weights()
class PlotCallback(Callback):
    """
    Keras callback that redraws the network's training plot every
    report_rate epochs, plus one final redraw when training ends.
    """
    def __init__(self, network, report_rate, mpl_backend):
        # mpl_backend is the matplotlib backend string code
        super().__init__()
        self.network = network
        self.report_rate = report_rate
        self.mpl_backend = mpl_backend
        self.in_console = network.in_console(mpl_backend)
        # figure is populated elsewhere (by plot_results); starts unset:
        self.figure = None

    def on_epoch_end(self, epoch, logs=None):
        if epoch == -1:
            # Training loop finished: make one last update to the plot,
            # in case the cycle count wasn't a multiple of report_rate.
            self.network.plot_results(self)
            if not self.in_console:
                plt.close(self.figure[0])
            return
        if (epoch + 1) % self.report_rate == 0:
            self.network.plot_results(self)
class FunctionCallback(Callback):
    """
    Keras callback that invokes a user function on exactly one event.
    Supported on_method values:
    'on_batch_begin',
    'on_batch_end',
    'on_epoch_begin',
    'on_epoch_end',
    'on_train_begin',
    'on_train_end',
    """
    def __init__(self, network, on_method, function):
        super().__init__()
        self.network = network
        self.on_method = on_method
        self.function = function

    def _fire(self, method_name, *args):
        # Only call the user function for the event it registered for:
        if self.on_method == method_name:
            self.function(self.network, *args)

    def on_batch_begin(self, batch, logs=None):
        self._fire("on_batch_begin", batch, logs)

    def on_batch_end(self, batch, logs=None):
        self._fire("on_batch_end", batch, logs)

    def on_epoch_begin(self, epoch, logs=None):
        self._fire("on_epoch_begin", epoch, logs)

    def on_epoch_end(self, epoch, logs=None):
        self._fire("on_epoch_end", epoch, logs)

    def on_train_begin(self, logs=None):
        self._fire("on_train_begin", logs)

    def on_train_end(self, logs=None):
        self._fire("on_train_end", logs)
class StoppingCriteria(Callback):
    """
    Keras callback that stops training when a monitored metric
    satisfies `item op value` (e.g. acc >= 0.95), using either the
    validation metrics or the training metrics.
    """
    def __init__(self, item, op, value, use_validation_to_stop):
        super().__init__()
        self.item = item
        self.op = op
        self.value = value
        self.use_validation_to_stop = use_validation_to_stop

    def on_epoch_end(self, epoch, logs=None):
        key = ("val_" + self.item) if self.use_validation_to_stop else self.item
        if key in logs:  # we get what we need directly:
            if self.compare(logs[key], self.op, self.value):
                self.model.stop_training = True
            return
        ## Otherwise, sum/average every per-bank metric that matches:
        suffix = "_" + self.item
        total = 0
        count = 0
        for name in logs:
            if not name.endswith(suffix):
                continue
            is_validation = name.startswith("val_")
            # Keep only metrics from the side (val/train) we monitor:
            if is_validation == self.use_validation_to_stop:
                total += logs[name]
                count += 1
        if count > 0 and self.compare(total / count, self.op, self.value):
            self.model.stop_training = True

    def compare(self, v1, op, v2):
        # A None threshold never triggers a stop:
        if v2 is None:
            return False
        comparisons = {"<": operator.lt,
                       ">": operator.gt,
                       "==": operator.eq,
                       "<=": operator.le,
                       ">=": operator.ge}
        if op in comparisons:
            return comparisons[op](v1, v2)
        # Unknown operator: fall through (returns None, like the original)
[docs]class Network():
"""
The main class for the conx neural network package.
Arguments:
name: Required. The name of the network. Should not contain special HTML
characters.
sizes: Optional numbers. Defines the sizes of layers of a sequential
network. These will be created, added, and connected automatically.
config: Configuration overrides for the network.
Note:
To create a complete, operating network, you must do the following items:
1. create a network
2. add layers
3. connect the layers
4. compile the network
5. set the dataset
6. train the network
See also :any:`Layer`, :any:`Network.add`, :any:`Network.connect`,
and :any:`Network.compile`.
Examples:
>>> net = Network("XOR1", 2, 5, 2)
>>> len(net.layers)
3
>>> net = Network("XOR2")
>>> net.add(Layer("input", 2))
'input'
>>> net.add(Layer("hidden", 5))
'hidden'
>>> net.add(Layer("output", 2))
'output'
>>> net.connect()
>>> len(net.layers)
3
>>> net = Network("XOR3")
>>> net.add(Layer("input", 2))
'input'
>>> net.add(Layer("hidden", 5))
'hidden'
>>> net.add(Layer("output", 2))
'output'
>>> net.connect("input", "hidden")
>>> net.connect("hidden", "output")
>>> len(net.layers)
3
>>> net = Network("NMIST")
>>> net.name
'NMIST'
>>> len(net.layers)
0
>>> net = Network("NMIST", 10, 5, 1)
>>> len(net.layers)
3
>>> net = Network("NMIST", 10, 5, 5, 1, activation="sigmoid")
>>> net.config["activation"]
'sigmoid'
>>> net["output"].activation == "sigmoid"
True
>>> net["hidden1"].activation == "sigmoid"
True
>>> net["hidden2"].activation == "sigmoid"
True
>>> net["input"].activation is None
True
>>> net.layers[0].name == "input"
True
"""
OPTIMIZERS = ("sgd", "rmsprop", "adagrad", "adadelta", "adam",
"adamax", "nadam", "tfoptimizer")
ERROR_FUNCTIONS = ['binary_crossentropy', 'categorical_crossentropy',
'categorical_hinge', 'cosine', 'cosine_proximity', 'hinge',
'kld', 'kullback_leibler_divergence', 'logcosh', 'mae', 'mape',
'mean_absolute_error', 'mean_absolute_percentage_error', 'mean_squared_error',
'mean_squared_logarithmic_error', 'mse', 'msle', 'poisson',
'sparse_categorical_crossentropy', 'squared_hinge']
def __init__(self, name: str, *sizes: int, load_config=True, debug=False,
build_propagate_from_models=True, **config: Any):
self.warnings = {}
self.NETWORKS = {name: function for (name, function) in
inspect.getmembers(conx.networks, inspect.isfunction)}
if not isinstance(name, str):
raise Exception("first argument should be a name for the network")
self.debug = debug
self.build_propagate_from_models = build_propagate_from_models
## Pick a place in the random stream, and remember it:
## (can override randomness with a particular seed):
if not isinstance(name, str):
raise Exception("conx layers need a name as a first parameter")
self._check_network_name(name)
self.information = None
self.name = name
if "seed" in config:
seed = config["seed"]
del config["seed"]
set_random_seed(seed)
self.reset_config()
## Next, load a config if available, and override defaults:
self.layers = []
if load_config:
self.load_config()
## Override those with args:
self.config.update(config)
## Set initial values:
self.num_input_layers = 0
self.num_target_layers = 0
self.input_bank_order = []
self.output_bank_order = []
self.dataset = Dataset(self)
self.compile_options = {}
self.compile_args = {}
self.train_options = {}
self._tolerance = K.variable(0.1, dtype='float32', name='tolerance')
self.layer_dict = {}
self.epoch_count = 0
self.history = []
self.weight_history = {}
self.model = None
self.connections = []
self._level_ordering = None
self.prop_from_dict = {} ## FIXME: can be multiple paths
self.keras_functions = {}
self._svg_counter = 1
self._need_to_show_headings = True
# If simple feed-forward network:
for i in range(len(sizes)):
if i > 0:
self.add(Layer(autoname(i, len(sizes)), shape=sizes[i],
activation=self.config["activation"]))
else:
self.add(Layer(autoname(i, len(sizes)), shape=sizes[i]))
# Connect them together:
for i in range(len(sizes) - 1):
self.connect(autoname(i, len(sizes)), autoname(i+1, len(sizes)))
[docs] def info(self):
"""
Get information about the network.
"""
class NetworkInformation():
def __init__(self, net):
self.net = net
def __repr__(self):
return self.make_info()
def _repr_markdown_(self):
return self.make_info()
def make_info(self):
retval = ""
retval += "**Network**: %s\n\n" % self.net.name
retval += " * **Status**: %s \n" % ("uncompiled" if not self.net.model else "compiled")
retval += " * **Layers**: %s \n" % len(self.net.layers)
if self.net.information is not None:
retval += self.net.information
retval += "\n"
return retval
return display(NetworkInformation(self))
    def reset_config(self):
        """
        Reset the config back to factory defaults.
        """
        self.config = {
            "font_size": 12, # for svg
            "font_family": "monospace", # for svg
            "border_top": 25, # for svg
            "border_bottom": 25, # for svg
            "hspace": 150, # for svg
            "vspace": 30, # for svg, arrows
            "image_maxdim": 200, # for svg
            "image_pixels_per_unit": 50, # for svg
            "activation": "linear", # Dense default, if none specified
            "arrow_color": "black",
            "arrow_width": "2",
            "border_width": "2",
            "border_color": "black",
            "show_targets": False,
            "show_errors": False,
            "pixels_per_unit": 1,
            "precision": 2,
            "svg_scale": None, # for svg, 0 - 1, or None for optimal
            "svg_rotate": False, # for rotating SVG
            "svg_smoothing": 0.02, # smoothing curves
            "svg_preferred_size": 400, # in pixels
            "svg_max_width": 800, # in pixels
            "dashboard.dataset": "Train",
            "dashboard.features.bank": "",
            "dashboard.features.columns": 3,
            "dashboard.features.scale": 1.0,
            "layers": {}, # per-layer overrides: {layer_name: {attr: value}}
        }
def _check_network_name(self, name):
"""
Check to see if a network name is appropriate.
Raises exception if invalid name.
"""
valid_chars = string.ascii_letters + string.digits + " _-"
if len(name) == 0:
raise Exception("network name must not be length 0: '%s'" % name)
if not all(char in valid_chars for char in name):
raise Exception("network name must only contain letters, numbers, '-', ' ', and '_': '%s'" % name)
    def _get_tolerance(self):
        # Getter: read the current value out of the keras backend variable.
        return K.get_value(self._tolerance)

    def _set_tolerance(self, value):
        # Setter: write the value into the keras backend variable
        # (created in __init__ as K.variable named 'tolerance').
        K.set_value(self._tolerance, value)

    # tolerance is exposed as a plain attribute-style property backed
    # by the backend variable above.
    tolerance = property(_get_tolerance,
                         _set_tolerance)
def __getitem__(self, layer_name):
if layer_name not in self.layer_dict:
return None
else:
return self.layer_dict[layer_name]
    def _repr_svg_(self):
        # Jupyter display hook: render the network as a plain SVG,
        # without target/error annotations, rotation, or scaling.
        return self.to_svg(show_errors=False, show_targets=False,
                           svg_rotate=False, svg_scale=None)
def __repr__(self):
return "<Network name='%s' (%s)>" % (
self.name, ("uncompiled" if not self.model else "compiled"))
[docs] def networks(self=None):
"""
Returns the list of pre-made networks.
>>> len(Network.networks())
3
>>> net = Network("Test Me")
>>> len(net.networks())
3
"""
if self is None:
self = Network("Temp")
return sorted(self.NETWORKS.keys())
[docs] @classmethod
def get(cls, network_name, *args, **kwargs):
self = cls("Temp")
if network_name.lower() in self.NETWORKS:
return self.NETWORKS[network_name.lower()](*args, **kwargs)
else:
raise Exception(
("unknown network name '%s': should be one of %s" %
(network_name, list(self.NETWORKS.keys()))))
[docs] def set_weights_from_history(self, index, epochs=None):
"""
Set the weights of the network from a particular point in the learning
sequence.
net.set_weights_from_history(0) # restore initial weights
net.set_weights_from_history(-1) # restore last weights
See also:
* `Network.get_weights_from_history`
"""
epochs = epochs if epochs is not None else sorted(self.weight_history.keys())
return self.set_weights(self.get_weights_from_history(index, epochs))
[docs] def get_weights_from_history(self, index, epochs=None):
"""
Get the weights of the network from a particular point in the learning
sequence.
wts = net.get_weights_from_history(0) # get initial weights
wts = net.get_weights_from_history(-1) # get last weights
See also:
* `Network.set_weights_from_history`
"""
epochs = epochs if epochs is not None else sorted(self.weight_history.keys())
return self.weight_history[epochs[index]]
    def playback(self, function, display=True):
        """
        Playback a function over the set of recorded weights.
        function has signature: function(network, epoch) and returns
        a displayable, or list of displayables.
        Example:
            >>> net = Network("Playback Test", 2, 2, 1, activation="sigmoid")
            >>> net.compile(error="mse", optimizer="sgd")
            >>> net.dataset.load([
            ...     [[0, 0], [0]],
            ...     [[0, 1], [1]],
            ...     [[1, 0], [1]],
            ...     [[1, 1], [0]]])
            >>> results = net.train(10, record=True, verbose=0, plot=False)
            >>> def function(network, epoch):
            ...     return None
            >>> sv = net.playback(function)
            >>> ## Testing:
            >>> class Dummy:
            ...     def update(self, result):
            ...         return result
            >>> sv.displayers = [Dummy()]
            >>> print("Testing"); sv.goto("end") # doctest: +ELLIPSIS
            Testing...
        """
        from .widgets import SequenceViewer
        if len(self.weight_history) == 0:
            raise Exception("network wasn't trained with record=True; please train again")
        epochs = sorted(self.weight_history.keys())
        def display_weight_history(index):
            # Restore the weights recorded at this point in training, then
            # let the user function produce a displayable for them.
            self.set_weights_from_history(index, epochs)
            return function(self, epochs[index])
        if display:
            # Interactive mode: wrap the stepper in a SequenceViewer widget.
            sv = SequenceViewer("%s Playback:" % self.name, display_weight_history, len(epochs))
            return sv
        else:
            # Non-interactive mode: print each result in order.
            for index in range(len(epochs)):
                print(display_weight_history(index))
    def movie(self, function, movie_name=None, start=0, stop=None, step=1,
              loop=0, optimize=True, duration=100, embed=False, mp4=True):
        """
        Make a movie from a playback function over the set of recorded weights.
        function has signature: function(network, epoch) and should return
        a PIL.Image.

        Arguments:
            function: callable(network, epoch) -> PIL.Image
            movie_name: output gif filename (default: "<NAME>-movie.gif")
            start, stop, step: range over the recorded-epoch indices
            loop: gif loop count
            optimize: optimize the gif when saving
            duration: per-frame duration, in ms
            embed: embed the image data in the returned IPython Image
            mp4: if True, convert the gif to mp4 and return that instead
        Example:
            >>> net = Network("Movie Test", 2, 2, 1, activation="sigmoid")
            >>> net.compile(error='mse', optimizer="adam")
            >>> ds = [[[0, 0], [0]],
            ...       [[0, 1], [1]],
            ...       [[1, 0], [1]],
            ...       [[1, 1], [0]]]
            >>> net.dataset.load(ds)
            >>> epochs, khistory = net.train(10, verbose=0, report_rate=1000, record=True, plot=False)
            >>> img = net.movie(lambda net, epoch: net.propagate_to_image("hidden", [1, 1],
            ...                                                          resize=(500, 100)),
            ...                 "/tmp/movie.gif", mp4=False)
            >>> img
            <IPython.core.display.Image object>
        """
        from IPython.display import Image
        if len(self.weight_history) == 0:
            raise Exception("network wasn't trained with record=True; please train again")
        epochs = sorted(self.weight_history.keys())
        if stop is None:
            stop = len(epochs)
        frames = []
        indices = []
        # Render one frame per selected recorded epoch:
        for index in range(start, stop, step):
            self.set_weights_from_history(index, epochs)
            frames.append(function(self, epochs[index]))
            indices.append(index)
        # Always include the final state, even if step skipped over it:
        if stop - 1 not in indices:
            self.set_weights_from_history(stop - 1, epochs)
            frames.append(function(self, epochs[stop - 1]))
        if movie_name is None:
            movie_name = "%s-movie.gif" % self.name.replace(" ", "_")
        if frames:
            # PIL saves an animated gif via save_all/append_images:
            frames[0].save(movie_name, save_all=True, append_images=frames[1:],
                           optimize=optimize, loop=loop, duration=duration)
            if mp4 is False:
                return Image(url=movie_name, embed=embed)
            else:
                return gif2mp4(movie_name)
[docs] def picture(self, inputs=None, dynamic=False, rotate=False, scale=None,
show_errors=False, show_targets=False, format="html", class_id=None,
minmax=None, targets=None, **kwargs):
"""
Create an SVG of the network given some inputs (optional).
Arguments:
inputs: input values to propagate
dynamic (bool): if True, the picture will update with
propagations(update_picture=True)
rotate (bool): rotate picture to horizontal
scale (float): scale the picture
show_errors (bool): show the errors in resulting picture
show_targets (bool): show the targets in resulting picture
format (str): "html", "image", or "svg"
class_id (str): id for dynamic updates
minmax (tuple): provide override for input range (layer 0 only)
Examples:
>>> net = Network("Picture", 2, 2, 1)
>>> net.compile(error="mse", optimizer="adam")
>>> net.picture()
<IPython.core.display.HTML object>
>>> net.picture([.5, .5])
<IPython.core.display.HTML object>
>>> net.picture([.5, .5], dynamic=True)
<IPython.core.display.HTML object>
"""
from IPython.display import HTML
if any([(layer.kind() == "unconnected") for layer in self.layers]) or len(self.layers) == 0:
print("Network error: please add layers and connect them")
return
if not dynamic:
if class_id is not None:
print("WARNING: class_id given but ignored", file=sys.stderr)
r = random.randint(1, 1000000)
class_id = "picture-static-%s-%s" % (self.name, r)
elif not dynamic_pictures_check():
print("WARNING: use dynamic_pictures() to allow dynamic pictures",
file=sys.stderr)
orig_rotate = self.config["svg_rotate"]
orig_show_errors = self.config["show_errors"]
orig_show_targets = self.config["show_targets"]
orig_svg_scale = self.config["svg_scale"]
orig_minmax = self.layers[0].minmax
self.config["svg_rotate"] = rotate
self.config["show_errors"] = show_errors
self.config["show_targets"] = show_targets
self.config["svg_scale"] = scale
if minmax:
self.layers[0].minmax = minmax
elif len(self.dataset) == 0 and inputs is not None:
self.layers[0].minmax = (minimum(inputs), maximum(inputs))
## else, leave minmax as None
svg = self.to_svg(inputs=inputs, class_id=class_id, targets=targets, **kwargs)
self.config["svg_rotate"] = orig_rotate
self.config["show_errors"] = orig_show_errors
self.config["show_targets"] = orig_show_targets
self.config["svg_scale"] = orig_svg_scale
self.layers[0].minmax = orig_minmax
if format == "html":
return HTML(svg)
elif format == "svg":
return svg
elif format == "image":
return svg_to_image(svg)
[docs] def in_console(self, mpl_backend: str) -> bool:
"""
Return True if running connected to a console; False if connected
to notebook, or other non-console system.
Possible values:
* 'TkAgg' - console with Tk
* 'Qt5Agg' - console with Qt
* 'MacOSX' - mac console
* 'module://ipykernel.pylab.backend_inline' - default for notebook and
non-console, and when using %matplotlib inline
* 'NbAgg' - notebook, using %matplotlib notebook
Here, None means not plotting, or just use text.
Note:
If you are running ipython without a DISPLAY with the QT
background, you may wish to:
export QT_QPA_PLATFORM='offscreen'
"""
return mpl_backend not in [
'module://ipykernel.pylab.backend_inline',
'NbAgg',
]
    def add(self, *layers: Layer) -> None:
        """
        Add layers to the network layer connections. Order is not
        important, unless calling :any:`Network.connect` without any
        arguments.

        Arguments:
            layers: One or more layer instances.

        Returns:
            layer_name (str) - name of last layer added

        Examples:
            >>> net = Network("XOR2")
            >>> net.add(Layer("input", 2))
            'input'
            >>> len(net.layers)
            1
            >>> net = Network("XOR3")
            >>> net.add(Layer("input", 2))
            'input'
            >>> net.add(Layer("hidden", 5))
            'hidden'
            >>> net.add(Layer("hidden2", 5),
            ...         Layer("hidden3", 5),
            ...         Layer("hidden4", 5),
            ...         Layer("hidden5", 5))
            'hidden5'
            >>> net.add(Layer("output", 2))
            'output'
            >>> len(net.layers)
            7

        Note:
            See :any:`Network` for more information.
        """
        last_name = None
        for layer in layers:
            if not isinstance(layer.name, str):
                raise Exception("layer_name should be a string")
            if layer.name in self.layer_dict:
                raise Exception("duplicate layer name '%s'" % layer.name)
            ## Automatic layer naming by pattern: a "%d" in the name is
            ## replaced by the smallest positive integer not yet in use:
            if "%d" in layer.name:
                layer_names = [layer.name for layer in self.layers]
                i = 1
                while (layer.name % i) in layer_names:
                    i += 1
                layer.name = layer.name % i
                ## Keep the keras-level params in sync with the new name:
                if hasattr(layer, "params") and "name" in layer.params:
                    layer.params["name"] = layer.name
            self.layers.append(layer)
            self.layer_dict[layer.name] = layer
            ## Layers have link back to network
            layer.network = self
            ## Finally, override any config from network.config:
            self.update_layer_from_config(layer)
            ## Return name, for possible connections
            last_name = layer.name
        return last_name
def update_layer_from_config(self, layer):
if layer.name in self.config["layers"]:
for item in self.config["layers"][layer.name]:
setattr(layer, item, self.config["layers"][layer.name][item])
    def connect(self, from_layer_name : str=None, to_layer_name : str=None):
        """
        Connect two layers together if called with arguments. If
        called with no arguments, then it will make a sequential
        run through the layers in order added.

        Arguments:
            from_layer_name: Name of layer where connect begins.
            to_layer_name: Name of layer where connection ends.

        If both from_layer_name and to_layer_name are None, then
        all of the layers are connected sequentially in the order
        added.

        Examples:
            >>> net = Network("XOR2")
            >>> net.add(Layer("input", 2))
            'input'
            >>> net.add(Layer("hidden", 5))
            'hidden'
            >>> net.add(Layer("output", 2))
            'output'
            >>> net.connect()
            >>> [layer.name for layer in net["input"].outgoing_connections]
            ['hidden']
        """
        if len(self.layers) == 0:
            raise Exception("no layers have been added")
        if from_layer_name is not None and not isinstance(from_layer_name, str):
            raise Exception("from_layer_name should be a string or None")
        if to_layer_name is not None and not isinstance(to_layer_name, str):
            raise Exception("to_layer_name should be a string or None")
        if from_layer_name is None and to_layer_name is None:
            ## No arguments: connect everything sequentially, but only if
            ## no connections exist yet.
            if (any([layer.outgoing_connections for layer in self.layers]) or
                any([layer.incoming_connections for layer in self.layers])):
                raise Exception("layers already have connections")
            for i in range(len(self.layers) - 1):
                self.connect(self.layers[i].name, self.layers[i+1].name)
        else:
            if from_layer_name == to_layer_name:
                raise Exception("self connections are not allowed")
            if not isinstance(from_layer_name, str):
                raise Exception("from_layer_name should be a string")
            if from_layer_name not in self.layer_dict:
                raise Exception('unknown layer: %s' % from_layer_name)
            if not isinstance(to_layer_name, str):
                raise Exception("to_layer_name should be a string")
            if to_layer_name not in self.layer_dict:
                raise Exception('unknown layer: %s' % to_layer_name)
            from_layer = self.layer_dict[from_layer_name]
            to_layer = self.layer_dict[to_layer_name]
            ## NOTE: these could be allowed, I guess:
            if to_layer in from_layer.outgoing_connections:
                raise Exception("attempting to duplicate connection: %s to %s" % (from_layer_name, to_layer_name))
            from_layer.outgoing_connections.append(to_layer)
            if from_layer in to_layer.incoming_connections:
                raise Exception("attempting to duplicate connection: %s to %s" % (to_layer_name, from_layer_name))
            ## Check for input going to a Dense to warn:
            if from_layer.shape and len(from_layer.shape) > 1 and to_layer.CLASS.__name__ == "Dense":
                print("WARNING: connected multi-dimensional input layer '%s' to layer '%s'; consider adding a FlattenLayer between them" % (
                    from_layer.name, to_layer.name), file=sys.stderr)
            to_layer.incoming_connections.append(from_layer)
            ## Post connection hooks:
            to_layer.on_connect("to", from_layer)
            from_layer.on_connect("from", to_layer)
            ## Recompute bank ordering and input-name bookkeeping:
            self._reset_layer_metadata()
            self.connections.append((from_layer_name, to_layer_name))
    def _reset_layer_metadata(self):
        """
        Reset num_input_layers, banks, ordering, etc.
        """
        ## Compute input/target layers:
        input_layers = [layer for layer in self.layers if layer.kind() == "input"]
        self.num_input_layers = len(input_layers)
        self.input_bank_order = [layer.name for layer in input_layers]
        target_layers = [layer for layer in self.layers if layer.kind() == "output"]
        self.num_target_layers = len(target_layers)
        self.output_bank_order = [layer.name for layer in target_layers]
        ## Set up a layer's input names, as best possible:
        ## (walk in topological order so predecessors are processed first)
        sequence = topological_sort(self, self.layers)
        for layer in sequence:
            if layer.kind() == 'input':
                layer.input_names = set([layer.name])
            else:
                if len(layer.incoming_connections) == 1:
                    ## Single parent: share its input-name set.
                    layer.input_names = layer.incoming_connections[0].input_names
                else:
                    ## Multiple parents: union of all parents' input names.
                    layer.input_names = set([item for sublist in
                                             [incoming.input_names for incoming in layer.incoming_connections]
                                             for item in sublist])
[docs] def depth(self):
"""
Find the depth of the network graph of connections.
"""
max_depth = 0
for in_layer_name in self.input_bank_order:
for out_layer_name in self.output_bank_order:
path = find_path(self, in_layer_name, out_layer_name)
if path:
max_depth = max(len(list(path)) + 1, max_depth)
return max_depth
[docs] def summary(self):
"""
Print out a summary of the network.
"""
if self.model:
self.model.summary()
else:
print("Compile network in order to see summary.")
    def reset(self, clear=False, **overrides):
        """
        Reset all of the weights/biases in a network.
        The magnitude is based on the size of the network.

        Arguments:
            clear: if True, discard previous compile options before
                applying overrides.
            overrides: compile-option overrides; may include "seed" to
                re-seed the random stream.
        """
        ## Discard all training history:
        self.epoch_count = 0
        self.history = []
        self.weight_history.clear()
        self.build_model()
        if "seed" in overrides:
            set_random_seed(overrides["seed"])
            del overrides["seed"]
        ## Reset all weights and biases:
        ## Recompile with possibly new options:
        if clear:
            self.compile_options = {}
        self.compile_options.update(overrides)
        self.compile_model(**self.compile_options)
    def evaluate(self, batch_size=None, show=False, show_inputs=True, show_targets=True,
                 kverbose=0, sample_weight=None, steps=None, tolerance=None, force=False,
                 max_col_width=15, select=None, verbose=0):
        """
        Evaluate the train and/or test sets.

        If show is True, then it will show details for each
        training/test pair, the amount of detail then determined by
        show_inputs, and show_targets.

        If force is True, then it will show all patterns, even if there are many.

        Set max_col_width to change the maximum width that any one column can be.

        Arguments:
            batch_size, kverbose, sample_weight, steps: passed through to
                keras model.evaluate().
            tolerance: if given, overrides the network tolerance first.
            select: an int or tuple of slice arguments selecting a subset
                of the dataset to evaluate.
            verbose: if -1, return the raw results list.
        """
        if len(self.dataset) == 0:
            print("WARNING: nothing to test; use network.dataset.load()",
                  file=sys.stderr)
            return
        if tolerance is not None:
            self.tolerance = tolerance
        if select is not None:
            ## Normalize an int into a 1-tuple of slice args:
            if isinstance(select, int):
                select = (select,)
        slice_select = slice(*select) if select is not None else slice(len(self.dataset))
        print("%s:" % self.name) ## network name
        if 0 < self.dataset._split <= 1 and select is None:
            ## The dataset has a train/test split: report each separately.
            size, num_train, num_test = self.dataset._get_split_sizes()
            results = self.model.evaluate(self.dataset._inputs, self.dataset._targets,
                                          batch_size=batch_size, verbose=kverbose,
                                          sample_weight=sample_weight, steps=steps)
            print("Training Data Results:")
            if show:
                self._evaluate_range(slice(num_train), show_inputs, show_targets, batch_size,
                                     kverbose, sample_weight, steps, force, max_col_width)
            ## Evaluate only the training portion of each input/target bank:
            results = self.model.evaluate([self.dataset._inputs[bank][:num_train]
                                           for bank in range(self.num_input_layers)],
                                          [self.dataset._targets[bank][:num_train]
                                           for bank in range(self.num_target_layers)],
                                          batch_size=batch_size, verbose=kverbose,
                                          sample_weight=sample_weight, steps=steps)
            for i in range(len(self.model.metrics_names)):
                print(" %15s: %10s" % (self.model.metrics_names[i], self.pf(results[i])))
            print(" %15s: %10s" % ("Total", num_train))
            print()
            print("Testing Data Results:")
            if show:
                self._evaluate_range(slice(num_train, num_train + num_test), show_inputs, show_targets, batch_size,
                                     kverbose, sample_weight, steps, force, max_col_width)
            ## Evaluate only the testing portion of each input/target bank:
            results = self.model.evaluate([self.dataset._inputs[bank][num_train:]
                                           for bank in range(self.num_input_layers)],
                                          [self.dataset._targets[bank][num_train:]
                                           for bank in range(self.num_target_layers)],
                                          batch_size=batch_size, verbose=kverbose,
                                          sample_weight=sample_weight, steps=steps)
            for i in range(len(self.model.metrics_names)):
                print(" %15s: %10s" % (self.model.metrics_names[i], self.pf(results[i])))
            print(" %15s: %10s" % ("Total", num_test))
            if verbose == -1:
                ## FIXME: combine results from train and test
                return results
        else: # all (or select data):
            if select is None:
                print("All Data Results:")
            else:
                print("Selected Data Results: range(%s)" % (", ".join([str(n) for n in select])))
            if show:
                self._evaluate_range(slice_select, show_inputs, show_targets, batch_size,
                                     kverbose, sample_weight, steps, force, max_col_width)
            if select is None:
                results = self.model.evaluate(self.dataset._inputs, self.dataset._targets,
                                              batch_size=batch_size, verbose=kverbose,
                                              sample_weight=sample_weight, steps=steps)
            else:
                results = self.model.evaluate([bank[slice_select] for bank in self.dataset._inputs],
                                              [bank[slice_select] for bank in self.dataset._targets],
                                              batch_size=batch_size, verbose=kverbose,
                                              sample_weight=sample_weight, steps=steps)
            for i in range(len(self.model.metrics_names)):
                print(" %15s: %10s" % (self.model.metrics_names[i], self.pf(results[i])))
            print(" %15s: %10s" % ("Total", len(self.dataset)))
            if verbose == -1:
                return results
def _evaluate_range(self, slice, show_inputs, show_targets, batch_size,
                    kverbose, sample_weight, steps, force, max_col_width):
    """
    Print a per-sample evaluation table for the dataset rows selected
    by `slice` (a Python slice object).

    Each table row shows the sample number, optionally the input and
    target vectors, and the per-sample Keras metric values.  Columns
    whose metric name ends in "acc" are rewritten as "correct"/"x" and
    totaled at the bottom.  Unless `force` is True, only the first 15
    rows are printed.

    NOTE(review): the parameter name `slice` shadows the builtin.
    """
    def split_heading(name, fill=""):
        # Split a bank/metric name into a two-line heading at the last "_";
        # names without "_" get `fill` as the top heading line.
        if "_" not in name:
            return [fill, name]
        else:
            return name.rsplit("_", 1)
    column_count = 1 # the row number
    if show_inputs:
        column_count += self.num_input_layers
    if show_targets:
        column_count += self.num_target_layers
    column_count += len(self.model.metrics_names)
    cols = [[] for i in range(column_count)]
    # Evaluate one sample at a time so we get per-sample metric values:
    for i in range(len(self.dataset))[slice]:
        result = self.model.evaluate([self.dataset._inputs[bank][i:i+1]
                                      for bank in range(self.num_input_layers)],
                                     [self.dataset._targets[bank][i:i+1]
                                      for bank in range(self.num_target_layers)],
                                     batch_size=batch_size, verbose=kverbose,
                                     sample_weight=sample_weight, steps=steps)
        cols[0].append(str(i))
        col = 1
        if show_inputs:
            for j in range(self.num_input_layers):
                cols[col].append(self.pf(self.dataset._inputs[j][i], max_line_width=max_col_width))
                col += 1
        if show_targets:
            for j in range(self.num_target_layers):
                cols[col].append(self.pf(self.dataset._targets[j][i], max_line_width=max_col_width))
                col += 1
        for item in result:
            cols[col].append(self.pf(item))
            col += 1
    headings = [["", "#"]]
    if show_inputs:
        headings.extend([split_heading(name) for name in self.input_bank_order])
    if show_targets:
        headings.extend([split_heading(name, "target") for name in self.output_bank_order])
    headings.extend([split_heading(name) for name in self.model.metrics_names])
    # Rewrite accuracy columns as "x"/"correct" and count the correct ones;
    # `correct` maps column index -> count, and marks which columns are acc:
    correct = {}
    for c in range(len(cols)):
        if headings[c][1].endswith("acc"):
            correct[c] = 0
            for r in range(len(cols[c])):
                if cols[c][r] == self.pf(0):
                    cols[c][r] = "x"
                elif cols[c][r] == self.pf(1):
                    cols[c][r] = "correct"
                    correct[c] += 1
    # Each column is as wide as its widest cell or heading line:
    column_widths = [max([len(c) for c in row]) if row else 0 for row in cols]
    column_widths = [max([column_widths[c], len(headings[c][0]), len(headings[c][1])]) for c in range(len(column_widths))]
    for c in range(column_count):
        print("-" * column_widths[c], end="-+-")
    print()
    for h in range(len(headings)):
        print(("%" + str(column_widths[h]) + "s") % headings[h][0], end=" | ")
    print()
    for h in range(len(headings)):
        print(("%" + str(column_widths[h]) + "s") % headings[h][1], end=" | ")
    print()
    for c in range(column_count):
        print("-" * column_widths[c], end="-+-")
    print()
    for i in range(len(cols[0])): # at least one column
        # Truncate long tables at 15 rows unless force is True:
        if not force and i == 15:
            print()
            print("... skipping some rows ...")
            print()
        if force or i < 15:
            for c in range(column_count):
                if len(cols[c]) > i:
                    print(("%" + str(column_widths[c]) + "s") % cols[c][i], end=" | ")
                else:
                    print(("%" + str(column_widths[c]) + "s") % "", end=" | ")
            print() # end of line
    ## Totals:
    for c in range(column_count):
        print("-" * column_widths[c], end="-+-")
    print()
    row = ""
    for c in range(column_count):
        if c in correct:
            row += (("%" + str(column_widths[c]) + "s") % correct[c]) + " | "
        else:
            # NOTE(review): this pads with 1 char while acc columns use the
            # 3-char " | " separator -- confirm original padding width.
            row += (("%" + str(column_widths[c]) + "s") % "") + " "
    # row[9:] drops the width of the "Correct: " label prefix:
    print("Correct:", row[9:])
    for c in range(column_count):
        if c in correct:
            print("-" * column_widths[c], end="-+-")
        else:
            print("-" * column_widths[c], end="---")
    print()
def evaluate_and_label(self, batch_size=32, tolerance=None,
                       select=None):
    """
    Test the network on the dataset, and categorize the results.

    Returns a sorted list of (label, [input, ...]) pairs, where each
    label is "<dataset label> (correct)" or "<dataset label> (wrong)"
    and the list holds the dataset inputs that fell in that category.

    Arguments:
        batch_size (int): batch size used for the forward pass.
        tolerance (float): maximum output/target difference counted
            as correct; defaults to self.tolerance.
        select: None for the whole dataset, an int N (treated as
            range(N)), or a tuple of slice arguments.
    """
    tolerance = tolerance if tolerance is not None else self.tolerance
    if select is not None:
        if isinstance(select, int):
            select = (select,)
    slice_select = slice(*select) if select is not None else slice(len(self.dataset))
    # NOTE(review): the comment claims "use entire set" but this branch
    # fires when the dataset IS split -- confirm intended condition.
    if 0 < self.dataset._split < 1.0 and select is None: ## special case; use entire set
        inputs = self.dataset._inputs
        targets = self.dataset._targets
    else:
        inputs = [column[slice_select] for column in self.dataset._inputs]
        targets = [column[slice_select] for column in self.dataset._targets]
    outputs = self.model.predict(inputs, batch_size=batch_size)
    correct = self.compute_correct([outputs], targets, tolerance)
    categories = {}
    # c indexes the prediction/correct arrays; i indexes the dataset:
    for c, i in enumerate(range(len(self.dataset))[slice_select]):
        label_i = self.dataset.labels[i] if i < len(self.dataset.labels) else "Unlabeled"
        label = "%s (%s)" % (label_i, "correct" if correct[c] else "wrong")
        # idiom fix: setdefault replaces `if not label in categories: ...`
        categories.setdefault(label, []).append(self.dataset.inputs[i])
    return sorted(categories.items())
def compute_correct(self, outputs, targets, tolerance=None):
    """
    Both are np.arrays. Return [True, ...].

    A sample counts as correct only when every element of every output
    bank is within `tolerance` of its target.
    """
    if tolerance is None:
        tolerance = self.tolerance
    results = []
    for sample in range(len(outputs[0])):
        flags = []
        for bank in range(len(outputs)):
            diffs = np.abs(outputs[bank][sample] - targets[bank][sample])
            flags.extend(diff <= tolerance for diff in diffs)
        results.append(all(flags))
    return results
def train_one(self, inputs, targets, batch_size=32, update_pictures=False):
    """
    Train on one input/target pair.

    Inputs should be a vector if one input bank, or a list of vectors
    if more than one input bank.

    Targets should be a vector if one output bank, or a list of vectors
    if more than one output bank.

    Alternatively, inputs and targets can each be a dictionary mapping
    bank to vector.

    Returns (outputs, errors) where errors is output - target per bank.

    Examples:

        >>> from conx import Network, Layer, Dataset
        >>> net = Network("XOR", 2, 2, 1, activation="sigmoid")
        >>> net.compile(error='mean_squared_error',
        ...             optimizer="sgd", lr=0.3, momentum=0.9)
        >>> ds = [[[0, 0], [0]],
        ...       [[0, 1], [1]],
        ...       [[1, 0], [1]],
        ...       [[1, 1], [0]]]
        >>> net.dataset.load(ds)
        >>> out, err = net.train_one({"input": [0, 0]},
        ...                          {"output": [0]})
        >>> len(out)
        1
        >>> len(err)
        1

        >>> from conx import Network, Layer, Dataset
        >>> net = Network("XOR2")
        >>> net.add(Layer("input%d", shape=1))
        'input1'
        >>> net.add(Layer("input%d", shape=1))
        'input2'
        >>> net.add(Layer("hidden%d", shape=2, activation="sigmoid"))
        'hidden1'
        >>> net.add(Layer("hidden%d", shape=2, activation="sigmoid"))
        'hidden2'
        >>> net.add(Layer("shared-hidden", shape=2, activation="sigmoid"))
        'shared-hidden'
        >>> net.add(Layer("output%d", shape=1, activation="sigmoid"))
        'output1'
        >>> net.add(Layer("output%d", shape=1, activation="sigmoid"))
        'output2'
        >>> net.connect("input1", "hidden1")
        >>> net.connect("input2", "hidden2")
        >>> net.connect("hidden1", "shared-hidden")
        >>> net.connect("hidden2", "shared-hidden")
        >>> net.connect("shared-hidden", "output1")
        >>> net.connect("shared-hidden", "output2")
        >>> net.compile(error='mean_squared_error',
        ...             optimizer="sgd", lr=0.3, momentum=0.9)
        >>> ds = [([[0],[0]], [[0],[0]]),
        ...       ([[0],[1]], [[1],[1]]),
        ...       ([[1],[0]], [[1],[1]]),
        ...       ([[1],[1]], [[0],[0]])]
        >>> net.dataset.load(ds)
        >>> net.compile(error='mean_squared_error',
        ...             optimizer="sgd", lr=0.3, momentum=0.9)
        >>> out, err = net.train_one({"input1": [0], "input2": [0]},
        ...                          {"output1": [0], "output2": [0]})
        >>> len(out)
        2
        >>> len(err)
        2

        >>> net.dataset._num_input_banks()
        2
        >>> net.dataset._num_target_banks()
        2
    """
    # Normalize dict arguments into bank-ordered lists (or a bare
    # vector when there is only one bank):
    if isinstance(inputs, dict):
        inputs = [inputs[name] for name in self.input_bank_order]
        if self.num_input_layers == 1:
            inputs = inputs[0]
    if isinstance(targets, dict):
        targets = [targets[name] for name in self.output_bank_order]
        if self.num_target_layers == 1:
            targets = targets[0]
    # Wrap the single pair as a batch of one, per bank:
    pairs = [(inputs, targets)]
    if self.num_input_layers == 1:
        ins = np.array([pair[0] for pair in pairs], "float32")
    else:
        ins = []
        for i in range(len(pairs[0][0])):
            ins.append(np.array([pair[0][i] for pair in pairs], "float32"))
    if self.num_target_layers == 1:
        targs = np.array([pair[1] for pair in pairs], "float32")
    else:
        targs = []
        for i in range(len(pairs[0][1])):
            targs.append(np.array([pair[1][i] for pair in pairs], "float32"))
    if isinstance(self.dataset, VirtualDataset):
        # NOTE(review): for virtual datasets this fits on generated
        # batches, not on the specific pair passed in -- confirm intended.
        history = self.model.fit_generator(
            generator=self.dataset.get_generator(batch_size),
            validation_data=self.dataset.get_validation_generator(batch_size),
            epochs=1, verbose=0)
    else:
        history = self.model.fit(ins, targs, epochs=1, verbose=0, batch_size=batch_size)
    ## may need to update history?
    # Re-propagate after the weight update to get post-training outputs:
    outputs = self.propagate(inputs, batch_size=batch_size, update_pictures=update_pictures)
    if len(self.output_bank_order) == 1:
        errors = (np.array(outputs) - np.array(targets)).tolist()
    else:
        errors = []
        for bank in range(len(self.output_bank_order)):
            errors.append((np.array(outputs[bank]) - np.array(targets[bank])).tolist())
    if update_pictures:
        if self.config["show_targets"]:
            if len(self.output_bank_order) == 1:
                self.display_component([targets], "targets")
            else:
                self.display_component(targets, "targets")
        if self.config["show_errors"]: ## min max is error:
            if len(self.output_bank_order) == 1:
                self.display_component([errors], "errors", minmax=(-1, 1))
            else:
                # NOTE(review): this overwrites the list-of-lists `errors`
                # built above with raw np.arrays, which are then returned
                # below -- confirm the np.array return type is intended.
                errors = []
                for bank in range(len(self.output_bank_order)):
                    errors.append( np.array(outputs[bank]) - np.array(targets[bank]))
                self.display_component(errors, "errors", minmax=(-1, 1))
    return (outputs, errors)
def retrain(self, **overrides):
    """
    Call network.train() again with same options as last call, unless overrides.

    Raises an Exception on any override key that was not a recorded
    train option.
    """
    unknown = [key for key in overrides if key not in self.train_options]
    if unknown:
        raise Exception("Unknown train option: %s" % unknown[0])
    self.train_options.update(overrides)
    self.train(**self.train_options)
def _compute_result_acc(self, results):
"""
Compute accuracy from results. There are no val_ items here.
"""
if "acc" in results: return results["acc"]
values = [results[key] for key in results if key.endswith("_acc")]
if len(values) > 0:
return sum(values)/len(values)
else:
raise Exception("attempting to find accuracy in results, but there aren't any")
def test_dataset_ranges(self):
    """
    Test the dataset ranges to see if in range of activation functions.

    Emits warn_once() warnings (does not raise) when a target bank has
    a suspicious shape, or when its min/max value falls outside the
    output range of the bank's activation function.
    """
    if len(self.dataset.targets) == 0:
        return # nothing to test
    for index in range(len(self.dataset._targets)):
        if len(self.dataset._targets[index].shape) > 2:
            self.warn_once("WARNING: network '%s' target bank #%s has a multi-dimensional shape; is this correct?" %
                           (self.name, index))
    for index in range(len(self.output_bank_order)):
        layer_name = self.output_bank_order[index]
        # linear output can produce any value; nothing to check:
        if self[layer_name].activation == "linear":
            continue
        lmin, lmax = self[layer_name].get_act_minmax()
        # test dataset min to see if in range of act output:
        if self[layer_name].activation is not None:
            if not (lmin <= self.dataset._targets_range[index][0] <= lmax):
                self.warn_once("WARNING: output bank '%s' has activation function, '%s', that is not consistent with minimum value of targets" %
                               (layer_name, self[layer_name].activation))
            # test dataset max to see if in range of act output:
            if not (lmin <= self.dataset._targets_range[index][1] <= lmax):
                self.warn_once("WARNING: output bank '%s' has activation function, '%s', that is not consistent with maximum value of targets" %
                               (layer_name, self[layer_name].activation))
def train(self, epochs=1, accuracy=None, error=None, batch_size=32,
          report_rate=1, verbose=1, kverbose=0, shuffle=True, tolerance=None,
          class_weight=None, sample_weight=None, use_validation_to_stop=False,
          plot=True, record=0, callbacks=None, kcallbacks=None, save=False):
    """
    Train the network.

    To stop before number of epochs, give either error=VALUE, or accuracy=VALUE.

    Normally, it will check training info to stop, unless you
    use_validation_to_stop = True.

    Arguments:
        epochs (int): Maximum number of epochs (sweeps) through
            training data.
        accuracy (float): Value of correctness (0.0 - 1.0) to attain in
            order to stop. Depends on tolerance to determine accuracy.
        error (float): Error to attain in order to stop. Depends on error
            function given in `Network.compile`.
        batch_size (int): Size of batch to train on.
        report_rate (int): Rate of feedback on learning, in epochs.
        verbose (int): Level of feedback on training. verbose=0 gives no
            feedback, but returns (epoch_count, result)
        kverbose (int): Level of feedback from Keras.
        shuffle (bool or str): Should the training data be shuffled?
            'batch' shuffles in batch-sized chunks.
        tolerance (float): The maximum difference between target and output
            that should be considered correct.
        class_weight (float):
        sample_weight (float):
        use_validation_to_stop (bool): If `True`, then accuracy and error will
            use the validation set rather than the training set.
        plot (bool): If `True`, then the feedback will be shown in graphical form.
        record (int): If 'record != 0', the weights will be saved every record
            number of epochs.
        callbacks (list): A list of (str, function) where str is 'on_batch_begin',
            'on_batch_end', 'on_epoch_begin', 'on_epoch_end', 'on_train_begin',
            or 'on_train_end', and function takes a network, and other
            parameters, depending on str.
        kcallbacks (list): Keras Callback instances, passed through to the
            underlying Keras fit call in addition to the ones built here.
        save (bool): If `True`, then the network is saved at end, whether
            interrupted or not.

    Returns:
        tuple: (epoch_count, result) if verbose == 0
        None: if verbose != 0

    Examples:
        >>> net = Network("Train Test", 1, 3, 1)
        >>> net.compile(error="mse", optimizer="rmsprop")
        >>> net.dataset.append([0.0], [1.0])
        >>> net.dataset.append([1.0], [0.0])
        >>> net.train(plot=False)  # doctest: +ELLIPSIS
        Evaluating initial training metrics...
        Training...
        ...
    """
    # Remember the options so retrain() can replay them.
    # NOTE(review): kverbose and kcallbacks are not recorded here, so
    # retrain() will not replay them -- confirm intended.
    self.train_options = {
        "epochs": epochs,
        "accuracy": accuracy,
        "error": error,
        "batch_size": batch_size,
        "report_rate": report_rate,
        "verbose": verbose,
        "shuffle": shuffle,
        "class_weight": class_weight,
        "sample_weight": sample_weight,
        "tolerance": tolerance,
        "use_validation_to_stop": use_validation_to_stop,
        "plot": plot,
        "record": record,
        "callbacks": callbacks,
        "save": save,
    }
    if plot:
        import matplotlib
        mpl_backend = matplotlib.get_backend()
    else:
        mpl_backend = None
    # Validate preconditions before touching any state:
    if self.model is None:
        raise Exception("need to build and compile network")
    if self.model.optimizer is None:
        raise Exception("need to compile network")
    if not isinstance(report_rate, numbers.Integral) or report_rate < 1:
        raise Exception("bad report rate: %s" % (report_rate,))
    if not (isinstance(batch_size, numbers.Integral) or batch_size is None):
        raise Exception("bad batch size: %s" % (batch_size,))
    ## Test for targets in range of activation function:
    self.test_dataset_ranges()
    if epochs == 0: return
    if len(self.dataset.inputs) == 0:
        print("No training data available")
        return
    if use_validation_to_stop:
        # Validation-based stopping needs a split AND a stopping criterion:
        if (self.dataset._split == 0):
            print("Attempting to use validation to stop, but Network.dataset.split() is 0")
            return
        elif ((accuracy is None) and (error is None)):
            print("Attempting to use validation to stop, but neither accuracy nor error was set")
            return
    self._need_to_show_headings = True
    if tolerance is not None:
        # tolerance only has meaning for accuracy-based stopping:
        if accuracy is None:
            raise Exception("tolerance given but unknown accuracy")
        K.set_value(self._tolerance, tolerance)
    ## Going to need evaluation on training set in any event:
    if self.dataset._split == 1.0: ## special case; use entire set
        inputs = self.dataset._inputs
        targets = self.dataset._targets
    else:
        ## need to split; check format based on output banks:
        length = len(self.dataset.train_targets)
        targets = [column[:length] for column in self.dataset._targets]
        inputs = [column[:length] for column in self.dataset._inputs]
    if len(self.history) > 0:
        # Reuse the metrics from the last recorded epoch:
        results = self.history[-1]
    else:
        if verbose > 0:
            print("Evaluating initial training metrics...")
        values = self.model.evaluate(inputs, targets, batch_size=batch_size, verbose=kverbose)
        if not isinstance(values, list): # if metrics is just a single value
            values = [values]
        results = {metric: value for metric,value in zip(self.model.metrics_names, values)}
    results_acc = self._compute_result_acc(results)
    ## look at split, use validation subset:
    if self.dataset._split == 0.0: ## None
        val_results = {}
    elif self.dataset._split == 1.0: ## special case; use entire set; already done!
        val_results = {"val_%s" % key: as_sum(results[key]) for key in results}
    else: # split is greater than 0, less than 1
        if verbose > 0:
            print("Evaluating initial validation metrics...")
        ## need to split; check format based on output banks:
        length = len(self.dataset.test_targets)
        targets = [column[-length:] for column in self.dataset._targets]
        inputs = [column[-length:] for column in self.dataset._inputs]
        val_values = self.model.evaluate(inputs, targets, batch_size=batch_size, verbose=kverbose)
        val_results = {"val_%s" % metric: value for metric,value in zip(self.model.metrics_names, val_values)}
    if val_results:
        val_results_acc = self._compute_result_acc(val_results)
    # Check whether training is needed at all (criteria already met):
    if use_validation_to_stop:
        if ((self.dataset._split > 0) and
            ((accuracy is not None) or (error is not None))):
            need_to_train = True
            if ((accuracy is not None) and (val_results_acc >= accuracy)):
                print("No training required: validation accuracy already to desired value")
                need_to_train = False
            elif ((error is not None) and (val_results["loss"] <= error)):
                print("No training required: validation error already to desired value")
                need_to_train = False
            if not need_to_train:
                print("Training dataset status:")
                self.report_epoch(self.epoch_count, results)
                print("Testing dataset status:")
                self.report_epoch(self.epoch_count, val_results)
                return (self.epoch_count, results) if verbose == 0 else None
    else: ## regular training to stop, use_validation_to_stop is False
        if ((accuracy is not None) and (results_acc >= accuracy)):
            print("No training required: accuracy already to desired value")
            print("Training dataset status:")
            self.report_epoch(self.epoch_count, results)
            return (self.epoch_count, results) if verbose == 0 else None
        elif ((error is not None) and (as_sum(results["loss"]) <= error)):
            print("No training required: error already to desired value")
            print("Training dataset status:")
            self.report_epoch(self.epoch_count, results)
            return (self.epoch_count, results) if verbose == 0 else None
    ## Ok, now we know we need to train:
    results.update(val_results)
    if len(self.history) == 0:
        # Record epoch 0 (pre-training) metrics, and weights if recording:
        self.history = [results]
        if record:
            self.weight_history[0] = self.get_weights()
    if verbose > 0:
        print("Training...")
    if self.in_console(mpl_backend) and verbose > 0:
        self.report_epoch(self.epoch_count, self.history[-1])
    interrupted = False
    # Build the Keras callback list: user kcallbacks, plus our own
    # reporting/stopping/plotting/function callbacks:
    if kcallbacks is None:
        kcallbacks = []
    kcallbacks = kcallbacks + [
        History(),
        ReportCallback(self, verbose, report_rate, mpl_backend, record),
    ]
    if accuracy is not None:
        kcallbacks.append(StoppingCriteria("acc", ">=", accuracy, use_validation_to_stop))
    if error is not None:
        kcallbacks.append(StoppingCriteria("loss", "<=", error, use_validation_to_stop))
    if plot:
        pc = PlotCallback(self, report_rate, mpl_backend)
        kcallbacks.append(pc)
    if callbacks is not None:
        for (on_method, function) in callbacks:
            kcallbacks.append(FunctionCallback(self, on_method, function))
    # _InterruptHandler converts Ctrl-C into a flag so we can clean up:
    with _InterruptHandler(self) as handler:
        if isinstance(self.dataset, VirtualDataset):
            result = self.model.fit_generator(generator=self.dataset.get_generator(batch_size),
                                              validation_data=self.dataset.get_validation_generator(batch_size),
                                              epochs=epochs,
                                              callbacks=kcallbacks,
                                              shuffle=shuffle,
                                              class_weight=class_weight,
                                              verbose=kverbose)
        elif self.dataset._split == 1:
            # split == 1: validate on the full training set itself:
            result = self.model.fit(self.dataset._inputs,
                                    self.dataset._targets,
                                    batch_size=batch_size,
                                    epochs=epochs,
                                    validation_data=(self.dataset._inputs,
                                                     self.dataset._targets),
                                    callbacks=kcallbacks,
                                    shuffle=shuffle,
                                    class_weight=class_weight,
                                    sample_weight=sample_weight,
                                    verbose=kverbose)
        else:
            result = self.model.fit(self.dataset._inputs,
                                    self.dataset._targets,
                                    batch_size=batch_size,
                                    epochs=epochs,
                                    validation_split=self.dataset._split,
                                    callbacks=kcallbacks,
                                    shuffle=shuffle,
                                    class_weight=class_weight,
                                    sample_weight=sample_weight,
                                    verbose=kverbose)
        if plot:
            # Flush the final plot frame (epoch -1 marks "done"):
            pc.on_epoch_end(-1)
        if handler.interrupted:
            interrupted = True
    if interrupted:
        if verbose:
            print("Interrupted! Cleaning up...")
    last_epoch = self.history[-1]
    if record:
        self.weight_history[self.epoch_count] = self.get_weights()
    assert len(self.history) == self.epoch_count+1 # +1 is for epoch 0
    if verbose:
        print("=" * 56)
        self.report_epoch(self.epoch_count, last_epoch)
    if save:
        if verbose:
            print("Saving network... ", end="")
        self.save()
        if verbose:
            print("Saved!")
    if interrupted:
        # Re-raise so the caller still sees the interrupt after cleanup:
        raise KeyboardInterrupt
    if verbose == 0:
        return (self.epoch_count, self.history[-1])
    elif verbose == -1:
        return result
def report_epoch(self, epoch_count, results):
    """
    Print out stats for the epoch.

    Prints a three-line column heading the first time it is called
    after self._need_to_show_headings is set, then one formatted row of
    loss/accuracy (and any extra metric) values.
    """
    # NOTE(review): the literal header strings below appear to have had
    # their internal padding collapsed by extraction (h2/h3 are 7 chars
    # wide but h1 is 1) -- confirm widths against the original file.
    if self._need_to_show_headings:
        h1 = " "
        h2 = "Epochs "
        h3 = "------ "
        if 'loss' in results:
            h1 += "| Training "
            h2 += "| Error "
            h3 += "| --------- "
        if 'acc' in results:
            h1 += "| Training "
            h2 += "| Accuracy "
            h3 += "| --------- "
        if 'val_loss' in results:
            h1 += "| Validate "
            h2 += "| Error "
            h3 += "| --------- "
        if 'val_acc' in results:
            h1 += "| Validate "
            h2 += "| Accuracy "
            h3 += "| --------- "
        # Any other metric (except per-bank *_loss) gets its own column,
        # heading split at the last underscore:
        for other in sorted(results):
            if other not in ["loss", "acc", "val_loss", "val_acc"]:
                if not other.endswith("_loss"):
                    w1, w2 = other.replace("_", " ").split(" ", 1)
                    maxlen = max(len(w1), len(w2), 9)
                    h1 += "| " + (("%%%ds " % maxlen) % w1)
                    h2 += "| " + (("%%%ds " % maxlen) % w2)
                    h3 += "| %s " % ("-" * (maxlen))
        print(h1)
        print(h2)
        print(h3)
        self._need_to_show_headings = False
    # One data row, columns in the same order as the headings:
    s = "#%5d " % (epoch_count,)
    if 'loss' in results:
        s += "| %9.5f " % (as_sum(results['loss']),)
    if 'acc' in results:
        s += "| %9.5f " % (results['acc'],)
    if 'val_loss' in results:
        s += "| %9.5f " % (as_sum(results['val_loss']),)
    if 'val_acc' in results:
        s += "| %9.5f " % (results['val_acc'],)
    for other in sorted(results):
        if other not in ["loss", "acc", "val_loss", "val_acc"]:
            if not other.endswith("_loss"):
                s += "| %9.5f " % as_sum(results[other])
    print(s)
def get_dataset(self, dataset_name):
    """
    Get the named dataset and set this network's dataset
    to it.

    >>> import conx as cx
    >>> net = cx.Network("MNIST-Autoencoder")
    >>> net.add(cx.ImageLayer("input", (28,28), 1),
    ...         cx.Conv2DLayer("conv", 3, (5,5), activation="relu"),
    ...         cx.MaxPool2DLayer("pool", pool_size=(2,2)),
    ...         cx.FlattenLayer("flatten"),
    ...         cx.Layer("hidden3", 25, activation="relu"),
    ...         cx.Layer("output", (28,28,1), activation="sigmoid"))
    'output'
    >>> net.connect()
    >>> net.compile(error="mse", optimizer="adam")
    >>> net.get_dataset("mnist")
    """
    named_dataset = Dataset.get(dataset_name)
    self.set_dataset(named_dataset)
def set_dataset(self, dataset):
    """
    Set the dataset for the network.

    Examples:
        >>> from conx import Dataset
        >>> data = [[[0, 0], [0]],
        ...         [[0, 1], [1]],
        ...         [[1, 0], [1]],
        ...         [[1, 1], [0]]]
        >>> ds = Dataset()
        >>> ds.load(data)
        >>> net = Network("Set Dataset Test", 2, 2, 1)
        >>> net.compile(error="mse", optimizer="adam")
        >>> net.set_dataset(ds)
    """
    if not isinstance(dataset, Dataset):
        raise Exception("Network.set_dataset() takes a Dataset object")
    if dataset.network is not None:
        print("INFO: using dataset on a new network, replacing old network", file=sys.stderr)
    # Link the dataset and network both ways, then sanity-check them:
    dataset.network = self
    self.dataset = dataset
    self.test_dataset_ranges()
    self.dataset._verify_network_dataset_match()
def set_activation(self, layer_name, activation):
    """
    Swap activation function of a layer after compile.
    """
    import tempfile
    import keras.activations
    from keras.models import load_model
    if not isinstance(layer_name, str):
        raise Exception("layer_name should be a string")
    if not isinstance(activation, str):
        activation = activation.__name__
    activation_map = {
        'relu': keras.activations.relu,
        'sigmoid': keras.activations.sigmoid,
        'linear': keras.activations.linear,
        'softmax': keras.activations.softmax,
        'tanh': keras.activations.tanh,
        'elu': keras.activations.elu,
        'selu': keras.activations.selu,
        'softplus': keras.activations.softplus,
        'softsign': keras.activations.softsign,
        'hard_sigmoid': keras.activations.hard_sigmoid,
    }
    if not self.model:
        raise Exception("can't change activation until after compile")
    self[layer_name].keras_layer.activation = activation_map[activation]
    self[layer_name].activation = activation
    # Round-trip the model through a temp file so Keras rebuilds it
    # with the new activation wired in:
    with tempfile.NamedTemporaryFile() as tf:
        temp_name = tf.name
        self.model.save(temp_name)
        self.model = load_model(temp_name)
    self._level_ordering = None
def get_weights_as_image(self, layer_name, colormap=None):
    """
    Get the weights from the model, rendered as a colorized PIL image.

    Arguments:
        layer_name (str): name of the layer whose weight matrix to show.
        colormap (str): optional matplotlib colormap name; defaults to
            the layer's colormap, or the global default.

    >>> net = Network("Weight as Image Test", 2, 2, 5)
    >>> net.compile(error="mse", optimizer="adam")
    >>> net.get_weights_as_image("hidden") # doctest: +ELLIPSIS
    <PIL.Image.Image image mode=RGBA size=2x2 at ...>
    """
    from matplotlib import cm
    if not isinstance(layer_name, str):
        raise Exception("layer_name should be a string")
    if self.model is None:
        raise Exception("need to build network")
    weights = [layer.get_weights() for layer in self.model.layers
               if layer_name == layer.name][0]
    weights = weights[0] # get the weight matrix, not the biases
    vector = scale_output_for_image(weights, (-5,5), truncate=True)
    if len(vector.shape) == 1:
        vector = vector.reshape((1, vector.shape[0]))
    size = self.config["pixels_per_unit"]
    new_width = vector.shape[0] * size # in, pixels
    new_height = vector.shape[1] * size # in, pixels
    if colormap is None:
        colormap = get_colormap() if self[layer_name].colormap is None else self[layer_name].colormap
    try:
        cm_hot = cm.get_cmap(colormap)
    except Exception:
        # BUG FIX: was a bare `except:`, which also swallowed
        # KeyboardInterrupt/SystemExit; unknown colormap names fall
        # back to a default colormap.
        cm_hot = cm.get_cmap("RdGy")
    vector = cm_hot(vector)
    vector = np.uint8(vector * 255)
    image = PIL.Image.fromarray(vector)
    # PIL.resize takes (width, height); rows map to height, cols to width:
    image = image.resize((new_height, new_width))
    return image
def get_weights(self, layer_name=None):
    """
    Get the weights from a layer, or the entire model.

    Examples:
        >>> net = Network("Weight Test", 2, 2, 5)
        >>> net.compile(error="mse", optimizer="adam")
        >>> len(net.get_weights("input"))
        0
        >>> len(net.get_weights("hidden"))
        2
        >>> shape(net.get_weights("hidden")[0])  ## weights
        (2, 2)
        >>> shape(net.get_weights("hidden")[1])  ## biases
        (2,)
        >>> len(net.get_weights("output"))
        2
        >>> shape(net.get_weights("output")[0])  ## weights
        (2, 5)
        >>> shape(net.get_weights("output")[1])  ## biases
        (5,)

        >>> net = Network("Weight Get Test", 2, 2, 1, activation="sigmoid")
        >>> net.compile(error="mse", optimizer="sgd")
        >>> len(net.get_weights())
        4

    See also:
        * `Network.to_array`
        * `Network.from_array`
        * `Network.get_weights_as_image`
    """
    if self.model is None:
        raise Exception("need to build network")
    if layer_name is None:
        # Whole model: return raw Keras weights.
        return self.model.get_weights()
    # Named layer: convert each weight/bias matrix to nested lists.
    matching = [layer.get_weights() for layer in self.model.layers
                if layer.name == layer_name]
    return [matrix.tolist() for matrix in matching[0]]
def propagate(self, input, batch_size=32, class_id=None,
              update_pictures=False, sequence=False):
    """
    Propagate an input (in human API) through the network.
    If visualizing, the network image will be updated.

    Inputs should be a vector if one input bank, or a list of vectors
    if more than one input bank.

    Alternatively, inputs can be a dictionary mapping
    bank to vector.

    >>> net = Network("Prop Test", 2, 2, 5)
    >>> net.compile(error="mse", optimizer="adam")
    >>> len(net.propagate([0.5, 0.5]))
    5
    >>> len(net.propagate({"input": [1, 1]}))
    5
    """
    if self.model is None:
        raise Exception("Need to build network first")
    # Normalize the input into bank-ordered form; PIL images become arrays:
    if isinstance(input, dict):
        input = [input[name] for name in self.input_bank_order]
        if self.num_input_layers == 1:
            input = input[0]
    elif isinstance(input, PIL.Image.Image):
        input = image_to_array(input)
    ## End of input setup
    if not is_array_like(input):
        raise Exception("inputs should be an array")
    if sequence:
        # sequence mode: input is already a batch of steps:
        outputs = self.model.predict(np.array(input), batch_size=batch_size)
    elif self.num_input_layers == 1:
        # wrap the single vector as a batch of one:
        outputs = self.model.predict(np.array([input]), batch_size=batch_size)
    else:
        # one batch-of-one array per input bank:
        inputs = [np.array([x], "float32") for x in input]
        outputs = self.model.predict(inputs, batch_size=batch_size)
    ## Shape the outputs:
    if sequence:
        if isinstance(outputs, list):
            outputs = [bank.tolist() for bank in outputs]
        else:
            outputs = outputs.tolist()
    elif self.num_target_layers == 1:
        shape = self[self.output_bank_order[0]].shape
        try:
            outputs = outputs[0].reshape(shape).tolist()
        except:
            outputs = outputs[0].tolist() # can't reshape; maybe a dynamically changing output
    else:
        shapes = [self[layer_name].shape for layer_name in self.output_bank_order]
        ## FIXME: may not be able to reshape; dynamically changing output
        outputs = [outputs[i].reshape(shapes[i]).tolist() for i in range(len(self.output_bank_order))]
    if update_pictures:
        # Refresh every layer's picture; update_path=False since we
        # explicitly visit every layer here:
        for layer in self.layers:
            self.propagate_to(layer.name, input, batch_size, class_id=class_id,
                              update_pictures=update_pictures, sequence=sequence, update_path=False)
    return outputs
def propagate_from(self, layer_name, input, output_layer_names=None,
                   batch_size=32, update_pictures=False, sequence=False):
    """
    Propagate activations from the given layer name to the output layers.

    Arguments:
        layer_name (str): layer to inject the activations into.
        input: activation vector for that layer (or dict of bank->vector).
        output_layer_names: name or list of names of output banks to
            compute; defaults to all output banks.
        batch_size (int): batch size for the forward pass.
        update_pictures (bool): send images to notebook SVG images.
        sequence (bool): if True, treat input as a sequence batch.
    """
    if not isinstance(layer_name, str):
        raise Exception("layer_name should be a string")
    if layer_name not in self.layer_dict:
        raise Exception("No such layer '%s'" % layer_name)
    # Normalize the input; PIL images become arrays:
    if isinstance(input, dict):
        input = [input[name] for name in self.input_bank_order]
        if self.num_input_layers == 1:
            input = input[0]
    elif isinstance(input, PIL.Image.Image):
        input = image_to_array(input)
    ## End of input setup
    if not is_array_like(input):
        raise Exception("inputs should be an array")
    if output_layer_names is None:
        if self.num_target_layers == 1:
            output_layer_names = [layer.name for layer in self.layers if layer.kind() == "output"]
        else:
            output_layer_names = self.output_bank_order
    else:
        if isinstance(output_layer_names, str):
            output_layer_names = [output_layer_names]
    outputs = []
    for output_layer_name in output_layer_names:
        # We should be able to get the prop_from model:
        ## FIXME: could be multiple paths
        prop_model = self.prop_from_dict.get((layer_name, output_layer_name), None)
        if sequence:
            inputs = input
        else:
            inputs = np.array([input])
        if prop_model is not None:
            outputs.append([list(x) for x in prop_model.predict(inputs)][0])
        ## FYI: outputs not shaped
    if update_pictures:
        ## Update from start to rest of graph
        if dynamic_pictures_check():
            ## viz this layer:
            if self[layer_name].visible:
                image = self[layer_name].make_image(inputs, config=self.config)
                data_uri = self._image_to_uri(image)
                class_id_name = "%s_%s" % (self.name, layer_name)
                if self.config["svg_rotate"]:
                    class_id_name += "-rotated"
                if self.debug: print("propagate_from 1: class_id_name:", class_id_name)
                dynamic_pictures_send({'class': class_id_name, "xlink:href": data_uri})
            for output_layer_name in output_layer_names:
                path = find_path(self, layer_name, output_layer_name)
                if path is not None:
                    for layer in path:
                        if not layer.visible:
                            continue
                        if (layer_name, layer.name) not in self.prop_from_dict:
                            continue
                        ## FIXME: could be multiple paths
                        model = self.prop_from_dict[(layer_name, layer.name)]
                        vector = model.predict(inputs)[0]
                        ## FYI: outputs not shaped
                        image = layer.make_image(vector, config=self.config)
                        data_uri = self._image_to_uri(image)
                        class_id_name = "%s_%s" % (self.name, layer.name)
                        if self.config["svg_rotate"]:
                            class_id_name += "-rotated"
                        if self.debug: print("propagate_from 2: class_id_name:", class_id_name)
                        dynamic_pictures_send({'class': class_id_name, "xlink:href": data_uri})
    if sequence:
        # NOTE(review): `outputs` is built above as a plain list of lists,
        # which has no .tolist(); confirm this branch is reachable/correct.
        if isinstance(outputs, list):
            outputs = [bank.tolist() for bank in outputs]
        else:
            outputs = outputs.tolist()
        return outputs
    # Reshape each bank's output to its layer's shape.
    # BUG FIX: `index` was initialized to 0 and never advanced, so every
    # reshape overwrote outputs[0]; enumerate advances it per bank.
    for index, out_layer_name in enumerate(output_layer_names):
        if index >= len(outputs):
            break  # no prop_from model existed for some bank
        shape = self[out_layer_name].shape
        if shape and all([isinstance(v, numbers.Integral) for v in shape]):
            try:
                outputs[index] = reshape(outputs[index], shape)
            except:
                # BUG FIX: map() returned a lazy iterator; materialize it.
                outputs[index] = list(map(float, outputs[index]))
        else:
            pass ## leave alone?
    if len(output_layer_names) == 1 and len(outputs) > 0:
        return outputs[0]
    else:
        return outputs
def display_component(self, vector, component, class_id=None, **opts):
    """
    vector is a list, one each per output layer. component is "errors" or "targets"
    """
    # Merge per-call options over a copy of the network config:
    config = copy.copy(self.config)
    config.update(opts)
    if not dynamic_pictures_check():
        return
    for bank_vector, bank_name in zip(vector, self.output_bank_order):
        layer = self[bank_name]
        # Targets use the layer's own colormap; errors use the error map:
        colormap = layer.colormap if component == "targets" else get_error_colormap()
        image = layer.make_image(np.array(bank_vector), colormap, config)
        data_uri = self._image_to_uri(image)
        prefix = self.name if class_id is None else class_id
        class_id_name = "%s_%s" % (prefix, bank_name)
        if self.debug:
            print("display_component: sending to class_id:", class_id_name + "_" + component)
        dynamic_pictures_send({'class': class_id_name + "_" + component,
                               "xlink:href": data_uri})
def propagate_to(self, layer_name, inputs, batch_size=32, class_id=None,
                 update_pictures=False, update_path=True, sequence=False):
    """
    Computes activation at a layer. Side-effect: updates live SVG.

    Arguments:
        layer_name (str) - name of layer to propagate activations to
        inputs - list of numbers, vector to propagate
        batch_size (int) - size of batch
        class_id (str) - optional prefix for the SVG class names sent
        update_pictures (bool) - send images to notebook SVG images
        update_path (bool) - if True, also refresh every visible layer
            on the path(s) from the input banks to layer_name
        sequence (bool) - if True, treat inputs/outputs as sequences
    """
    if not isinstance(layer_name, str):
        raise Exception("layer_name should be a string")
    if layer_name not in self.layer_dict:
        raise Exception('unknown layer: %s' % (layer_name,))
    # Normalize the input; PIL images become arrays:
    if isinstance(inputs, dict):
        inputs = [inputs[name] for name in self.input_bank_order]
        if self.num_input_layers == 1:
            inputs = inputs[0]
    elif isinstance(inputs, PIL.Image.Image):
        inputs = image_to_array(inputs)
    ## End of input setup
    if not is_array_like(inputs):
        raise Exception("inputs should be an array")
    if sequence:
        outputs = self[layer_name].model.predict(np.array(inputs), batch_size=batch_size)
    elif self.num_input_layers == 1:
        outputs = self[layer_name].model.predict(np.array([inputs]), batch_size=batch_size)
    else:
        # get just inputs for this layer, in order:
        vector = [np.array([inputs[self.input_bank_order.index(name)]]) for name in
                  self._get_sorted_input_names(self[layer_name].input_names)]
        outputs = self[layer_name].model.predict(vector, batch_size=batch_size)
    ## output shaped below:
    if update_pictures:
        if dynamic_pictures_check():
            if update_path: ## update the whole path, from all inputs to the layer_name, if a path
                ## don't repeat any updates, so keep track of what you have done:
                updated = set([])
                for input_layer_name in self.input_bank_order:
                    if input_layer_name not in updated:
                        image = self._propagate_to_image(input_layer_name, inputs, sequence=sequence)
                        data_uri = self._image_to_uri(image)
                        if class_id is None:
                            class_id_name = "%s_%s" % (self.name, input_layer_name)
                        else:
                            class_id_name = "%s_%s" % (class_id, input_layer_name)
                        if self.config["svg_rotate"]:
                            class_id_name += "-rotated"
                        if self.debug: print("propagate_to 1: sending to class_id_name:", class_id_name)
                        dynamic_pictures_send({'class': class_id_name, "xlink:href": data_uri})
                        updated.add(input_layer_name)
                    # Refresh every visible layer along the path from this
                    # input bank to the destination layer:
                    path = find_path(self, input_layer_name, layer_name)
                    if path is not None:
                        for layer in path:
                            if layer.visible and layer.model is not None:
                                if layer.name not in updated:
                                    image = self._propagate_to_image(layer.name, inputs, sequence=sequence)
                                    data_uri = self._image_to_uri(image)
                                    if class_id is None:
                                        class_id_name = "%s_%s" % (self.name, layer.name)
                                    else:
                                        class_id_name = "%s_%s" % (class_id, layer.name)
                                    if self.config["svg_rotate"]:
                                        class_id_name += "-rotated"
                                    if self.debug: print("propagate_to 2: sending to class_id_name:", class_id_name)
                                    dynamic_pictures_send({'class': class_id_name, "xlink:href": data_uri})
                                    updated.add(layer.name)
            else: # not the whole path, just to the layer_name
                image = self._propagate_to_image(layer_name, inputs, sequence=sequence)
                data_uri = self._image_to_uri(image)
                if class_id is None:
                    class_id_name = "%s_%s" % (self.name, layer_name)
                else:
                    class_id_name = "%s_%s" % (class_id, layer_name)
                if self.config["svg_rotate"]:
                    class_id_name += "-rotated"
                if self.debug: print("propagate_to 3: sending to class_id_name:", class_id_name)
                dynamic_pictures_send({'class': class_id_name, "xlink:href": data_uri})
    ## Shape the outputs:
    if sequence:
        if isinstance(outputs, list):
            outputs = [bank.tolist() for bank in outputs]
        else:
            outputs = outputs.tolist()
        return outputs
    shape = self[layer_name].shape
    if shape and all([isinstance(v, numbers.Integral) for v in shape]):
        try:
            outputs = outputs[0].reshape(shape).tolist()
        except:
            # can't reshape; fall back to the flat list:
            outputs = outputs[0].tolist()
    else:
        outputs = outputs[0].tolist()
    return outputs
def _layer_has_features(self, layer_name):
    """
    Return True if the named layer produces 4D output
    (batch, rows, cols, channels), i.e. has a feature/channel axis.
    """
    shape = self[layer_name].get_output_shape()
    if not isinstance(shape, tuple):
        return False
    return len(shape) == 4
def propagate_to_features(self, layer_name, inputs, cols=5, resize=None, scale=1.0,
                          html=True, size=None, display=True, class_id=None,
                          update_pictures=False, sequence=False, minmax=None):
    """
    Propagate inputs to a layer and show each feature (channel) separately.

    If html is True, build an HTML table of the feature images (returned as
    an IPython HTML object when display is True, else as a string);
    otherwise the images are sent over the dynamic-pictures channel.
    NOTE: the ``html`` parameter shadows the module-level ``html`` import
    inside this method; it is kept for backward compatibility.

    Arguments:
        layer_name (str) - name of the layer to propagate to
        inputs - input vector, dict of bank inputs, or PIL image
        cols (int) - number of feature images per HTML table row
        resize - optional (width, height) to resize each image to
        scale (float) - scale factor applied to each image
        display (bool) - if True (and html), return an IPython HTML object
        minmax - optional (min, max) range override while rendering
    """
    from IPython.display import HTML
    ## Standardize the input format:
    if isinstance(inputs, dict):
        inputs = [inputs[name] for name in self.input_bank_order]
        if self.num_input_layers == 1:
            inputs = inputs[0]
    elif isinstance(inputs, PIL.Image.Image):
        inputs = image_to_array(inputs)
    ## End of input setup
    if not is_array_like(inputs):
        raise Exception("inputs should be an array")
    if not isinstance(layer_name, str):
        raise Exception("layer_name should be a string")
    output_shape = self[layer_name].get_output_shape()
    outputs = np.array(self.propagate_to(layer_name, inputs))
    if html:
        retval = """<table><tr>"""
        orig_minmax = self[layer_name].minmax
        if minmax:
            self[layer_name].minmax = minmax
        elif self[layer_name].kind() == "input":
            #self[layer_name].minmax = (minimum(outputs), maximum(outputs))
            pass
        else:
            pass # leave minmax alone
        if self._layer_has_features(layer_name):
            ## move the channel to first index:
            outputs = np.moveaxis(outputs, -1, 0)
            for i in range(outputs.shape[0]):
                image = self[layer_name].make_image(outputs[i])
                if resize is not None:
                    image = image.resize(resize)
                if scale != 1.0:
                    image = image.resize((int(image.size[0] * scale), int(image.size[1] * scale)))
                data_uri = self._image_to_uri(image)
                retval += """<td style="border: 1px solid black;"><img style="image-rendering: pixelated;" class="%s_%s_feature%s" src="%s"/><br/><center>Feature %s</center></td>""" % (
                    self.name, layer_name, i, data_uri, i)
                if (i + 1) % cols == 0:
                    retval += """</tr><tr>"""
            self[layer_name].minmax = orig_minmax
            retval += "</tr></table>"
        else: ## no features
            image = self[layer_name].make_image(outputs)
            if resize is not None:
                image = image.resize(resize)
            if scale != 1.0:
                image = image.resize((int(image.size[0] * scale), int(image.size[1] * scale)))
            ## Enforce a minimum display size of 100x100:
            new_size = list(image.size)
            if image.size[0] < 100:
                new_size[0] = 100
            if image.size[1] < 100:
                new_size[1] = 100
            ## BUGFIX: compare like types; a list never equals a tuple, so
            ## the original resized unconditionally:
            if tuple(new_size) != image.size:
                image = image.resize(new_size)
            self[layer_name].minmax = orig_minmax
            data_uri = self._image_to_uri(image)
            retval += """<td style="border: 1px solid black;"><img style="image-rendering: pixelated;" class="%s_%s_feature%s" src="%s"/><br/><center>Details</center></td>""" % (self.name, layer_name, 0, data_uri)
            ## BUGFIX: close the table (the branch previously emitted only
            ## "</tr>", leaving the <table> opened above unclosed):
            retval += """</tr></table>"""
        if display:
            return HTML(retval)
        else:
            return retval
    else: ## not html
        if self._layer_has_features(layer_name):
            orig_feature = self[layer_name].feature
            for i in range(output_shape[3]):
                self[layer_name].feature = i
                ## This should return in proper orientation, regardless of rotate setting:
                image = self.propagate_to_image(layer_name, inputs, class_id=class_id,
                                                update_pictures=update_pictures, sequence=sequence)
                if resize is not None:
                    image = image.resize(resize)
                if scale != 1.0:
                    image = image.resize((int(image.size[0] * scale), int(image.size[1] * scale)))
                ## Enforce a minimum display size of 100x100:
                new_size = list(image.size)
                if image.size[0] < 100:
                    new_size[0] = 100
                if image.size[1] < 100:
                    new_size[1] = 100
                ## BUGFIX: compare like types (see html branch above):
                if tuple(new_size) != image.size:
                    image = image.resize(new_size)
                data_uri = self._image_to_uri(image)
                if dynamic_pictures_check():
                    dynamic_pictures_send({'class': "%s_%s_feature%s" % (self.name, layer_name, i), "src": data_uri})
            self[layer_name].feature = orig_feature
        else: ## no features
            ## This should return in proper orientation, regardless of rotate setting:
            image = self.propagate_to_image(layer_name, inputs, class_id=class_id,
                                            update_pictures=update_pictures, sequence=sequence)
            if resize is not None:
                image = image.resize(resize)
            if scale != 1.0:
                image = image.resize((int(image.size[0] * scale), int(image.size[1] * scale)))
            data_uri = self._image_to_uri(image)
            if dynamic_pictures_check():
                dynamic_pictures_send({'class': "%s_%s_feature%s" % (self.name, layer_name, 0), "src": data_uri})
def propagate_to_image(self, layer_name, input, batch_size=32, resize=None, scale=1.0,
                       class_id=None, update_pictures=False, sequence=False, feature=None):
    """
    Get a PIL image of the activations at a layer.

    The image is always rendered in the proper (unrotated) orientation,
    regardless of the current svg_rotate configuration; the rotate setting
    (and the layer's feature selection, if overridden) are restored before
    returning.
    """
    saved_rotate = self.config["svg_rotate"]
    self.config["svg_rotate"] = False
    if feature is None:
        saved_feature = None
    else:
        # temporarily select the requested feature on the layer
        saved_feature = self[layer_name].feature
        self[layer_name].feature = feature
    picture = self._propagate_to_image(layer_name, input, batch_size, resize, scale,
                                       class_id, update_pictures, sequence)
    self.config["svg_rotate"] = saved_rotate
    if feature is not None:
        self[layer_name].feature = saved_feature
    return picture
def _propagate_to_image(self, layer_name, input, batch_size=32, resize=None, scale=1.0,
                        class_id=None, update_pictures=False, sequence=False):
    """
    Internal version of propagate_to_image; renders with whatever
    svg_rotate setting is currently active.
    """
    ## Standardize the input format:
    if isinstance(input, dict):
        input = [input[name] for name in self.input_bank_order]
        if self.num_input_layers == 1:
            input = input[0]
    elif isinstance(input, PIL.Image.Image):
        input = image_to_array(input)
    ## End of input setup
    if not is_array_like(input):
        raise Exception("inputs should be an array")
    if not isinstance(layer_name, str):
        raise Exception("layer_name should be a string")
    activations = self.propagate_to(layer_name, input, batch_size, class_id=class_id,
                                    update_pictures=update_pictures, sequence=sequence)
    picture = self[layer_name].make_image(np.array(activations), config=self.config)
    if resize is not None:
        picture = picture.resize(resize)
    if scale != 1.0:
        picture = picture.resize((int(picture.size[0] * scale),
                                  int(picture.size[1] * scale)))
    return picture
def plot_activation_map(self, from_layer='input', from_units=(0,1), to_layer='output',
                        to_unit=0, colormap=None, default_from_layer_value=0,
                        resolution=None, act_range=(0,1), show_values=False, title=None,
                        scatter=None, symbols=None, default_symbol="o",
                        format=None, update_pictures=False):
    """
    Plot the activations at a bank/unit given two input units.

    Sweeps a 2D grid of values over two units of from_layer (all other
    from_layer units held at default_from_layer_value), propagates each
    grid point to to_layer, and renders to_unit's activation as a heat map.

    Arguments:
        from_layer (str) - layer whose two units are swept
        from_units (tuple) - pair of unit indices in from_layer
        to_layer (str) - layer whose activation is plotted
        to_unit (int) - unit index in to_layer
        colormap - matplotlib colormap; defaults to get_colormap()
        default_from_layer_value - value for the non-swept units
        resolution - grid step size; default gives about 50x50 pixels
        act_range (tuple) - (min, max) sweep range; None uses the
            from_layer's own activation min/max
        show_values (bool) - also print a 20x20 table of activations
            (only reached when format is None)
        scatter - optional points to overlay, as [(label, data), ...] or
            a {"data": ...} dict
        symbols, default_symbol - plot symbols keyed by scatter label
        format - None to show interactively; "svg" or "image" to return
            an SVG object or PIL image instead
    """
    # first do some error checking
    assert self[from_layer] is not None, "unknown layer: %s" % (from_layer,)
    assert type(from_units) in (tuple, list) and len(from_units) == 2, \
        "expected a pair of ints for the %s units but got %s" % (from_layer, from_units)
    ix, iy = from_units
    assert 0 <= ix < self[from_layer].size, "no such %s layer unit: %d" % (from_layer, ix)
    assert 0 <= iy < self[from_layer].size, "no such %s layer unit: %d" % (from_layer, iy)
    assert self[to_layer] is not None, "unknown layer: %s" % (to_layer,)
    assert type(to_unit) is int, "expected an int for the %s unit but got %s" % (to_layer, to_unit)
    assert 0 <= to_unit < self[to_layer].size, "no such %s layer unit: %d" % (to_layer, to_unit)
    if colormap is None: colormap = get_colormap()
    if plt is None:
        raise Exception("matplotlib was not loaded")
    act_min, act_max = self[from_layer].get_act_minmax() if act_range is None else act_range
    out_min, out_max = self[to_layer].get_act_minmax()
    if resolution is None:
        resolution = (act_max - act_min) / 50  # 50x50 pixels by default
    xmin, xmax, xstep = act_min, act_max, resolution
    ymin, ymax, ystep = act_min, act_max, resolution
    xspan = xmax - xmin
    yspan = ymax - ymin
    xpixels = int(xspan/xstep)+1
    ypixels = int(yspan/ystep)+1
    mat = np.zeros((ypixels, xpixels))
    # template vector; only ix/iy vary across the sweep
    ovector = self[from_layer].make_dummy_vector(default_from_layer_value)
    for row in range(ypixels):
        for col in range(xpixels):
            # (x,y) corresponds to lower left corner point of pixel
            x = xmin + xstep*col
            y = ymin + ystep*row
            vector = copy.deepcopy(ovector)
            vector[ix] = x
            vector[iy] = y
            activations = self.propagate_from(from_layer, vector, to_layer, update_pictures=update_pictures)
            mat[row,col] = activations[to_unit]
    fig, ax = plt.subplots()
    axim = ax.imshow(mat, origin='lower', cmap=colormap, vmin=out_min, vmax=out_max)
    if scatter is not None:
        if isinstance(scatter, dict):
            scatter = scatter["data"]
        if len(scatter) == 2 and isinstance(scatter[0], str):
            # a single (label, data) pair was passed; wrap it in a list
            scatter = [scatter]
        for (label, data) in scatter:
            kwargs = {}
            args = []
            # NOTE(review): this scaling maps raw values to pixel
            # coordinates assuming they lie in [0, act_max], clipping just
            # inside the top/right edges -- confirm for act_min != 0
            xs = [min(vector[0], act_max - .01) * xpixels for vector in data]
            ys = [min(vector[1], act_max - .01) * ypixels for vector in data]
            if label:
                kwargs["label"] = label
            symbol = get_symbol(label, symbols, default_symbol)
            if symbol:
                args.append(symbol)
            ax.plot(xs, ys, *args, **kwargs)
        ax.legend()
    if title is not None:
        ax.set_title("Activation of %s[%s]: %s" % (to_layer, to_unit, title))
    else:
        ax.set_title("Activation of %s[%s]" % (to_layer, to_unit))
    ax.set_xlabel("%s[%s]" % (from_layer, ix))
    ax.set_ylabel("%s[%s]" % (from_layer, iy))
    ax.xaxis.tick_bottom()
    # five evenly-spaced ticks labeled with the swept input values:
    ax.set_xticks([i*(xpixels-1)/4 for i in range(5)])
    ax.set_xticklabels([xmin+i*xspan/4 for i in range(5)])
    ax.set_yticks([i*(ypixels-1)/4 for i in range(5)])
    ax.set_yticklabels([ymin+i*yspan/4 for i in range(5)])
    cbar = fig.colorbar(axim)
    if format is None:
        plt.show(block=False)
    else:
        from IPython.display import SVG
        bytes = io.BytesIO()  # NOTE: shadows the builtin name `bytes`
        if format == "svg":
            plt.savefig(bytes, format="svg")
            plt.close(fig)
            img_bytes = bytes.getvalue()
            return SVG(img_bytes.decode())
        elif format == "image":
            plt.savefig(bytes, format="png")
            plt.close(fig)
            bytes.seek(0)
            pil_image = PIL.Image.open(bytes)
            return pil_image
        else:
            raise Exception("format must be None, 'svg', or 'image'")
    # optionally print out a table of activation values
    if show_values:
        s = '\n'
        for y in np.linspace(act_max, act_min, 20):
            for x in np.linspace(act_min, act_max, 20):
                vector = [default_from_layer_value] * self[from_layer].size
                vector[ix] = x
                vector[iy] = y
                out = self.propagate_from(from_layer, vector, to_layer)[to_unit]
                s += '%4.2f ' % out
            s += '\n'
        separator = 100 * '-'
        s += separator
        print("%s\nActivation of %s[%d] as a function of %s[%d] and %s[%d]" %
              (separator, to_layer, to_unit, from_layer, ix, from_layer, iy))
        print("rows: %s[%d] decreasing from %.2f to %.2f" % (from_layer, iy, act_max, act_min))
        print("cols: %s[%d] increasing from %.2f to %.2f" % (from_layer, ix, act_min, act_max))
        print(s)
def plot_layer_weights(self, layer_name, units='all', wrange=None, wmin=None, wmax=None,
                       colormap='gray', vshape=None, cbar=True, ticks=5, format=None,
                       layout=None, spacing=0.2, figsize=None, scale=None, title=None):
    """weight range displayed on the colorbar can be specified as wrange=(wmin, wmax),
    or individually via wmin/wmax keywords. if wmin or wmax is None, the actual min/max
    value of the weight matrix is used. wrange overrides provided wmin/wmax values. ticks
    is the number of colorbar ticks displayed. cbar=False turns off the colorbar. units
    can be a single unit index number or a list/tuple/range of indices.
    """
    ## --- validate layer and unit selection ---
    if self[layer_name] is None:
        raise Exception("unknown layer: %s" % (layer_name,))
    if units == 'all':
        units = list(range(self[layer_name].size))
    elif isinstance(units, numbers.Integral):
        units = [units]
    elif not isinstance(units, (list, tuple, range)) or len(units) == 0:
        raise Exception("units: expected an int or sequence of ints, but got %s" % (units,))
    for unit in units:
        if not 0 <= unit < self[layer_name].size:
            raise Exception("no such unit: %s" % (unit,))
    ## After transposing, each W row is one unit's incoming weight vector:
    W, b = self[layer_name].keras_layer.get_weights()
    W = W.transpose()
    to_size, from_size = W.shape
    if vshape is None:
        rows, cols = 1, from_size
    elif not isinstance(vshape, (list, tuple)) or len(vshape) != 2 \
         or not isinstance(vshape[0], numbers.Integral) \
         or not isinstance(vshape[1], numbers.Integral):
        raise Exception("vshape: expected a pair of ints but got %s" % (vshape,))
    else:
        rows, cols = vshape
        if rows*cols != from_size:
            raise Exception("vshape %s is incompatible with the number of incoming weights to each %s unit (%d)"
                            % (vshape, layer_name, from_size))
    aspect_ratio = max(rows,cols)/min(rows,cols)
    #print("aspect_ratio is", aspect_ratio)
    if aspect_ratio > 50: # threshold may need further refinement
        self.warn_once("WARNING: using a visual display shape of (%d, %d), which may be hard to see."
                       % (rows, cols))
        print("You can use vshape=(rows, cols) to specify a different display shape.")
    ## --- work out the displayed weight range and colorbar end labels ---
    if not isinstance(wmin, (numbers.Number, type(None))):
        raise Exception("wmin: expected a number or None but got %s" % (wmin,))
    if not isinstance(wmax, (numbers.Number, type(None))):
        raise Exception("wmax: expected a number or None but got %s" % (wmax,))
    if wrange is None:
        if wmin is None:
            wmin = np.min(W)
            wmin_label = '0' if wmin == 0 else '%+.2f' % (wmin,)
        else:
            # a user-supplied bound may clip values, hence the <= label
            wmin_label = r'$\leq$ 0' if wmin == 0 else r'$\leq$ %+.2f' % (wmin,)
        if wmax is None:
            wmax = np.max(W)
            wmax_label = '0' if wmax == 0 else '%+.2f' % (wmax,)
        else:
            wmax_label = r'$\geq$ 0' if wmax == 0 else r'$\geq$ %+.2f' % (wmax,)
    elif not isinstance(wrange, (list, tuple)) or len(wrange) != 2 \
         or not isinstance(wrange[0], (numbers.Number, type(None))) \
         or not isinstance(wrange[1], (numbers.Number, type(None))):
        raise Exception("wrange: expected a pair of numbers but got %s" % (wrange,))
    else: # wrange overrides provided wmin/wmax values
        wmin, wmax = wrange
        return self.plot_layer_weights(layer_name, units=units, wrange=None, wmin=wmin,
                                       wmax=wmax, colormap=colormap, vshape=vshape,
                                       cbar=cbar, ticks=ticks, format=format, layout=layout,
                                       spacing=spacing, figsize=figsize, scale=scale,
                                       title=title)
    if wmin >= wmax:
        raise Exception("specified weight range is empty")
    if not isinstance(ticks, numbers.Integral) or ticks < 2:
        raise Exception("invalid number of colorbar ticks: %s" % (ticks,))
    # clip weights to the range [wmin, wmax] and normalize to [0, 1]:
    scaled_W = (np.clip(W, wmin, wmax) - wmin) / (wmax - wmin)
    if not 0 <= spacing <= 1:
        raise Exception("spacing must be between 0 and 1")
    ## BUGFIX: a stray bare `return` here made everything below (the actual
    ## plotting) unreachable, so the function silently plotted nothing;
    ## it has been removed.
    if scale is None:
        scale = 1
    elif scale <= 0:
        raise Exception("scale must be an int > 0")
    if layout is None:
        layout = (1, len(units))
    layout_rows, layout_cols = layout
    border = spacing / max(layout_rows, layout_cols)
    if figsize is None:
        size_factor = 2.5
        width = min(10, size_factor*(layout_cols+1)*scale)
        height = min(8, size_factor*layout_rows*scale)
        figsize = (width, height)
    fig, axes = plt.subplots(layout_rows, layout_cols, squeeze=False,
                             figsize=figsize, num=title,
                             gridspec_kw={'wspace': spacing,
                                          'hspace': spacing,
                                          'left': border,
                                          'right': 1-border,
                                          'bottom': border,
                                          'top': 1-border})
    if title is not None:
        fig.canvas.set_window_title(title)
    for ax in axes.reshape(axes.size):
        ax.axis('off')
    ## Draw each requested unit's weight image into the grid:
    k = 0
    for r in range(layout_rows):
        for c in range(layout_cols):
            if k < len(units):
                u = units[k]
                axes[r][c].set_title('%s[%d]' % (layer_name, u))
                axes[r][c].title.set_fontsize(8)
                im = scaled_W[u,:].reshape((rows, cols))
                axim = axes[r][c].imshow(im, cmap=colormap, vmin=0, vmax=1)
                k += 1
    if k < len(units):
        print("WARNING: could not plot all requested weights with layout %s" % (layout,),
              file=sys.stderr)
    if cbar:
        tick_locations = np.linspace(0, 1, ticks)
        tick_values = tick_locations * (wmax - wmin) + wmin
        # shrink the colorbar for taller layouts
        s = 0.5 if layout_rows > 3 else 0.75 if 2 <= layout_rows <= 3 else 1
        colorbar = fig.colorbar(axim, ax=axes, ticks=tick_locations, shrink=s)
        cbar_labels = ['0' if t == 0 else '%+.2f' % (t,) for t in tick_values]
        cbar_labels[0] = wmin_label
        cbar_labels[-1] = wmax_label
        colorbar.ax.tick_params(labelsize=8)
        colorbar.ax.set_yticklabels(cbar_labels)
    if format is None:
        plt.show(block=False)
    else:
        from IPython.display import SVG
        bytes = io.BytesIO()
        if format == "svg":
            plt.savefig(bytes, format="svg")
            plt.close(fig)
            img_bytes = bytes.getvalue()
            return SVG(img_bytes.decode())
        elif format == "image":
            plt.savefig(bytes, format="png")
            plt.close(fig)
            bytes.seek(0)
            pil_image = PIL.Image.open(bytes)
            return pil_image
        else:
            raise Exception("format must be None, 'svg', or 'image'")
def show_unit_weights(self, layer_name, unit, vshape=None, ascii=False):
    """
    Print the incoming weights of a single unit as a rows x cols grid.

    With ascii=True the weights are shown as characters
    (' ' for <= 0, '.' for < 0.50, 'o' for < 0.75, '@' otherwise);
    otherwise they are printed numerically.
    """
    if self[layer_name] is None:
        raise Exception("unknown layer: %s" % (layer_name,))
    ## After transposing, each W row is one unit's incoming weight vector:
    W, b = self[layer_name].keras_layer.get_weights()
    W = W.transpose()
    to_size, from_size = W.shape
    if vshape is None:
        rows, cols = 1, from_size
    elif not isinstance(vshape, (list, tuple)) or len(vshape) != 2 \
         or not isinstance(vshape[0], numbers.Integral) \
         or not isinstance(vshape[1], numbers.Integral):
        raise Exception("vshape: expected a pair of ints but got %s" % (vshape,))
    else:
        rows, cols = vshape
        if rows*cols != from_size:
            raise Exception("vshape %s is incompatible with the number of incoming weights to each %s unit (%d)"
                            % (vshape, layer_name, from_size))
    grid = W[unit].reshape((rows, cols))
    for row_values in grid:
        for w in row_values:
            if ascii:
                if w <= 0:
                    mark = ' '
                elif w < 0.50:
                    mark = '.'
                elif w < 0.75:
                    mark = 'o'
                else:
                    mark = '@'
                print(mark, end=" ")
            else:
                print('%5.2f' % (w,), end=" ")
        print()
def get_metrics(self):
    """
    Return a sorted list of all metric names that appear anywhere in
    the network's training history.
    """
    names = set()
    for record in self.history:
        names.update(record.keys())
    return sorted(names)
def get_metric(self, metric):
    """
    Return the per-epoch values of a metric from the network's history,
    with None for epochs where the metric was not recorded.
    >>> net = Network("Test", 2, 2, 1)
    >>> net.get_metric("loss")
    []
    """
    return [record.get(metric, None) for record in self.history]
def plot(self, metrics=None, ymin=None, ymax=None, start=0, end=None, legend='best',
         label=None, symbols=None, default_symbol="-", title=None, return_fig_ax=False, fig_ax=None,
         format=None, xs_rotation=None):
    """Plots the current network history for the specific epoch range and
    metrics. metrics is '?', 'all', a metric keyword, or a list of metric keywords.
    if metrics is None, loss and accuracy are plotted on separate graphs.
    >>> net = Network("Plot Test", 1, 3, 1)
    >>> net.compile(error="mse", optimizer="rmsprop")
    >>> net.dataset.append([0.0], [1.0])
    >>> net.dataset.append([1.0], [0.0])
    >>> net.train(plot=False) # doctest: +ELLIPSIS
    Evaluating initial training metrics...
    Training...
    ...
    >>> net.plot('?')
    Available metrics: acc, loss
    """
    ## https://matplotlib.org/api/markers_api.html
    ## https://matplotlib.org/api/colors_api.html
    if isinstance(ymin, str):
        raise Exception("Network.plot() should be called with a metric, or list of metrics")
    if len(self.history) == 0:
        print("No history available")
        return
    available_metrics = self.get_metrics()
    if metrics is None:
        metrics = ['loss']
    ## BUGFIX: was `metrics is '?'`; identity comparison with a string
    ## literal is implementation-dependent (SyntaxWarning on modern
    ## Pythons) -- use equality instead:
    elif metrics == '?':
        print("Available metrics:", ", ".join(available_metrics))
        return
    elif metrics == 'all':
        metrics = available_metrics
    elif isinstance(metrics, str):
        metrics = [metrics]
    elif isinstance(metrics, (list, tuple)):
        pass
    else:
        print("metrics: expected a list or a string but got %s" % (metrics,))
        return
    ## Check metrics, and expand regular expressions:
    proposed_metrics = metrics
    metrics = []
    for metric in proposed_metrics:
        for available_metric in available_metrics:
            if re.match(metric, available_metric) is not None:
                metrics.append(available_metric)
    if fig_ax:
        fig, ax = fig_ax
    else:
        fig, ax = plt.subplots(1)
    x_values = range(self.epoch_count+1)
    x_values = x_values[start:end]
    ax.set_xlabel('Epoch')
    data_found = False
    for metric in metrics:
        y_values = self.get_metric(metric)
        y_values = y_values[start:end]
        if y_values.count(None) == len(y_values):
            print("WARNING: No %s data available for the specified epochs (%s-%s)" % (metric, start, end), file=sys.stderr)
        else:
            next_label = label if label else metric
            symbol = get_symbol(label, symbols, default_symbol)
            ax.plot(x_values, y_values, symbol, label=next_label)
            data_found = True
    if not data_found:
        # nothing plotted; clean up unless the caller wants the empty axes
        if return_fig_ax:
            return (fig, ax)
        else:
            plt.close(fig)
            return
    if ymin is not None:
        plt.ylim(ymin=ymin)
    if ymax is not None:
        plt.ylim(ymax=ymax)
    if legend is not None:
        plt.legend(loc=legend)
    if title is None:
        title = self.name
    if xs_rotation is not None:
        plt.setp(ax.get_xticklabels(),
                 rotation=xs_rotation,
                 horizontalalignment='right')
    plt.title(title)
    if return_fig_ax:
        return (fig, ax)
    elif format is None:
        plt.show(block=False)
    else:
        from IPython.display import SVG
        bytes = io.BytesIO()
        if format == "svg":
            plt.savefig(bytes, format="svg")
            plt.close(fig)
            img_bytes = bytes.getvalue()
            return SVG(img_bytes.decode())
        elif format == "image":
            plt.savefig(bytes, format="png")
            plt.close(fig)
            bytes.seek(0)
            pil_image = PIL.Image.open(bytes)
            return pil_image
        else:
            raise Exception("format must be None, 'svg', or 'image'")
def show_results(self, report_rate=None):
    """
    Redisplay the table of training results from the history.

    Uses the given report_rate, falling back to the report_rate from the
    last training run (default 1).  The final epoch is always reported.
    """
    if report_rate is None:
        report_rate = self.train_options.get("report_rate", 1)
    self._need_to_show_headings = True
    total = len(self.history)
    for epoch_number in range(0, total, report_rate):
        self.report_epoch(epoch_number, self.history[epoch_number])
    if total > 0:
        print("=" * 56)
        self.report_epoch(total - 1, self.history[-1])
def plot_results(self, callback=None, format=None):
    """
    Plot loss (and accuracy, when recorded) from the training history.

    Metrics other than loss/val_loss/acc/val_acc are ignored.  When called
    from a training callback, the figure and axes are cached on the
    callback and reused, so the plot updates in place between epochs.
    """
    #print("called on_epoch_end with epoch =", epoch)
    metrics = self.get_metrics()
    if callback is not None and callback.figure is not None:
        # figure and axes objects have already been created
        fig, loss_ax, acc_ax = callback.figure
        loss_ax.clear()
        if acc_ax is not None:
            acc_ax.clear()
    else:
        # first time called, so create figure and axes objects
        if 'acc' in metrics or 'val_acc' in metrics:
            fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(10,4))
        else:
            fig, loss_ax = plt.subplots(1)
            acc_ax = None
        if callback is not None:
            # cache for reuse on the next call
            callback.figure = fig, loss_ax, acc_ax
    x_values = range(self.epoch_count+1)
    for metric in metrics:
        y_values = self.get_metric(metric)
        if metric == 'loss':
            loss_ax.plot(x_values, y_values, label='Training set')
        elif metric == 'val_loss':
            loss_ax.plot(x_values, y_values, label='Testing set')
        elif metric == 'acc' and acc_ax is not None:
            acc_ax.plot(x_values, y_values, label='Training set')
        elif metric == 'val_acc' and acc_ax is not None:
            acc_ax.plot(x_values, y_values, label='Testing set')
    loss_ax.set_ylim(bottom=0)
    loss_ax.set_title("%s: Error" % (self.name,))
    loss_ax.set_xlabel('Epoch')
    loss_ax.legend(loc='best')
    if acc_ax is not None:
        acc_ax.set_ylim([-0.1, 1.1])
        acc_ax.set_title("%s: Accuracy" % (self.name,))
        acc_ax.set_xlabel('Epoch')
        acc_ax.legend(loc='best')
    if (callback is not None and not callback.in_console) or format == "svg":
        # notebook display: render to SVG and replace the previous output
        from IPython.display import SVG, clear_output, display
        bytes = io.BytesIO()  # NOTE: shadows the builtin name `bytes`
        plt.savefig(bytes, format='svg')
        img_bytes = bytes.getvalue()
        clear_output(wait=True)
        display(SVG(img_bytes.decode()))
        #return SVG(img_bytes.decode())
    else: # format is None
        plt.pause(0.01)
        #plt.show(block=False)
def compile(self, **kwargs):
    """
    Compile the network with an error (loss) function and optimizer.
    Both the error/loss and optimizer keywords are required.
    Possible error/loss functions are:
    * 'mse' - mean_squared_error
    * 'mae' - mean_absolute_error
    * 'mape' - mean_absolute_percentage_error
    * 'msle' - mean_squared_logarithmic_error
    * 'kld' - kullback_leibler_divergence
    * 'cosine' - cosine_proximity
    * 'categorical_crossentropy' - for categorical outputs
    * 'binary_crossentropy' - for 0/1 outputs
    * 'sparse_categorical_crossentropy' - sparse categories
    * 'kullback_leibler_divergence'
    * 'poisson'
    * 'cosine_proximity'
    Possible optimizers are:
    * 'sgd', 'rmsprop', 'adagrad', 'adadelta', 'adam', 'adamax', 'nadam'
    See https://keras.io/ `Model.compile` method for more details.
    Examples:
    >>> net = Network("XOR", 2, 5, 5, 1)
    >>> net.compile(error="mse", optimizer="adam")
    """
    ## Remember the raw arguments (best effort -- some argument values
    ## cannot be copied):
    try:
        self.compile_args = copy.deepcopy(kwargs)
    except:
        self.compile_args = {} # can't copy the state of args
    if self.model is None:
        self.build_model()
    self.compile_model(**kwargs)
def add_loss(self, layer_name, function):
    """
    Attach one or more additional error/loss functions to a layer.

    function may be a single callable or a list/tuple of callables; each
    is called with the layer's Keras tensor and the result is added as a
    loss term.  All of a layer's extra losses must be added in one call.
    >>> from conx import Network
    >>> import keras.backend as K
    >>> from keras.losses import mse
    >>> net = Network("XOR", 2, 5, 1, activation="sigmoid", seed=1234)
    >>> def loss(inputs):
    ...     return mse(0.50, inputs[:,0])
    >>> net.compile(error={"output": "mse",
    ...                    "hidden": loss},
    ...             optimizer="adam")
    >>> ds = [[[0, 0], [0]],
    ...       [[0, 1], [1]],
    ...       [[1, 0], [1]],
    ...       [[1, 1], [0]]]
    >>> net.dataset.load(ds)
    >>> net.train(1, plot=False) # doctest: +ELLIPSIS
    Evaluating ...
    """
    if len(self[layer_name].keras_layer.get_losses_for(None)) > 0:
        raise Exception("to add multiple error functions on layer named '%s' add all at once" % (layer_name,))
    ## Normalize to a sequence so both cases share one code path:
    functions = function if isinstance(function, (list, tuple)) else [function]
    for error_function in functions:
        self[layer_name].keras_layer.add_loss(error_function(self[layer_name].k), None)
def process_compile_kwargs(self, kwargs):
    """
    Normalize and validate keyword arguments before compiling.

    Handles the "error" synonym for "loss", validates the optimizer,
    folds unrecognized keywords into an optimizer config, warns about
    softmax/crossentropy mismatches, picks a default accuracy metric,
    and moves per-hidden-layer loss functions onto their layers.
    Returns the cleaned kwargs, ready for Keras Model.compile().
    """
    ## Error checking:
    if len(self.layers) == 0:
        raise Exception("network has no layers")
    for layer in self.layers:
        if layer.kind() == 'unconnected':
            raise Exception("'%s' layer is unconnected" % layer.name)
    if "error" in kwargs: # synonym
        kwargs["loss"] = kwargs["error"]
        del kwargs["error"]
    # NOTE(review): if neither "loss" nor "error" was given, this raises
    # KeyError here instead of the friendlier message two checks below
    if kwargs["loss"] == 'sparse_categorical_crossentropy':
        raise Exception("'sparse_categorical_crossentropy' is not a valid error metric in conx; use 'categorical_crossentropy' with proper targets")
    if "optimizer" not in kwargs or "loss" not in kwargs:
        raise Exception("both optimizer and error/loss are required to compile a network")
    if isinstance(kwargs["optimizer"], str) and kwargs["optimizer"].lower() not in self.OPTIMIZERS:
        raise Exception("invalid optimizer '%s'; use valid function or one of %s" % (kwargs["optimizer"], Network.OPTIMIZERS,))
    ## Build an optimizer:
    config = kwargs.get("config", {})
    # any keyword that Model.compile() does not recognize is treated as
    # an optimizer configuration option and removed from kwargs:
    for kw in list(kwargs.keys()):
        if kw not in ["loss", "metrics", "optimizer",
                      "loss_weights", "sample_weight_mode",
                      "weighted_metrics", "target_tensors"]:
            if kw != "config":
                config[kw] = kwargs[kw]
            del kwargs[kw]
    if config != {}:
        error = False
        try:
            kwargs["optimizer"] = keras.optimizers.get({"class_name": kwargs["optimizer"], "config": config})
        except:
            error = True
        if error:
            # build a friendlier message that names the optimizer class
            class_instance = keras.optimizers.get(kwargs["optimizer"])
            raise Exception("invalid optimizer arguments %s(**%s); for more information type: help(cx.%s)" % (
                kwargs["optimizer"], config, class_instance.__class__.__name__))
    ### Optimizer is an instance, if given kwargs
    ## Warn when softmax outputs and crossentropy losses are mismatched:
    using_softmax = False
    for layer in self.layers:
        if layer.kind() == "output":
            if layer.activation is not None and layer.activation == "softmax":
                using_softmax = True
                if "crossentropy" not in kwargs["loss"]:
                    print("WARNING: you are using the 'softmax' activation function on layer '%s'" % layer.name, file=sys.stderr)
                    print(" but not using a 'crossentropy' error measure.", file=sys.stderr)
            if "crossentropy" in kwargs["loss"]:
                if layer.activation is not None and layer.activation != "softmax":
                    print("WARNING: you are using a crossentropy error measure", file=sys.stderr)
                    print(" but not using the 'softmax' activation function on layer '%s'"
                          % layer.name, file=sys.stderr)
    if "metrics" in kwargs and kwargs["metrics"] is not None:
        pass ## ok allow override
    elif using_softmax: ## let's use Keras' default acc function
        kwargs['metrics'] = ["acc"] ## Keras' default
        if "tolerance" in kwargs:
            print("WARNING: using softmax activation function; tolerance is ignored", file=sys.stderr)
    else:
        kwargs['metrics'] = [self.acc] ## Conx's default
    ## Handle loss functions on internal layers:
    if "loss" in kwargs and isinstance(kwargs["loss"], dict):
        for layer_name in list(kwargs["loss"]):
            if self[layer_name].kind() == "hidden":
                # hidden-layer losses are attached to the layer itself:
                self.add_loss(layer_name, kwargs["loss"][layer_name])
                del kwargs["loss"][layer_name]
            elif self[layer_name].kind() != "output":
                raise Exception("cannot put an error function " +
                                "on layer named '%s' of type '%s'" % (layer_name, self[layer_name].kind()))
    return kwargs
def acc(self, targets, outputs):
    """
    Conx's default accuracy metric: the fraction of samples whose
    outputs are all within self._tolerance of their targets.
    """
    # This is only used on non-multi-output-bank training:
    return K.mean(K.all(K.less_equal(K.abs(targets - outputs), self._tolerance), axis=-1), axis=-1)
def _find_keras_layer(self, layer_name):
    """
    Find the associated keras layer (the first layer in self.model
    whose name matches layer_name).
    """
    matches = [keras_layer for keras_layer in self.model.layers
               if keras_layer.name == layer_name]
    return matches[0]
def _delete_intermediary_models(self):
    """
    Clear the per-layer Keras objects; these cannot be pickled.
    """
    for layer in self.layers:
        layer.k = None
        layer.input_names = set()
        layer.model = None
def build_model(self, starting_layers=None):
    """
    (Re)build the underlying Keras model from the layer graph, and
    attach each layer's matching Keras layer object.
    """
    self._reset_layer_metadata()
    self._build_intermediary_models(starting_layers=starting_layers)
    model_outputs = self._get_output_ks_in_order()
    model_inputs = self._get_input_ks_in_order(self.input_bank_order)
    self.model = keras.models.Model(inputs=model_inputs, outputs=model_outputs)
    self._level_ordering = None
    for layer in self.layers:
        layer.keras_layer = self._find_keras_layer(layer.name)
def compile_model(self, **kwargs):
    """
    Compile the underlying Keras model.

    Raises if build_model() has not yet been called.  Any keyword
    options are normalized through process_compile_kwargs() and
    remembered in self.compile_options.
    """
    if self.model is None:
        raise Exception("need to build network")
    if len(kwargs) > 0:
        kwargs = self.process_compile_kwargs(kwargs)
    self.compile_options = dict(kwargs)
    self.model.compile(**self.compile_options)
def delete_layer(self, layer_name):
    """
    Delete an output layer_name from a network.
    Note: if compiled, you will need to re-compile.
    >>> net = Network("XOR", 2, 2, 3, 1)
    >>> net.compile(error="mse", optimizer="adam")
    >>> len(net.layers) == 4
    True
    >>> len(net.propagate([-1, 1])) == 1
    True
    >>> net.delete_layer("output")
    >>> net.compile(error="mse", optimizer="adam")
    >>> len(net.layers) == 3
    True
    >>> len(net.propagate([-1, 1])) == 3
    True
    >>> net.train_one([-1, 1], [-.5, .5, .5]) # doctest: +ELLIPSIS
    (...
    >>> net.delete_layer("hidden2")
    >>> net.compile(error="mse", optimizer="adam")
    >>> len(net.layers) == 2
    True
    >>> len(net.propagate([-1, 1])) == 2
    True
    >>> net.train_one([-1, 1], [-.5, .5]) # doctest: +ELLIPSIS
    (...
    """
    doomed = self[layer_name]
    self._delete_layer_from_connections(doomed)
    # invalidate the built model; caller must re-compile
    self.model = None
    self._level_ordering = None
def _delete_layer_from_connections(self, layer):
    """
    Unlink layer from its neighbors' connection lists and remove it
    from this network's bookkeeping structures.
    """
    ## Remove references to the deleted layer from both directions:
    for neighbor in layer.incoming_connections:
        if layer in neighbor.outgoing_connections:
            neighbor.outgoing_connections.remove(layer)
    for neighbor in layer.outgoing_connections:
        if layer in neighbor.incoming_connections:
            neighbor.incoming_connections.remove(layer)
    ## Delete from layer name dictionary:
    del self.layer_dict[layer.name]
    ## Remove layer from the list (first identity match):
    for index, candidate in enumerate(self.layers):
        if candidate is layer:
            del self.layers[index]
            break
def connect_network(self, output_layer_name, network):
    """
    Graft a network on to the end of this network.
    >>> import conx as cx
    >>> net1 = cx.Network("XOR1", 2, 2, 1)
    >>> net1.compile(error="mse", optimizer="adam")
    >>> net2 = cx.Network("XOR2")
    >>> net2.add(cx.Layer("input2", 2),
    ...          cx.Layer("hidden2", 2),
    ...          cx.Layer("output2", 1))
    'output2'
    >>> net2.connect()
    >>> net2.compile(error="mse", optimizer="adam")
    >>> net1.delete_layer("output")
    >>> net1.connect_network("hidden", net2)
    """
    ## Absorb the other network's layers and name lookups:
    self.layers.extend(network.layers)
    for grafted in network.layers:
        self.layer_dict[grafted.name] = grafted
    ## Wire our named layer into the grafted network's first non-input layer:
    graft_point = self[output_layer_name]
    first_hidden = network.layers[1]
    graft_point.outgoing_connections.append(first_hidden)
    first_hidden.incoming_connections.append(graft_point)
    ## The grafted network's input layer is no longer needed:
    self.delete_layer(network.layers[0].name)
    ## Rebuild starting from the graft point and recompile with the
    ## grafted network's compile options:
    self.build_model(starting_layers=[self[output_layer_name]])
    self.compile_model(**network.compile_options)
    def _build_intermediary_models(self, starting_layers=None):
        """
        Construct the layer.k, layer.input_names, and layer.model's.

        Walks the layers in topological order, setting on each layer:
        * ``layer.k`` - the Keras tensor for the layer's output
        * ``layer.input_names`` - set of input-bank names feeding it
        * ``layer.model`` - a keras Model from its input banks to it
        Also fills ``self.prop_from_dict`` with (from_name, to_name) ->
        Model entries used to propagate activations from arbitrary banks.
        """
        if starting_layers is None:
            starting_layers = self.layers
        # Rebuilding from scratch; clear all cached models/functions:
        self.prop_from_dict.clear()
        self.keras_functions.clear()
        sequence = topological_sort(self, starting_layers)
        if self.debug: print("topological sort:", [l.name for l in sequence])
        for layer in sequence:
            if self.debug: print("sequence:", layer.name)
            if layer.kind() == 'input':
                if self.debug: print("making input layer for", layer.name)
                # NOTE(review): dict.get evaluates its default eagerly, so
                # make_input_layer_k() runs even on a cache hit — confirm
                # whether the cache lookup here is ever meant to hit.
                layer.k = self.keras_functions.get(layer.name, layer.make_input_layer_k())
                layer.input_names = set([layer.name])
                outputs = keras.layers.Lambda(lambda v: v + 0)(layer.k) # identity
                layer.model = keras.models.Model(inputs=layer.k, outputs=outputs)
                # An input bank propagates to itself via this identity model:
                self.prop_from_dict[(layer.name, layer.name)] = layer.model
            else:
                if self.debug: print("making layer for", layer.name)
                if len(layer.incoming_connections) == 0:
                    raise Exception("non-input layer '%s' with no incoming connections" % layer.name)
                kfuncs = self.keras_functions.get(layer.name, layer.make_keras_functions())
                self.keras_functions[layer.name] = kfuncs
                if len(layer.incoming_connections) == 1:
                    if self.debug: print("single input", layer.incoming_connections[0])
                    k = layer.incoming_connections[0].k
                    layer.input_names = layer.incoming_connections[0].input_names
                else: # multiple inputs, some type of merge:
                    if self.debug: print("Merge detected!", [l.name for l in layer.incoming_connections])
                    if layer.handle_merge:
                        k = layer.make_keras_function()
                    else:
                        k = keras.layers.Concatenate()([incoming.k for incoming in layer.incoming_connections])
                    # flatten the incoming layers' input-name sets into one set:
                    layer.input_names = set([item for sublist in
                                             [incoming.input_names for incoming in layer.incoming_connections]
                                             for item in sublist])
                if self.debug: print("input names for", layer.name, layer.input_names)
                if self.debug: print("applying k's", kfuncs)
                # Thread the incoming tensor through this layer's functions:
                for f in kfuncs:
                    k = f(k)
                layer.k = k
                ## get the inputs to this branch, in order:
                input_ks = self._get_input_ks_in_order(layer.input_names)
                ## From all inputs to this layer:
                layer.model = keras.models.Model(inputs=input_ks, outputs=layer.k)
                if len(layer.input_names) == 1:
                    self.prop_from_dict[(list(layer.input_names)[0], layer.name)] = layer.model
        ## Build all other prop_from models (from any bank on an
        ## input->output path, to any bank downstream of it):
        if self.build_propagate_from_models:
            for in_layer_name in self.input_bank_order:
                for out_layer_name in self.output_bank_order:
                    layer = self[out_layer_name]
                    if self.debug: print("from %s to %s" % (in_layer_name, layer.name))
                    all_paths = find_all_paths(self, self[in_layer_name], layer)
                    for path in all_paths:
                        for i in range(len(path) - 1):
                            abort_path = False
                            path_layer = path[i]
                            if (path_layer.name, layer.name) in self.prop_from_dict:
                                continue
                            if self.debug: print("  %s to %s" % (path_layer.name, layer.name))
                            if path_layer.shape is None:
                                ## Skips FlattenLayer, Concat, Merge, etc. as from_layer
                                if self.debug: print("  %s: aborting this path; try next" % path_layer.name)
                                continue
                            # Fresh Input tensor standing in for this bank's activations:
                            k = starting_k = keras.layers.Input(path_layer.shape, name=path_layer.name)
                            rest_of_path = path[i + 1:]
                            for rest_of_path_layer in rest_of_path:
                                if abort_path:
                                    break
                                if self.debug: print("  %s to %s" % (path_layer.name, rest_of_path_layer.name))
                                kfuncs = self.keras_functions[rest_of_path_layer.name]
                                for f in kfuncs:
                                    try:
                                        k = f(k)
                                    except:
                                        ## Can't make this pathway; probably a merge
                                        ## that needs inputs unavailable on this path.
                                        ## NOTE(review): bare except also swallows
                                        ## KeyboardInterrupt/SystemExit; consider
                                        ## `except Exception`.
                                        if self.debug: print("  merge; need other inputs")
                                        abort_path = True
                                        break
                                ## FIXME: could be multiple paths
                                if (path_layer.name, rest_of_path_layer.name) not in self.prop_from_dict:
                                    self.prop_from_dict[
                                        (path_layer.name, rest_of_path_layer.name)
                                    ] = keras.models.Model(inputs=starting_k, outputs=k)
def _get_input_ks_in_order(self, layer_names):
"""
Get the Keras function for each of a set of layer names.
[in3, in4] sorted by input bank ordering
"""
sorted_layer_names = self._get_sorted_input_names(set(layer_names))
layer_ks = [self[layer_name].k for layer_name in sorted_layer_names]
if len(layer_ks) == 1:
layer_ks = layer_ks[0]
return layer_ks
def _get_sorted_input_names(self, layer_names):
"""
Given a set of input names, give them back in order.
"""
return [name for (index, name) in sorted([(self.input_bank_order.index(name), name)
for name in layer_names])]
def _get_output_ks_in_order(self):
"""
Get the Keras function for each output layer, in order.
"""
layer_ks = [self[layer_name].k for layer_name in self.output_bank_order]
if len(layer_ks) == 1:
layer_ks = layer_ks[0]
return layer_ks
def _image_to_uri(self, img_src):
# Convert to binary data:
b = io.BytesIO()
try:
img_src.save(b, format='gif')
except:
return ""
data = b.getvalue()
data = base64.b64encode(data)
if not isinstance(data, str):
data = data.decode("latin1")
return "data:image/gif;base64,%s" % html.escape(data)
[docs] def vshape(self, layer_name):
"""
Find the vshape of layer.
"""
layer = self[layer_name]
vshape = layer.vshape if layer.vshape else layer.shape if layer.shape else None
if vshape is None:
vshape = layer.get_output_shape()
return vshape
    def _pre_process_struct(self, inputs, config, ordering, targets):
        """
        Determine sizes and pre-compute images.

        For every visible bank in ``ordering`` this propagates an
        activation vector, renders the bank's image (plus target/error
        images for output banks when targets are given), and computes
        per-bank display dimensions.

        Returns (max_width, max_height, row_heights, images, image_dims).
        NOTE(review): max_height is initialized but never updated, so it
        is always returned as 0 — confirm whether callers rely on it.
        """
        ### find max_width, image_dims, and row_height
        # Go through and build images, compute max_width:
        row_heights = []
        max_width = 0
        max_height = 0
        images = {}
        image_dims = {}
        ## if targets, then need to propagate for error:
        if targets is not None and self.model is not None:
            outputs = self.propagate(inputs)
            if len(self.output_bank_order) == 1:
                # Single output bank: wrap targets so indexing matches:
                targets = [targets]
                errors = (np.array(outputs) - np.array(targets)).tolist()
            else:
                errors = []
                for bank in range(len(self.output_bank_order)):
                    errors.append((np.array(outputs[bank]) - np.array(targets[bank])).tolist())
        #######################################################################
        ## For each level:
        #######################################################################
        hiding = {}
        for level_tups in ordering: ## output to input:
            # first make all images at this level
            row_width = 0 # for this row
            row_height = 0 # for this row
            #######################################################################
            ## For each column:
            #######################################################################
            for column in range(len(level_tups)):
                (layer_name, anchor, fname) = level_tups[column]
                if not self[layer_name].visible:
                    if not hiding.get(column, False):
                        row_height = max(row_height, config["vspace"]) # space for hidden indicator
                    hiding[column] = True # in the middle of hiding some layers
                    row_width += config["hspace"] # space between
                    max_width = max(max_width, row_width) # of all rows
                    continue
                elif anchor:
                    # No need to handle anchors here
                    # as they occupy no vertical space
                    hiding[column] = False
                    # give it some hspace for this column
                    # in case there is nothing else in this column
                    row_width += config["hspace"]
                    max_width = max(max_width, row_width)
                    continue
                hiding[column] = False
                #######################################################################
                ## The rest of this for loop is handling image of bank
                #######################################################################
                # Pick the activation vector to visualize: explicit inputs,
                # first dataset input, or a dummy vector per input bank:
                if inputs is not None:
                    v = inputs
                elif len(self.dataset.inputs) > 0 and not isinstance(self.dataset, VirtualDataset):
                    ## don't change cache if virtual... could take some time to rebuild cache
                    v = self.dataset.inputs[0]
                else:
                    if self.num_input_layers > 1:
                        v = []
                        for in_name in self.input_bank_order:
                            v.append(self[in_name].make_dummy_vector())
                    else:
                        in_layer = [layer for layer in self.layers if layer.kind() == "input"][0]
                        v = in_layer.make_dummy_vector()
                if self[layer_name].model:
                    # Temporarily adopt the requested rotation while rendering:
                    orig_svg_rotate = self.config["svg_rotate"]
                    self.config["svg_rotate"] = config["svg_rotate"]
                    try:
                        image = self._propagate_to_image(layer_name, v)
                    except:
                        # NOTE(review): bare except — any propagation failure
                        # silently falls back to a dummy image.
                        image = self[layer_name].make_image(np.array(self[layer_name].make_dummy_vector()), config=config)
                    self.config["svg_rotate"] = orig_svg_rotate
                else:
                    self.warn_once("WARNING: network is uncompiled; activations cannot be visualized")
                    image = self[layer_name].make_image(np.array(self[layer_name].make_dummy_vector()), config=config)
                (width, height) = image.size
                images[layer_name] = image ## little image
                if self[layer_name].kind() == "output":
                    if self.model is not None and targets is not None:
                        ## Target image, targets set above:
                        target_colormap = self[layer_name].colormap
                        target_bank = targets[self.output_bank_order.index(layer_name)]
                        target_array = np.array(target_bank)
                        target_image = self[layer_name].make_image(target_array, target_colormap, config)
                        ## Error image, error set above:
                        error_colormap = get_error_colormap()
                        error_bank = errors[self.output_bank_order.index(layer_name)]
                        error_array = np.array(error_bank)
                        error_image = self[layer_name].make_image(error_array, error_colormap, config)
                        images[layer_name + "_errors"] = error_image
                        images[layer_name + "_targets"] = target_image
                    else:
                        # No targets: reuse the activation image as placeholder:
                        images[layer_name + "_errors"] = image
                        images[layer_name + "_targets"] = image
                ### Layer settings (per-layer overrides beat config defaults):
                if self[layer_name].image_maxdim:
                    image_maxdim = self[layer_name].image_maxdim
                else:
                    image_maxdim = config["image_maxdim"]
                if self[layer_name].image_pixels_per_unit:
                    image_pixels_per_unit = self[layer_name].image_pixels_per_unit
                else:
                    image_pixels_per_unit = config["image_pixels_per_unit"]
                ## First, try based on shape:
                #pwidth, pheight = np.array(image.size) * image_pixels_per_unit
                vshape = self.vshape(layer_name)
                if vshape is None or self[layer_name].keep_aspect_ratio:
                    pass ## let the image set the shape
                elif len(vshape) == 1:
                    if vshape[0] is not None:
                        width = vshape[0] * image_pixels_per_unit
                        height = image_pixels_per_unit
                elif len(vshape) >= 2:
                    if vshape[0] is not None:
                        height = vshape[0] * image_pixels_per_unit
                        if vshape[1] is not None:
                            width = vshape[1] * image_pixels_per_unit
                    else:
                        # First dimension unknown; fall back to deeper dims:
                        if len(vshape) > 2:
                            if vshape[1] is not None:
                                height = vshape[1] * image_pixels_per_unit
                                width = vshape[2] * image_pixels_per_unit
                        elif vshape[1] is not None: # flatten
                            width = vshape[1] * image_pixels_per_unit
                            height = image_pixels_per_unit
                ## keep aspect ratio:
                if self[layer_name].keep_aspect_ratio:
                    scale = image_maxdim / max(width, height)
                    image = image.resize((int(width * scale), int(height * scale)))
                    width, height = image.size
                else:
                    ## Change aspect ratio if too big/small
                    if width < image_pixels_per_unit:
                        width = image_pixels_per_unit
                    if height < image_pixels_per_unit:
                        height = image_pixels_per_unit
                    ## make sure not too big:
                    if height > image_maxdim:
                        height = image_maxdim
                    if width > image_maxdim:
                        width = image_maxdim
                image_dims[layer_name] = (width, height)
                row_width += width + config["hspace"] # space between
                row_height = max(row_height, height)
            row_heights.append(row_height)
            max_width = max(max_width, row_width) # of all rows
        return max_width, max_height, row_heights, images, image_dims
def _find_spacing(self, row, ordering, max_width):
"""
Find the spacing for a row number
"""
return max_width / (len(ordering[row]) + 1)
def _get_border_color(self, layer_name, config):
if config.get("highlights") and layer_name in config.get("highlights"):
return config["highlights"][layer_name]["border_color"]
else:
return config["border_color"] if self.model is not None else "red"
def _get_border_width(self, layer_name, config):
if config.get("highlights") and layer_name in config.get("highlights"):
return config["highlights"][layer_name]["border_width"]
else:
return config["border_width"] if self.model is not None else 4
    def build_struct(self, inputs, class_id, config, targets):
        """
        Build an intermediate representation of the network picture:
        a list of [template_name, attribute-dict] entries that
        to_svg() later renders through its SVG templates.

        inputs - activations to display (None: dataset/dummy values)
        class_id - id for dynamic images (None means live display)
        config - the (possibly overridden) visualization settings
        targets - target vectors, for the optional target/error rows
        """
        ordering = list(reversed(self._get_level_ordering())) # list of names per level, input to output
        max_width, max_height, row_heights, images, image_dims = self._pre_process_struct(inputs, config, ordering, targets)
        ### Now that we know the dimensions:
        struct = []
        cheight = config["border_top"] # top border
        #######################################################################
        ## Display targets?
        #######################################################################
        if config["show_targets"]:
            spacing = self._find_spacing(0, ordering, max_width)
            # draw the row of targets:
            cwidth = 0
            for (layer_name, anchor, fname) in ordering[0]: ## no anchors in output
                if layer_name + "_targets" not in images:
                    continue
                image = images[layer_name + "_targets"]
                (width, height) = image_dims[layer_name]
                cwidth += (spacing - width/2)
                struct.append(["image_svg", {"name": layer_name + "_targets",
                                             "svg_counter": self._svg_counter,
                                             "x": cwidth,
                                             "y": cheight,
                                             "image": self._image_to_uri(image),
                                             "width": width,
                                             "height": height,
                                             "tooltip": self[layer_name].tooltip(),
                                             "border_color": self._get_border_color(layer_name, config),
                                             "border_width": self._get_border_width(layer_name, config),
                                             "rx": cwidth - 1, # based on arrow width
                                             "ry": cheight - 1,
                                             "rh": height + 2,
                                             "rw": width + 2}])
                ## show a label
                struct.append(["label_svg", {"x": cwidth + width + 5,
                                             "y": cheight + height/2 + 2,
                                             "label": "targets",
                                             "font_size": config["font_size"],
                                             "font_color": "black",
                                             "font_family": config["font_family"],
                                             "text_anchor": "start",
                }])
                cwidth += width/2
            ## Then we need to add height for output layer again, plus a little bit
            cheight += row_heights[0] + 10 # max height of row, plus some
        #######################################################################
        ## Display error?
        #######################################################################
        if config["show_errors"]:
            spacing = self._find_spacing(0, ordering, max_width)
            # draw the row of errors:
            cwidth = 0
            for (layer_name, anchor, fname) in ordering[0]: # no anchors in output
                if layer_name + "_errors" not in images:
                    continue
                image = images[layer_name + "_errors"]
                (width, height) = image_dims[layer_name]
                cwidth += (spacing - (width/2))
                struct.append(["image_svg", {"name": layer_name + "_errors",
                                             "svg_counter": self._svg_counter,
                                             "x": cwidth,
                                             "y": cheight,
                                             "image": self._image_to_uri(image),
                                             "width": width,
                                             "height": height,
                                             "tooltip": self[layer_name].tooltip(),
                                             "border_color": self._get_border_color(layer_name, config),
                                             "border_width": self._get_border_width(layer_name, config),
                                             "rx": cwidth - 1, # based on arrow width
                                             "ry": cheight - 1,
                                             "rh": height + 2,
                                             "rw": width + 2}])
                ## show a label
                struct.append(["label_svg", {"x": cwidth + width + 5,
                                             "y": cheight + height/2 + 2,
                                             "label": "errors",
                                             "font_size": config["font_size"],
                                             "font_color": "black",
                                             "font_family": config["font_family"],
                                             "text_anchor": "start",
                }])
                cwidth += width/2
            ## Then we need to add height for output layer again, plus a little bit
            cheight += row_heights[0] + 10 # max height of row, plus some
        #######################################################################
        ## Show a separator that takes no space between output and targets/errors
        #######################################################################
        if config["show_errors"] or config["show_targets"]:
            spacing = self._find_spacing(0, ordering, max_width)
            ## Draw a line for each column in output:
            cwidth = spacing/2 + spacing/2 # border + middle of first column
            # number of columns:
            for level_tups in ordering[0]:
                struct.append(["line_svg", {"x1":cwidth - spacing/2,
                                            "y1":cheight - 5, # half the space between them
                                            "x2":cwidth + spacing/2,
                                            "y2":cheight - 5,
                                            "arrow_color": "green",
                                            "tooltip": "",
                }])
                cwidth += spacing
        #######################################################################
        # Now we go through again and build SVG:
        #######################################################################
        positioning = {}
        level_num = 0
        #######################################################################
        ## For each level:
        #######################################################################
        hiding = {}
        for row in range(len(ordering)):
            level_tups = ordering[row]
            ## how many space at this level for this column?
            spacing = self._find_spacing(row, ordering, max_width)
            cwidth = 0
            # See if there are any connections up:
            any_connections_up = False
            for (layer_name, anchor, fname) in level_tups:
                if not self[layer_name].visible:
                    continue
                elif anchor:
                    continue
                for out in self[layer_name].outgoing_connections:
                    if out.name not in positioning: ## is it drawn yet? if not, continue,
                        ## if yes, we need vertical space for arrows
                        continue
                    any_connections_up = True
            if any_connections_up:
                cheight += config["vspace"] # for arrows
            else: # give a bit of room:
                ## FIXME: determine if there were spaces drawn last layer
                ## Right now, just skip any space at all
                ## cheight += 5
                pass
            row_height = 0 # for row of images
            #######################################################################
            # Draw each column:
            #######################################################################
            for column in range(len(level_tups)):
                (layer_name, anchor, fname) = level_tups[column]
                if not self[layer_name].visible:
                    if not hiding.get(column, False): # not already hiding, add some space:
                        struct.append(["label_svg", {"x": cwidth + spacing - 80, ## center the text
                                                     "y": cheight + 15,
                                                     "label": "[layer(s) not visible]",
                                                     "font_size": config["font_size"],
                                                     "font_color": "green",
                                                     "font_family": config["font_family"],
                                                     "text_anchor": "start",
                                                     "rotate": False,
                        }])
                        row_height = max(row_height, config["vspace"])
                    hiding[column] = True
                    cwidth += spacing # leave full column width
                    continue
                ## end run of hiding
                hiding[column] = False
                #######################################################################
                ## Anchor
                #######################################################################
                if anchor:
                    anchor_name = "%s-%s-anchor%s" % (layer_name, fname, level_num)
                    cwidth += spacing
                    positioning[anchor_name] = {"x": cwidth, "y": cheight + row_heights[row]}
                    x1 = cwidth
                    ## now we are at an anchor. Is the thing that it anchors in the
                    ## lower row? level_num is increasing
                    prev = [(oname, oanchor, lfname) for (oname, oanchor, lfname) in ordering[level_num - 1] if
                            (((layer_name == oname) and (oanchor is False)) or
                             ((layer_name == oname) and (oanchor is True) and (fname == lfname)))]
                    if prev:
                        tooltip = html.escape(self.describe_connection_to(self[fname], self[layer_name]))
                        if prev[0][1]: # anchor
                            anchor_name2 = "%s-%s-anchor%s" % (layer_name, fname, level_num - 1)
                            ## draw a line to this anchor point
                            x2 = positioning[anchor_name2]["x"]
                            y2 = positioning[anchor_name2]["y"]
                            struct.append(["curve", {
                                "endpoint": False,
                                "drawn": False,
                                "name": anchor_name2,
                                "x1": cwidth,
                                "y1": cheight,
                                "x2": x2,
                                "y2": y2,
                                "arrow_color": config["arrow_color"],
                                "tooltip": tooltip
                            }])
                            struct.append(["curve", {
                                "endpoint": False,
                                "drawn": False,
                                "name": anchor_name2,
                                "x1": cwidth,
                                "y1": cheight + row_heights[row],
                                "x2": cwidth,
                                "y2": cheight,
                                "arrow_color": config["arrow_color"],
                                "tooltip": tooltip
                            }])
                        else:
                            ## draw a line to this bank
                            x2 = positioning[layer_name]["x"] + positioning[layer_name]["width"]/2
                            y2 = positioning[layer_name]["y"] + positioning[layer_name]["height"]
                            # NOTE(review): 'tootip' is a typo for 'tooltip' — this
                            # is a dead assignment; the tooltip computed above is
                            # what the entries below actually use.
                            tootip ="TODO"
                            struct.append(["curve", {
                                "endpoint": True,
                                "drawn": False,
                                "name": layer_name,
                                "x1": cwidth,
                                "y1": cheight,
                                "x2": x2,
                                "y2": y2,
                                "arrow_color": config["arrow_color"],
                                "tooltip": tooltip
                            }])
                            struct.append(["curve", {
                                "endpoint": False,
                                "drawn": False,
                                "name": layer_name,
                                "x1":cwidth,
                                "y1":cheight + row_heights[row],
                                "x2":cwidth,
                                "y2":cheight,
                                "arrow_color": config["arrow_color"],
                                "tooltip": tooltip
                            }])
                    else:
                        print("that's weird!", layer_name, "is not in", prev)
                    continue
                else:
                    #######################################################################
                    ## Bank positioning
                    #######################################################################
                    image = images[layer_name]
                    (width, height) = image_dims[layer_name]
                    cwidth += (spacing - (width/2))
                    positioning[layer_name] = {"name": layer_name + ("-rotated" if config["svg_rotate"] else ""),
                                               "svg_counter": self._svg_counter,
                                               "x": cwidth,
                                               "y": cheight,
                                               "image": self._image_to_uri(image),
                                               "width": width,
                                               "height": height,
                                               "tooltip": self[layer_name].tooltip(),
                                               "border_color": self._get_border_color(layer_name, config),
                                               "border_width": self._get_border_width(layer_name, config),
                                               "rx": cwidth - 1, # based on arrow width
                                               "ry": cheight - 1,
                                               "rh": height + 2,
                                               "rw": width + 2}
                    x1 = cwidth + width/2
                    y1 = cheight - 1
                    #######################################################################
                    ## Arrows going up
                    #######################################################################
                    for out in self[layer_name].outgoing_connections:
                        if out.name not in positioning:
                            continue
                        # draw an arrow between layers:
                        anchor_name = "%s-%s-anchor%s" % (out.name, layer_name, level_num - 1)
                        ## Don't draw this arrow, if there is an anchor in the next level
                        if anchor_name in positioning:
                            tooltip = html.escape(self.describe_connection_to(self[layer_name], out))
                            x2 = positioning[anchor_name]["x"]
                            y2 = positioning[anchor_name]["y"]
                            struct.append(["curve", {
                                "endpoint": False,
                                "drawn": False,
                                "name": anchor_name,
                                "x1": x1,
                                "y1": y1,
                                "x2": x2,
                                "y2": y2,
                                "arrow_color": config["arrow_color"],
                                "tooltip": tooltip
                            }])
                            continue
                        else:
                            tooltip = html.escape(self.describe_connection_to(self[layer_name], out))
                            x2 = positioning[out.name]["x"] + positioning[out.name]["width"]/2
                            y2 = positioning[out.name]["y"] + positioning[out.name]["height"]
                            struct.append(["curve", {
                                "endpoint": True,
                                "drawn": False,
                                "name": out.name,
                                "x1": x1,
                                "y1": y1,
                                "x2": x2,
                                "y2": y2 + 2,
                                "arrow_color": config["arrow_color"],
                                "tooltip": tooltip
                            }])
                    #######################################################################
                    ## Bank images
                    #######################################################################
                    struct.append(["image_svg", positioning[layer_name]])
                    struct.append(["label_svg", {"x": positioning[layer_name]["x"] + positioning[layer_name]["width"] + 5,
                                                 "y": positioning[layer_name]["y"] + positioning[layer_name]["height"]/2 + 2,
                                                 "label": layer_name,
                                                 "font_size": config["font_size"],
                                                 "font_color": "black",
                                                 "font_family": config["font_family"],
                                                 "text_anchor": "start",
                    }])
                    # 4D output (e.g. convolutional): also show the feature
                    # count and the currently-selected feature index:
                    output_shape = self[layer_name].get_output_shape()
                    if (isinstance(output_shape, tuple) and len(output_shape) == 4 and
                        self[layer_name].__class__.__name__ != "ImageLayer"):
                        features = str(output_shape[3])
                        feature = str(self[layer_name].feature)
                        if config["svg_rotate"]:
                            struct.append(["label_svg", {"x": positioning[layer_name]["x"] + 5,
                                                         "y": positioning[layer_name]["y"] - 10 - 5,
                                                         "label": features,
                                                         "font_size": config["font_size"],
                                                         "font_color": "black",
                                                         "font_family": config["font_family"],
                                                         "text_anchor": "start",
                            }])
                            struct.append(["label_svg", {"x": positioning[layer_name]["x"] + positioning[layer_name]["width"] - 10,
                                                         "y": positioning[layer_name]["y"] + positioning[layer_name]["height"] + 10 + 5,
                                                         "label": feature,
                                                         "font_size": config["font_size"],
                                                         "font_color": "black",
                                                         "font_family": config["font_family"],
                                                         "text_anchor": "start",
                            }])
                        else:
                            struct.append(["label_svg", {"x": positioning[layer_name]["x"] + positioning[layer_name]["width"] + 5,
                                                         "y": positioning[layer_name]["y"] + 5,
                                                         "label": features,
                                                         "font_size": config["font_size"],
                                                         "font_color": "black",
                                                         "font_family": config["font_family"],
                                                         "text_anchor": "start",
                            }])
                            struct.append(["label_svg", {"x": positioning[layer_name]["x"] - (len(feature) * 7) - 5 - 5,
                                                         "y": positioning[layer_name]["y"] + positioning[layer_name]["height"] - 5,
                                                         "label": feature,
                                                         "font_size": config["font_size"],
                                                         "font_color": "black",
                                                         "font_family": config["font_family"],
                                                         "text_anchor": "start",
                            }])
                    # Mark layers that apply dropout with an "(x)"-style glyph:
                    if (self[layer_name].dropout > 0):
                        struct.append(["label_svg", {"x": positioning[layer_name]["x"] - 1 * 2.0 - 18, # length of chars * 2.0
                                                     "y": positioning[layer_name]["y"] + 4,
                                                     "label": "o", # "⦻"
                                                     "font_size": config["font_size"] * 2.0,
                                                     "font_color": "black",
                                                     "font_family": config["font_family"],
                                                     "text_anchor": "start",
                        }])
                        struct.append(["label_svg", {"x": positioning[layer_name]["x"] - 1 * 2.0 - 15 + (-3 if config["svg_rotate"] else 0), # length of chars * 2.0
                                                     "y": positioning[layer_name]["y"] + 5 + (-1 if config["svg_rotate"] else 0),
                                                     "label": "x", # "⦻"
                                                     "font_size": config["font_size"] * 1.3,
                                                     "font_color": "black",
                                                     "font_family": config["font_family"],
                                                     "text_anchor": "start",
                        }])
                    cwidth += width/2
                    row_height = max(row_height, height)
                    self._svg_counter += 1
            cheight += row_height
            level_num += 1
        cheight += config["border_bottom"]
        ### DONE!
        ## Draw live/static sign
        if (class_id is None and dynamic_pictures_check()):
            # dynamic image:
            label = "*"
            if config["svg_rotate"]:
                struct.append(["label_svg", {"x": 10,
                                             "y": cheight - 10,
                                             "label": label,
                                             "font_size": config["font_size"] * 2.0,
                                             "font_color": "red",
                                             "font_family": config["font_family"],
                                             "text_anchor": "middle",
                }])
            else:
                struct.append(["label_svg", {"x": 10,
                                             "y": 10,
                                             "label": label,
                                             "font_size": config["font_size"] * 2.0,
                                             "font_color": "red",
                                             "font_family": config["font_family"],
                                             "text_anchor": "middle",
                }])
        ## Draw the title:
        if config["svg_rotate"]:
            struct.append(["label_svg", {"x": 10, ## really border_left
                                         "y": cheight/2,
                                         "label": self.name,
                                         "font_size": config["font_size"] + 3,
                                         "font_color": "black",
                                         "font_family": config["font_family"],
                                         "text_anchor": "middle",
            }])
        else:
            struct.append(["label_svg", {"x": max_width/2,
                                         "y": config["border_top"]/2,
                                         "label": self.name,
                                         "font_size": config["font_size"] + 3,
                                         "font_color": "black",
                                         "font_family": config["font_family"],
                                         "text_anchor": "middle",
            }])
        ## figure out scale optimal, if scale is None
        ## the fraction:
        if config["svg_scale"] is not None: ## scale is given:
            if config["svg_rotate"]:
                scale_value = (config["svg_max_width"] / cheight) * config["svg_scale"]
            else:
                scale_value = (config["svg_max_width"] / max_width) * config["svg_scale"]
        else:
            if config["svg_rotate"]:
                scale_value = config["svg_max_width"] / max(cheight, max_width)
            else:
                scale_value = config["svg_preferred_size"] / max(cheight, max_width)
        svg_scale = "%s%%" % int(scale_value * 100)
        scaled_width = (max_width * scale_value)
        scaled_height = (cheight * scale_value)
        #######################################################################
        ### Need a top-level width, height because Jupyter peeks at it
        #######################################################################
        if config["svg_rotate"]:
            svg_transform = """ transform="rotate(90) translate(0 -%s)" """ % scaled_height
            ### Swap them:
            top_width = scaled_height
            top_height = scaled_width
        else:
            svg_transform = ""
            top_width = scaled_width
            top_height = scaled_height
        struct.append(["svg_head", {
            "viewbox_width": max_width, # view port width
            "viewbox_height": cheight, # view port height
            "width": scaled_width, ## actual pixels of image in page
            "height": scaled_height, ## actual pixels of image in page
            "netname": self.name,
            "top_width": top_width,
            "top_height": top_height,
            "arrow_color": config["arrow_color"],
            "arrow_width": config["arrow_width"],
            "svg_transform": svg_transform,
        }])
        return struct
    def to_svg(self, inputs=None, class_id=None, targets=None, **kwargs):
        """
        Render the network as an SVG string, or None if the network
        has no layers or any unconnected layer.

        opts - temporary override of config
        includes:
            "font_size": 12,
            "border_top": 25,
            "border_bottom": 25,
            "hspace": 100,
            "vspace": 50,
            "image_maxdim": 200
            "image_pixels_per_unit": 50
        See .config for all options.
        """
        # Nothing sensible to draw for empty/partially-wired networks:
        if any([(layer.kind() == "unconnected") for layer in self.layers]) or len(self.layers) == 0:
            return None
        # defaults, with per-call keyword overrides layered on top:
        config = copy.copy(self.config)
        config.update(kwargs)
        struct = self.build_struct(inputs, class_id, config, targets)
        ### Define the SVG strings (doubled braces survive the outer
        ### .format and are filled in later, per struct entry):
        image_svg = """<rect x="{{rx}}" y="{{ry}}" width="{{rw}}" height="{{rh}}" style="fill:none;stroke:{{border_color}};stroke-width:{{border_width}}"/><image id="{netname}_{{name}}_{{svg_counter}}" class="{netname}_{{name}}" x="{{x}}" y="{{y}}" height="{{height}}" width="{{width}}" preserveAspectRatio="none" image-rendering="optimizeSpeed" xlink:href="{{image}}"><title>{{tooltip}}</title></image>""".format(
            **{
                "netname": class_id if class_id is not None else self.name,
            })
        line_svg = """<line x1="{{x1}}" y1="{{y1}}" x2="{{x2}}" y2="{{y2}}" stroke="{{arrow_color}}" stroke-width="{arrow_width}"><title>{{tooltip}}</title></line>""".format(**config)
        arrow_svg = """<line x1="{{x1}}" y1="{{y1}}" x2="{{x2}}" y2="{{y2}}" stroke="{{arrow_color}}" stroke-width="{arrow_width}" marker-end="url(#arrow)"><title>{{tooltip}}</title></line>""".format(**config)
        # curve_svg is only the CLOSING part of a <path>; _render_curve
        # supplies the opening '<path d="...' half:
        curve_svg = '''" stroke="{{arrow_color}}" stroke-width="{arrow_width}" marker-end="url(#arrow)" fill="none" />'''.format(**config)
        arrow_rect = """<rect x="{rx}" y="{ry}" width="{rw}" height="{rh}" style="fill:white;stroke:none"><title>{tooltip}</title></rect>"""
        label_svg = """<text x="{x}" y="{y}" font-family="{font_family}" font-size="{font_size}" text-anchor="{text_anchor}" fill="{font_color}" alignment-baseline="central" {transform}>{label}</text>"""
        svg_head = """<svg id='{netname}' xmlns='http://www.w3.org/2000/svg' xmlns:xlink='http://www.w3.org/1999/xlink' image-rendering="pixelated" width="{top_width}px" height="{top_height}px">
<g {svg_transform}>
<svg viewBox="0 0 {viewbox_width} {viewbox_height}" width="{width}px" height="{height}px">
<defs>
<marker id="arrow" markerWidth="10" markerHeight="10" refX="9" refY="3" orient="auto" markerUnits="strokeWidth">
<path d="M0,0 L0,6 L9,3 z" fill="{arrow_color}" />
</marker>
</defs>"""
        templates = {
            "image_svg": image_svg,
            "line_svg": line_svg,
            "arrow_svg": arrow_svg,
            "arrow_rect": arrow_rect,
            "label_svg": label_svg,
            "svg_head": svg_head,
            "curve": curve_svg,
        }
        ## get the header (build_struct appends exactly one svg_head):
        svg = None
        # NOTE(review): loop variable 'dict' shadows the builtin throughout.
        for (template_name, dict) in struct:
            if template_name == "svg_head":
                svg = svg_head.format(**dict)
        ## build the rest:
        for index in range(len(struct)):
            (template_name, dict) = struct[index]
            if template_name != "svg_head" and not template_name.startswith("_"):
                rotate = dict.get("rotate", config["svg_rotate"])
                if template_name == "label_svg" and rotate:
                    # Rotated diagram: counter-rotate labels so they read upright:
                    dict["x"] += 8
                    dict["text_anchor"] = "middle"
                    dict["transform"] = """ transform="rotate(-90 %s %s) translate(%s)" """ % (dict["x"], dict["y"], 2)
                else:
                    dict["transform"] = ""
                if template_name == "curve":
                    # Curves may consume later anchor entries; _render_curve
                    # marks them "drawn" so they are not emitted twice:
                    if dict["drawn"] == False:
                        curve_svg = self._render_curve(dict, struct[index + 1:],
                                                       templates[template_name],
                                                       config)
                        svg += curve_svg
                else:
                    t = templates[template_name]
                    svg += t.format(**dict)
        svg += """</svg></g></svg>"""
        return svg
    def _render_curve(self, start, struct, end_svg, config):
        """
        Collect and render points on the line/curve.

        start - the first "curve" struct dict (will be marked drawn)
        struct - the remaining struct entries, scanned for chained
                 anchor segments that share endpoints with this curve
        end_svg - the closing half of the <path> template
        config - visualization settings (uses "svg_smoothing")
        Returns an SVG <path> string: a straight segment when only two
        points were found, otherwise a smoothed cubic Bézier path.
        """
        class Line():
            """
            Properties of a line
            Arguments:
                pointA (array) [x,y]: coordinates
                pointB (array) [x,y]: coordinates
            Returns:
                Line: { length: l, angle: a }
            """
            def __init__(self, pointA, pointB):
                lengthX = pointB[0] - pointA[0]
                lengthY = pointB[1] - pointA[1]
                self.length = math.sqrt(math.pow(lengthX, 2) + math.pow(lengthY, 2))
                self.angle = math.atan2(lengthY, lengthX)
        def controlPoint(current, previous_point, next_point, reverse=False):
            """
            ## Position of a control point
            ## I:  - current (array) [x, y]: current point coordinates
            ##     - previous (array) [x, y]: previous point coordinates
            ##     - next (array) [x, y]: next point coordinates
            ##     - reverse (boolean, optional): sets the direction
            ## O:  - (array) [x,y]: a tuple of coordinates
            """
            ## When 'current' is the first or last point of the array
            ## 'previous' or 'next' don't exist.
            ## Replace with 'current'
            p = previous_point or current
            n = next_point or current
            ## // Properties of the opposed-line
            o = Line(p, n)
            ## // If is end-control-point, add PI to the angle to go backward
            angle = o.angle + (math.pi if reverse else 0)
            length = o.length * config["svg_smoothing"]
            ## // The control point position is relative to the current point
            x = current[0] + math.cos(angle) * length
            y = current[1] + math.sin(angle) * length
            return (x, y)
        def bezier(points, index):
            """
            Create the bezier curve command
            Arguments:
                points: complete array of points coordinates
                index: index of 'point' in the array 'a'
            Returns:
                String of current path
            """
            current = points[index]
            if index == 0:
                return "M %s,%s " % (current[0], current[1])
            ## start control point
            prev1 = points[index - 1] if index >= 1 else None
            prev2 = points[index - 2] if index >= 2 else None
            next1 = points[index + 1] if index < len(points) - 1 else None
            cps = controlPoint(prev1, prev2, current, False)
            ## end control point
            cpe = controlPoint(current, prev1, next1, True)
            return "C %s,%s %s,%s, %s,%s " % (cps[0], cps[1], cpe[0], cpe[1],
                                              current[0], current[1])
        def svgPath(points):
            """
            // Render the svg <path> element
            // I:  - points (array): points coordinates
            //     - command (function)
            //       I:  - point (array) [x,y]: current point coordinates
            //           - i (integer): index of 'point' in the array 'a'
            //           - a (array): complete array of points coordinates
            //       O:  - (string) a svg path command
            // O:  - (string): a Svg <path> element
            """
            ## build the d attributes by looping over the points
            return '<path d="' + ("".join([bezier(points, i)
                                           for i in range(len(points))]))
        points = [(start["x2"], start["y2"]), (start["x1"], start["y1"])] # may be direct line
        start["drawn"] = True
        # Chain together any not-yet-drawn curve segments whose end
        # coincides with our current tail point (anchors come in pairs):
        for (template, dict) in struct:
            ## anchor! come in pairs
            if ((template == "curve") and (dict["drawn"] == False) and
                points[-1] == (dict["x2"], dict["y2"])):
                points.append((dict["x1"], dict["y1"]))
                dict["drawn"] = True
        end_html = end_svg.format(**start)
        if len(points) == 2: ## direct, no anchors, no curve:
            svg_html = """<path d="M {sx} {sy} L {ex} {ey} """.format(**{
                "sx": points[-1][0],
                "sy": points[-1][1],
                "ex": points[0][0],
                "ey": points[0][1],
            }) + end_html
        else: # construct curve, at least 4 points
            points = list(reversed(points))
            svg_html = svgPath(points) + end_html
        return svg_html
def _get_level_ordering(self):
    """
    Returns a list of lists of tuples from
    input to output of levels.
    Each tuple contains: (layer_name, anchor?, from_name/None)
    If anchor is True, this is just an anchor point.
    """
    ## Cached: computed once per topology; invalidated elsewhere
    ## (e.g., load_model sets self._level_ordering = None).
    if self._level_ordering is not None:
        return self._level_ordering
    ## First, get a level for all layers:
    levels = {}
    for layer in topological_sort(self, self.layers):
        if not hasattr(layer, "model"):
            continue
        ## A layer sits one level past the deepest of its inputs;
        ## the [-1] sentinel puts input layers at level 0.
        level = max([levels[lay.name] for lay in layer.incoming_connections] + [-1])
        levels[layer.name] = level + 1
    max_level = max(levels.values())
    ordering = []
    for i in range(max_level + 1): # input to output
        layer_names = [layer.name for layer in self.layers if levels[layer.name] == i]
        ordering.append([(name, False, [x.name for x in self[name].incoming_connections])
                         for name in layer_names]) # (going_to/layer_name, anchor, coming_from)
    ## promote all output banks to last row:
    for level in range(len(ordering)): # input to output
        tuples = ordering[level]
        index = 0
        for (name, anchor, none) in tuples[:]:
            if self[name].kind() == "output":
                ## move it to last row
                ## find it and remove
                ordering[-1].append(tuples.pop(index))
            else:
                index += 1
    ## insert anchor points for any in next level
    ## that doesn't go to a bank in this level
    ## NOTE(review): order_cache appears unused below — confirm before removing.
    order_cache = {}
    for level in range(len(ordering)): # input to output
        tuples = ordering[level]
        for (name, anchor, fname) in tuples:
            if anchor:
                ## is this in next? if not add it
                ## NOTE(review): indexing ordering[level + 1] assumes anchors
                ## never appear on the last level (outputs were promoted there).
                next_level = [(n, anchor) for (n, anchor, hfname) in ordering[level + 1]]
                if (name, False) not in next_level: ## actual layer not in next level
                    ordering[level + 1].append((name, True, fname)) # add anchor point
                else:
                    pass ## finally!
            else:
                ## if next level doesn't contain an outgoing
                ## connection, add it to next level as anchor point
                for layer in self[name].outgoing_connections:
                    next_level = [(n, anchor) for (n, anchor, fname) in ordering[level + 1]]
                    if (layer.name, False) not in next_level:
                        ordering[level + 1].append((layer.name, True, name)) # add anchor point
    ## Pick the left-to-right bank order that minimizes arrow crossings:
    self._level_ordering = ordering = self._optimize_ordering(ordering)
    return ordering
def _optimize_ordering(self, ordering):
    """
    Choose a left-to-right arrangement of the banks on each level
    that (approximately) minimizes the total weighted length of the
    arrows between adjacent levels.  Exhaustive search when the
    number of permutations is small enough; otherwise greedy,
    level by level.
    """
    def perms(l):
        ## All orderings of one level's banks.
        return list(itertools.permutations(l))
    def distance(xy1, xy2):
        ## Euclidean distance between two (x, y) points.
        return math.sqrt((xy1[0] - xy2[0]) ** 2 + (xy1[1] - xy2[1]) ** 2)
    def find_start(cend, canchor, name, plevel):
        """
        Return position and weight of link from cend/name to
        col in previous level.
        """
        col = 1
        for bank in plevel:
            pend, panchor, pstart_names = bank
            if (name == pend):
                if (not panchor and not canchor):
                    ## direct layer-to-layer arrow: weigh it heavily
                    weight = 10.0
                else:
                    weight = 1.0
                return col, weight
            elif cend == pend and name == pstart_names:
                ## anchor-to-anchor continuation of the same arrow
                return col, 5.0
            col += 1
        raise Exception("connecting layer not found!")
    ## First level needs to be in bank_order, and cannot permutate:
    first_level = [(bank_name, False, []) for bank_name in self.input_bank_order]
    perm_count = reduce(operator.mul, [math.factorial(len(level)) for level in ordering[1:]])
    if perm_count < 70000: ## globally minimize
        permutations = itertools.product(*[perms(x) for x in ordering[1:]])
        ## measure arrow distances for them all and find the shortest:
        best = (10000000, None, None)
        for ordering in permutations:
            ordering = (first_level,) + ordering
            sum = 0.0  # NOTE(review): shadows the builtin sum() in this scope
            for level_num in range(1, len(ordering)):
                level = ordering[level_num]
                plevel = ordering[level_num - 1]
                col1 = 1
                for bank in level: # starts at level 1
                    cend, canchor, cstart_names = bank
                    if canchor:
                        cstart_names = [cstart_names] # put in list
                    for name in cstart_names:
                        col2, weight = find_start(cend, canchor, name, plevel)
                        ## columns are normalized to (0, 1); adjacent levels
                        ## are placed .1 apart vertically:
                        dist = distance((col1/(len(level) + 1), 0),
                                        (col2/(len(plevel) + 1), .1)) * weight
                        sum += dist
                    col1 += 1
            ## first permutation achieving the minimum wins (strict <):
            if sum < best[0]:
                best = (sum, ordering)
        return best[1]
    else: # locally minimize, between layers:
        ordering[0] = first_level ## replace first level with sorted
        for level_num in range(1, len(ordering)):
            best = (10000000, None, None)
            plevel = ordering[level_num - 1]
            for level in itertools.permutations(ordering[level_num]):
                sum = 0.0
                col1 = 1
                for bank in level: # starts at level 1
                    cend, canchor, cstart_names = bank
                    if canchor:
                        cstart_names = [cstart_names] # put in list
                    for name in cstart_names:
                        col2, weight = find_start(cend, canchor, name, plevel)
                        dist = distance((col1/(len(level) + 1), 0),
                                        (col2/(len(plevel) + 1), .1)) * weight
                        sum += dist
                    col1 += 1
                if sum < best[0]:
                    best = (sum, level)
            ordering[level_num] = best[1]
        return ordering
def describe_connection_to(self, layer1, layer2):
    """
    Returns a textual description of the weights for the SVG tooltip.
    """
    description = "Weights from %s to %s" % (layer1.name, layer2.name)
    if self.model is None:
        ## Not compiled yet; no weights to report.
        return description
    ## Find the Keras layer(s) matching the destination bank and
    ## append one line per weight tensor:
    for keras_layer in self.model.layers:
        if keras_layer.name != layer2.name:
            continue
        weights = keras_layer.get_weights()
        for idx, kweight in enumerate(keras_layer.weights):
            description += "\n %s has shape %s" % (
                kweight.name, weights[idx].shape)
    return description
def saved(self, dir=None):
    """
    Return True if network has been saved in `dir`
    (default: "<name>.conx").
    """
    folder = "%s.conx" % self.name.replace(" ", "_") if dir is None else dir
    ## Saved means the folder exists and holds both artifacts:
    return (os.path.isdir(folder) and
            all(os.path.isfile("%s/%s" % (folder, part))
                for part in ("network.pickle", "weights.npy")))
def delete(self, dir=None):
    """
    Delete the network save folder from disk, if present.
    """
    if dir is None:
        dir = "%s.conx" % self.name.replace(" ", "_")
    import shutil
    if not os.path.isdir(dir):
        print("Nothing to delete.")
        return
    shutil.rmtree(dir)
def load(self, dir=None):
    """
    Load the model and the weights/history into an existing conx network.

    Also callable as Network.load("dir"), with the directory name in
    the `self` position; in that case a new network is returned.
    """
    if self is None:
        raise Exception("Network.load() requires a directory name")
    if isinstance(self, str):
        ## Called as Network.load("dir"): rebuild from disk.
        return load_network(self)
    ## Called on an instance: refresh config and weights in place.
    self.load_config(dir)
    self.load_weights(dir)
def save(self, dir=None):
    """
    Save the model and the weights/history (if compiled) to a dir.
    """
    # Delegates to the module-level save_network() helper.
    save_network(dir, self)
def load_model(self, dir=None, filename=None):
    """
    Load a Keras model from a dir/filename into this network.
    """
    from keras.models import load_model
    folder = "%s.conx" % self.name.replace(" ", "_") if dir is None else dir
    fname = "model.h5" if filename is None else filename
    self.model = load_model(os.path.join(folder, fname))
    ## Cached drawing layout is stale once the model changes:
    self._level_ordering = None
    if self.compile_options:
        self.reset()
def save_model(self, dir=None, filename=None):
    """
    Save the Keras model (if compiled) to a dir/filename.
    """
    if not self.model:
        raise Exception("need to build network before saving")
    folder = "%s.conx" % self.name.replace(" ", "_") if dir is None else dir
    fname = "model.h5" if filename is None else filename
    if not os.path.isdir(folder):
        os.makedirs(folder)
    self.model.save(os.path.join(folder, fname))
def load_dataset(self, dir=None, *args, **kwargs):
    """
    Load a dataset from a directory. Returns True if data loaded,
    else False.
    """
    folder = "%s.conx" % self.name.replace(" ", "_") if dir is None else dir
    ## Extra args are passed straight through to the dataset loader:
    return self.dataset.load_from_disk(folder, *args, **kwargs)
def save_dataset(self, dir=None, *args, **kwargs):
    """
    Save the dataset to directory.
    """
    if len(self.dataset) == 0:
        raise Exception("need to load a dataset before saving")
    folder = "%s.conx" % self.name.replace(" ", "_") if dir is None else dir
    self.dataset.save_to_disk(folder, *args, **kwargs)
def load_history(self, dir=None, filename=None):
    """
    Load the training history (and weight history) from a dir/file.
    network.load_history()
    """
    folder = "%s.conx" % self.name.replace(" ", "_") if dir is None else dir
    fname = "history.pickle" if filename is None else filename
    path = os.path.join(folder, fname)
    if not os.path.isfile(path):
        print("WARNING: no such history file '%s'" % path, file=sys.stderr)
        return
    ## Two back-to-back pickles in one file: history, then weight history.
    with open(path, "rb") as fp:
        self.history = pickle.load(fp)
        self.weight_history = pickle.load(fp)
    ## epoch_count is the last completed epoch:
    self.epoch_count = (len(self.history) - 1) if self.history else 0
def save_history(self, dir=None, filename=None):
    """
    Save the history to a file.
    network.save_history()
    """
    folder = "%s.conx" % self.name.replace(" ", "_") if dir is None else dir
    fname = "history.pickle" if filename is None else filename
    if not os.path.isdir(folder):
        os.makedirs(folder)
    ## Two back-to-back pickles: history first, weight history second
    ## (load_history reads them in the same order).
    with open(os.path.join(folder, fname), "wb") as fp:
        pickle.dump(self.history, fp)
        pickle.dump(self.weight_history, fp)
def load_weights(self, dir=None, filename=None):
    """
    Load the network weights and history from dir/files.
    network.load_weights()

    Raises:
        Exception: if the network has not been built/compiled yet.
    """
    if not self.model:
        raise Exception("need to build network before loading weights")
    folder = "%s.conx" % self.name.replace(" ", "_") if dir is None else dir
    fname = "weights.npy" if filename is None else filename
    path = os.path.join(folder, fname)
    if os.path.exists(path):
        self.model.load_weights(path)
        ## History lives alongside the weights:
        self.load_history(folder)
    else:
        ## Previously this failed silently; warn the same way
        ## load_history() does for a missing file.
        print("WARNING: no such weights file '%s'" % path, file=sys.stderr)
def save_weights(self, dir=None, filename=None):
    """
    Save the network weights and history to dir/files.
    network.save_weights()
    """
    if not self.model:
        raise Exception("need to build network before saving weights")
    folder = "%s.conx" % self.name.replace(" ", "_") if dir is None else dir
    fname = "weights.npy" if filename is None else filename
    if not os.path.isdir(folder):
        os.makedirs(folder)
    self.model.save_weights(os.path.join(folder, fname))
    ## History is saved alongside the weights:
    self.save_history(folder)
def dashboard(self, width="95%", height="550px", play_rate=0.5):
    """
    Build the dashboard for Jupyter widgets. Requires running
    in a notebook/jupyterlab.

    Arguments:
        width: CSS width of the dashboard widget.
        height: CSS height of the dashboard widget.
        play_rate: delay (in seconds) between steps when playing.
    """
    # Imported here so the widgets dependency is only needed
    # when a dashboard is actually requested.
    from .widgets import Dashboard
    return Dashboard(self, width, height, play_rate)
def pp(self, *args, **opts):
    """
    Pretty-print a vector, with an optional leading label.
    """
    ## Optional first argument is a string label:
    if isinstance(args[0], str):
        label, vector = args[0], args[1]
    else:
        label, vector = "", args[0]
    ## Only the first 20 elements are displayed:
    print(label + self.pf(vector[:20], **opts))
def pf(self, vector, max_line_width=79, **opts):
    """
    Pretty-format a vector or item. Returns string.
    Arguments:
        vector (list): The first parameter.
        precision (int): Number of decimal places to show for each
            value in vector.
    Returns:
        str: Returns the vector formatted as a short string.
    Examples:
        These examples demonstrate the net.pf formatting function:
        >>> import conx
        >>> net = Network("Test")
        >>> net.pf([1.01])
        '[1.01]'
        >>> net.pf(range(10), precision=2)
        '[0.00, 1.00, 2.00, 3.00, 4.00, 5.00, 6.00, 7.00, 8.00, 9.00]'
        >>> net.pf([0]*10000) # doctest: +ELLIPSIS
        '[0.00, 0.00, 0.00, ...]'
    """
    ## FIX: collections.Iterable was removed in Python 3.10;
    ## the abstract base classes live in collections.abc.
    import collections.abc
    if isinstance(vector, (int, float)):
        vector = np.float16(vector)
    if isinstance(vector, collections.abc.Iterable):
        vector = list(vector)
    if isinstance(vector, (list, tuple)):
        vector = np.array(vector)
    ## Per-call options override the network config (e.g., precision):
    config = copy.copy(self.config)
    config.update(opts)
    precision = "{0:.%df}" % config["precision"]
    retval = np.array2string(
        vector,
        formatter={'float_kind': precision.format,
                   'int_kind': precision.format},
        separator=", ",
        max_line_width=max_line_width).replace("\n", "")
    ## Truncate anything still too long for one line:
    if len(retval) <= max_line_width:
        return retval
    else:
        return retval[:max_line_width - 3] + "..."
def set_weights(self, weights, layer_name=None):
    """
    Set the model's weights, or a particular layer's weights.

    Arguments:
        weights: a list of weight arrays (as produced by get_weights()).
        layer_name: if given, set only the named layer's weights.

    >>> net = Network("Weight Set Test", 2, 2, 1, activation="sigmoid")
    >>> net.compile(error="mse", optimizer="sgd")
    >>> net.set_weights(net.get_weights())
    >>> hw = net.get_weights("hidden")
    >>> net.set_weights(hw, "hidden")
    """
    if self.model is None:
        raise Exception("need to build network")
    if layer_name is None:
        self.model.set_weights(weights)
    else:
        for i in range(len(self.model.layers)):
            if self.model.layers[i].name == layer_name:
                ## BUG FIX: this previously read the layer's own
                ## current weights and set them back, silently
                ## ignoring the `weights` argument.
                w = [np.array(x) for x in weights]
                self.model.layers[i].set_weights(w)
def to_array(self) -> list:
    """
    Get the weights of a network as a flat, one-dimensional list.
    Example:
        >>> from conx import Network
        >>> net = Network("Deep", 3, 4, 5, 2, 3, 4, 5)
        >>> net.compile(optimizer="adam", error="mse")
        >>> array = net.to_array()
        >>> len(array)
        103
    Returns:
        All of weights and biases of the network in a single, flat list.
    """
    if self.model is None:
        raise Exception("need to build network")
    ## Flatten every weight tensor of every layer, in order:
    flat = []
    for layer in self.model.layers:
        flat.extend(value
                    for weight in layer.get_weights()
                    for value in weight.flatten())
    return flat
def from_array(self, array: list):
    """
    Load the weights from a flat sequence of numbers.
    Arguments:
        array: a sequence (e.g., list, np.array) of numbers
    Example:
        >>> from conx import Network
        >>> net = Network("Deep", 3, 4, 5, 2, 3, 4, 5)
        >>> net.compile(optimizer="adam", error="mse")
        >>> net.from_array([0] * 103)
        >>> array = net.to_array()
        >>> len(array)
        103
    """
    if self.model is None:
        raise Exception("need to build network")
    ## Consume the flat array layer by layer, reshaping each slice
    ## to the shape of the tensor it replaces:
    position = 0
    for layer in self.model.layers:
        replacements = []
        for current in layer.get_weights():
            count = reduce(operator.mul, current.shape)
            chunk = np.array(array[position:position + count]).reshape(current.shape)
            replacements.append(chunk)
            position += count
        layer.set_weights(replacements)
### Config methods:
def load_config(self, datadir=None, config_file=None):
    """
    Load the network config (per-layer display settings) from a JSON
    file, if one exists; otherwise fail silently.
    """
    folder = "%s.conx" % self.name.replace(" ", "_") if datadir is None else datadir
    fname = "config.json" if config_file is None else config_file
    folder = os.path.expanduser(folder)
    if not os.path.exists(folder):
        ## second try, under /tmp (save_config falls back there):
        folder = os.path.join('/tmp', folder)
    path = os.path.join(folder, fname)
    if os.path.isfile(path):
        with open(path) as fp:
            self.update_config(json.load(fp))
    ## otherwise: give up, fail silently
def save_config(self, datadir=None, config_file=None):
    """
    Save the network config (per-layer display settings) as JSON to
    datadir/config_file, falling back to /tmp if the directory
    cannot be created.
    """
    if datadir is None:
        datadir = "%s.conx" % self.name.replace(" ", "_")
    if config_file is None:
        config_file = "config.json"
    if not os.path.exists(datadir):
        try:
            os.makedirs(datadir)
        except OSError:
            ## FIX: was a bare except (also caught KeyboardInterrupt etc.);
            ## only filesystem failures should trigger the /tmp fallback.
            datadir = os.path.join('/tmp', datadir)
            os.makedirs(datadir)
    full_config_file = os.path.join(datadir, config_file)
    ## Refresh self.config from the current layer attributes first:
    self.rebuild_config()
    with open(full_config_file, "w") as fp:
        json.dump(self.config, fp, indent=" ")
def update_config(self, config):
    """
    Merge `config` (a dict of settings) into this network's config,
    then re-apply per-layer settings to every layer.
    """
    self.config.update(config)
    ## Push the (possibly changed) layer settings down into each bank:
    for bank in self.layers:
        self.update_layer_from_config(bank)
def rebuild_config(self):
    """
    Rebuild self.config["layers"] from the layers' current display
    attributes, discarding any stale entries.
    """
    ## The per-layer attributes that are persisted to config:
    PERSISTED = ("visible", "minmax", "vshape", "image_maxdim",
                 "image_pixels_per_unit", "colormap", "feature",
                 "max_draw_units")
    self.config["layers"].clear()
    for layer in self.layers:
        self.config["layers"][layer.name] = {
            item: getattr(layer, item) for item in PERSISTED}
def update_layer_from_config(self, layer):
    """
    Apply any saved config settings for `layer` to the layer object.
    Layers with no saved settings are left untouched.
    """
    settings = self.config["layers"].get(layer.name)
    if settings is None:
        return
    for attr, value in settings.items():
        setattr(layer, attr, value)
def warn_once(self, message):
    """
    Print `message` to stderr, but only the first time it is seen.
    """
    if message in self.warnings:
        return
    print(message, file=sys.stderr)
    self.warnings[message] = True
class _InterruptHandler():
    """
    Class for handling interrupts so that state is not left
    in an inconsistent situation.  Used as a context manager around
    training: the first ^C requests a clean stop at the end of the
    epoch; a second ^C raises KeyboardInterrupt immediately.
    """
    def __init__(self, network, sig=signal.SIGINT):
        self.network = network
        self.sig = sig
        # Tri-state flags: None until used as a context manager.
        self.interrupted = None
        self.released = None
        self.original_handler = None
    def __enter__(self):
        # Install the custom handler, remembering the previous one
        # so it can be restored on exit.
        self.interrupted = False
        self.released = False
        self.original_handler = signal.getsignal(self.sig)
        def handler(signum, frame):
            # Restore the original handler first, so a stuck loop can
            # still be interrupted the normal way afterwards.
            self._release()
            if self.interrupted:
                # Second interrupt: give up immediately.
                raise KeyboardInterrupt
            print("\nStopping at end of epoch... (^C again to quit now)...")
            self.interrupted = True
            # Ask Keras to stop training at the end of the current epoch:
            self.network.model.stop_training = True
        signal.signal(self.sig, handler)
        return self
    def __exit__(self, type, value, tb):
        # Restore the original signal handler on leaving the with-block.
        self._release()
    def _release(self):
        # Restore the original handler exactly once; returns True if
        # this call did the restore, False if already released.
        if self.released:
            return False
        signal.signal(self.sig, self.original_handler)
        self.released = True
        return True
def load_network(dir):
    """
    Recreate a conx Network from a directory written by save_network().

    Arguments:
        dir: the directory holding network.pickle (and, optionally,
            weights/history/config files); required.

    Returns:
        The reconstructed Network, compiled and with weights loaded
        when the saved compile args allow it.
    """
    import pickle
    ## BUG FIX: the old default path referenced `network.name` before
    ## `network` existed, raising NameError whenever dir was None.
    ## The directory name is required; fail with a clear message.
    if dir is None:
        raise Exception("load_network() requires a directory name")
    ## load the config file pickle:
    with open(os.path.join(dir, "network.pickle"), "rb") as fp:
        config = pickle.load(fp)  ## full config
    ## Make the network:
    network = Network(config["name"])
    ## Add the layers:
    for layer_name in config["layers"]:
        layer = make_layer(config["layers"][layer_name]["config"])
        network.add(layer)
        network.update_layer_from_config(layer)
    ## Make the connections:
    for connection in config["connections"]:
        from_name, to_name = connection
        network.connect(from_name, to_name)
    ## Compile and restore weights, when compile args were saved:
    if "compile_args" in config and config["compile_args"]:
        network.compile(**config["compile_args"])
    if network.model:
        network.load_weights(dir)
    return network
def save_network(datadir, network):
    """
    Save the network description in order to be able to recreate
    it.
    Saves the network name, layers, connections, compile args
    to network.pickle.
    Saves the weights to weights.npy.
    Saves the training history to history.pickle.
    Saves the network config to config.json.

    Arguments:
        datadir: directory to save into; None means "<name>.conx".
        network: the conx Network to save.
    """
    import pickle
    if datadir is None:
        datadir = "%s.conx" % network.name.replace(" ", "_")
    if not os.path.exists(datadir):
        try:
            os.makedirs(datadir)
        except OSError:
            ## FIX: was a bare except; only filesystem failures should
            ## trigger the /tmp fallback.
            datadir = os.path.join('/tmp', datadir)
            os.makedirs(datadir)
    config = {
        "name": network.name,
        "layers": {},
        "connections": network.connections,
        "compile_args": network.compile_args,
    }
    ## get latest from layers:
    for layer in network.layers:
        config["layers"][layer.name] = {"config": layer.config}
    ## FIX: datadir is never None past this point, so the old
    ## "if datadir is None" re-derivations were dead code; compute
    ## the pickle path once and drop the success-flag dance.
    pickle_path = os.path.join(datadir, "network.pickle")
    try:
        with open(pickle_path, "wb") as fp:
            pickle.dump(config, fp)
    except TypeError:
        ## Some compile args (e.g., custom objects) can't be pickled;
        ## save without them and warn (reopening truncates the file).
        config["compile_args"] = {}
        with open(pickle_path, "wb") as fp:
            pickle.dump(config, fp)
        print("WARNING: can't save compile args; recompile after loading from disk",
              file=sys.stderr)
    network.save_config(datadir)
    if network.model:
        network.save_weights(datadir)