# conx - a neural network library
#
# Copyright (c) Douglas S. Blank <doug.blank@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301 USA
"""
The network module contains the code for the Network class.
"""
import collections
import operator
from functools import reduce
import signal
import string
import numbers
import random
import pickle
import base64
import json
import html
import copy
import sys
import io
import os
import re
from typing import Any
import PIL
import numpy as np
import matplotlib.pyplot as plt
import keras
from keras.callbacks import Callback, History
import keras.backend as K
from .utils import *
from .layers import Layer
from .dataset import Dataset
try:
from IPython import get_ipython
except:
get_ipython = lambda: None
#------------------------------------------------------------------------
[docs]class ReportCallback(Callback):
def __init__(self, network, verbose, report_rate, mpl_backend, record):
# mpl_backend is matplotlib backend
super().__init__()
self.network = network
self.verbose = verbose
self.report_rate = report_rate
self.mpl_backend = mpl_backend
self.in_console = self.network.in_console(mpl_backend)
self.record = record
[docs] def on_epoch_end(self, epoch, logs=None):
self.network.history.append(logs)
self.network.epoch_count += 1
if (self.verbose > 0 and
self.in_console and
(epoch+1) % self.report_rate == 0):
self.network.report_epoch(self.network.epoch_count, logs)
if self.record != 0 and (epoch+1) % self.record == 0:
self.network.weight_history[self.network.epoch_count] = self.network.to_array()
[docs]class PlotCallback(Callback):
def __init__(self, network, report_rate, mpl_backend):
# mpl_backend te matplotlib backend string code
#
super().__init__()
self.network = network
self.report_rate = report_rate
self.mpl_backend = mpl_backend
self.in_console = self.network.in_console(mpl_backend)
self.figure = None
[docs] def on_epoch_end(self, epoch, logs=None):
if epoch == -1:
# training loop finished, so make a final update to plot
# in case the number of loop cycles wasn't a multiple of
# report_rate
self.network.plot_results(self)
if not self.in_console:
plt.close(self.figure[0])
elif (epoch+1) % self.report_rate == 0:
self.network.plot_results(self)
[docs]class FunctionCallback(Callback):
"""
'on_batch_begin',
'on_batch_end',
'on_epoch_begin',
'on_epoch_end',
'on_train_begin',
'on_train_end',
"""
def __init__(self, network, on_method, function):
super().__init__()
self.network = network
self.on_method = on_method
self.function = function
[docs] def on_batch_begin(self, batch, logs=None):
if self.on_method == "on_batch_begin":
self.function(self.network, batch, logs)
[docs] def on_batch_end(self, batch, logs=None):
if self.on_method == "on_batch_end":
self.function(self.network, batch, logs)
[docs] def on_epoch_begin(self, epoch, logs=None):
if self.on_method == "on_epoch_begin":
self.function(self.network, epoch, logs)
[docs] def on_epoch_end(self, epoch, logs=None):
if self.on_method == "on_epoch_end":
self.function(self.network, epoch, logs)
[docs] def on_train_begin(self, logs=None):
if self.on_method == "on_train_begin":
self.function(self.network, logs)
[docs] def on_train_end(self, logs=None):
if self.on_method == "on_train_end":
self.function(self.network, logs)
[docs]class StoppingCriteria(Callback):
def __init__(self, item, op, value, use_validation_to_stop):
super().__init__()
self.item = item
self.op = op
self.value = value
self.use_validation_to_stop = use_validation_to_stop
[docs] def on_epoch_end(self, epoch, logs=None):
key = ("val_" + self.item) if self.use_validation_to_stop else self.item
if key in logs: # we get what we need directly:
if self.compare(logs[key], self.op, self.value):
self.model.stop_training = True
else:
## ok, then let's sum/average anything that matches
total = 0
count = 0
for item in logs:
if self.use_validation_to_stop:
if item.startswith("val_") and item.endswith("_" + self.item):
count += 1
total += logs[item]
else:
if item.endswith("_" + self.item) and not item.startswith("val_"):
count += 1
total += logs[item]
if count > 0 and self.compare(total/count, self.op, self.value):
self.model.stop_training = True
[docs] def compare(self, v1, op, v2):
if v2 is None: return False
if op == "<":
return v1 < v2
elif op == ">":
return v1 > v2
elif op == "==":
return v1 == v2
elif op == "<=":
return v1 <= v2
elif op == ">=":
return v1 >= v2
[docs]class Network():
"""
The main class for the conx neural network package.
Arguments:
name: Required. The name of the network. Should not contain special HTML
characters.
sizes: Optional numbers. Defines the sizes of layers of a sequential
network. These will be created, added, and connected automatically.
config: Configuration overrides for the network.
Note:
To create a complete, operating network, you must do the following items:
1. create a network
2. add layers
3. connect the layers
4. compile the network
5. set the dataset
6. train the network
See also :any:`Layer`, :any:`Network.add`, :any:`Network.connect`,
and :any:`Network.compile`.
Examples:
>>> net = Network("XOR1", 2, 5, 2)
>>> len(net.layers)
3
>>> net = Network("XOR2")
>>> net.add(Layer("input", 2))
'input'
>>> net.add(Layer("hidden", 5))
'hidden'
>>> net.add(Layer("output", 2))
'output'
>>> net.connect()
>>> len(net.layers)
3
>>> net = Network("XOR3")
>>> net.add(Layer("input", 2))
'input'
>>> net.add(Layer("hidden", 5))
'hidden'
>>> net.add(Layer("output", 2))
'output'
>>> net.connect("input", "hidden")
>>> net.connect("hidden", "output")
>>> len(net.layers)
3
>>> net = Network("NMIST")
>>> net.name
'NMIST'
>>> len(net.layers)
0
>>> net = Network("NMIST", 10, 5, 1)
>>> len(net.layers)
3
>>> net = Network("NMIST", 10, 5, 5, 1, activation="sigmoid")
>>> net.config["activation"]
'sigmoid'
>>> net["output"].activation == "sigmoid"
True
>>> net["hidden1"].activation == "sigmoid"
True
>>> net["hidden2"].activation == "sigmoid"
True
>>> net["input"].activation is None
True
>>> net.layers[0].name == "input"
True
"""
OPTIMIZERS = ("sgd", "rmsprop", "adagrad", "adadelta", "adam",
"adamax", "nadam", "tfoptimizer")
ERROR_FUNCTIONS = ['binary_crossentropy', 'categorical_crossentropy',
'categorical_hinge', 'cosine', 'cosine_proximity', 'hinge',
'kld', 'kullback_leibler_divergence', 'logcosh', 'mae', 'mape',
'mean_absolute_error', 'mean_absolute_percentage_error', 'mean_squared_error',
'mean_squared_logarithmic_error', 'mse', 'msle', 'poisson',
'sparse_categorical_crossentropy', 'squared_hinge']
def __init__(self, name: str, *sizes: int, load_config=True, **config: Any):
if not isinstance(name, str):
raise Exception("first argument should be a name for the network")
self.debug = False
## Pick a place in the random stream, and remember it:
## (can override randomness with a particular seed):
if not isinstance(name, str):
raise Exception("conx layers need a name as a first parameter")
self._check_network_name(name)
self.name = name
if "seed" in config:
seed = config["seed"]
del config["seed"]
else:
seed = np.random.randint(2 ** 31 - 1)
self.seed = seed
np.random.seed(self.seed)
self.reset_config()
## Next, load a config if available, and override defaults:
self.layers = []
if load_config:
self.load_config()
## Override those with args:
self.config.update(config)
## Set initial values:
self.num_input_layers = 0
self.num_target_layers = 0
self.input_bank_order = []
self.output_bank_order = []
self.dataset = Dataset(self)
self.compile_options = {}
self.train_options = {}
self._tolerance = K.variable(0.1, dtype='float32', name='tolerance')
self.layer_dict = {}
self.epoch_count = 0
self.history = []
self.weight_history = {}
self.update_pictures = get_ipython() is not None
self._comm = None
self.model = None
self.prop_from_dict = {}
self._svg_counter = 1
self._need_to_show_headings = True
self._initialized_javascript = False
# If simple feed-forward network:
for i in range(len(sizes)):
if i > 0:
self.add(Layer(autoname(i, len(sizes)), shape=sizes[i],
activation=self.config["activation"]))
else:
self.add(Layer(autoname(i, len(sizes)), shape=sizes[i]))
# Connect them together:
for i in range(len(sizes) - 1):
self.connect(autoname(i, len(sizes)), autoname(i+1, len(sizes)))
[docs] def reset_config(self):
"""
Reset the config back to factor defaults.
"""
self.config = {
"font_size": 12, # for svg
"font_family": "monospace", # for svg
"border_top": 25, # for svg
"border_bottom": 25, # for svg
"hspace": 150, # for svg
"vspace": 30, # for svg, arrows
"image_maxdim": 200, # for svg
"image_pixels_per_unit": 50, # for svg
"activation": "linear", # Dense default, if none specified
"arrow_color": "black",
"arrow_width": "2",
"border_width": "2",
"border_color": "black",
"show_targets": False,
"show_errors": False,
"pixels_per_unit": 1,
"precision": 2,
"svg_scale": None, # for svg, 0 - 1, or None for optimal
"svg_rotate": False, # for rotating SVG
"svg_preferred_size": 400, # in pixels
"svg_max_width": 800, # in pixels
"dashboard.dataset": "Train",
"dashboard.features.bank": "",
"dashboard.features.columns": 3,
"dashboard.features.scale": 1.0,
"config_layers": {},
}
def _check_network_name(self, name):
"""
Check to see if a network name is appropriate.
Raises exception if invalid name.
"""
valid_chars = string.ascii_letters + string.digits + " _-"
if len(name) == 0:
raise Exception("network name must not be length 0: '%s'" % name)
if not all(char in valid_chars for char in name):
raise Exception("network name must only contain letters, numbers, '-', ' ', and '_': '%s'" % name)
def __getstate__(self):
return {
"name": self.name,
"layers": [layer.__getstate__() for layer in self.layers],
"outgoing_connections": {layer.name: [layer2.name for layer2 in layer.outgoing_connections]
for layer in self.layers},
"config": self.config,
}
def __setstate__(self, state):
from .layers import make_layer
Network.__init__(self, state["name"])
self.config = state["config"]
for layer_state in state["layers"]:
self.add(make_layer(layer_state))
for layer_from in self.layers:
for layer_to in state["outgoing_connections"][layer_from.name]:
self.connect(layer_from.name, layer_to)
def _get_tolerance(self):
return K.get_value(self._tolerance)
def _set_tolerance(self, value):
K.set_value(self._tolerance, value)
tolerance = property(_get_tolerance,
_set_tolerance)
def __getitem__(self, layer_name):
if layer_name not in self.layer_dict:
return None
else:
return self.layer_dict[layer_name]
def _repr_svg_(self):
return self.to_svg(show_errors=False, show_targets=False, svg_rotate=False,
svg_scale=None)
def __repr__(self):
return "<Network name='%s' (%s)>" % (
self.name, ("uncompiled" if not self.model else "compiled"))
[docs] def set_weights_from_history(self, index, epochs=None):
"""
Set the weights of the network from a particular point in the learning
sequence.
net.set_weights_from_history(0) # restore initial weights
net.set_weights_from_history(-1) # restore last weights
See also:
* `Network.get_weights_from_history`
"""
epochs = epochs if epochs is not None else sorted(self.weight_history.keys())
return self.from_array(self.get_weights_from_history(index, epochs))
[docs] def get_weights_from_history(self, index, epochs=None):
"""
Get the weights of the network from a particular point in the learning
sequence.
wts = net.get_weights_from_history(0) # get initial weights
wts = net.get_weights_from_history(-1) # get last weights
See also:
* `Network.set_weights_from_history`
"""
epochs = epochs if epochs is not None else sorted(self.weight_history.keys())
return self.weight_history[epochs[index]]
[docs] def playback(self, function):
"""
Playback a function over the set of recorded weights.
function has signature: function(network, epoch) and returns
a displayable, or list of displayables.
Example:
>>> net = Network("Playback Test", 2, 2, 1, activation="sigmoid")
>>> net.compile(error="mse", optimizer="sgd")
>>> net.dataset.load([
... [[0, 0], [0]],
... [[0, 1], [1]],
... [[1, 0], [1]],
... [[1, 1], [0]]])
>>> results = net.train(10, record=True, verbose=0, plot=False)
>>> def function(network, epoch):
... return None
>>> sv = net.playback(function)
>>> ## Testing:
>>> class Dummy:
... def update(self, result):
... return result
>>> sv.displayers = [Dummy()]
>>> print("Testing"); sv.goto("end") # doctest: +ELLIPSIS
Testing...
"""
from .widgets import SequenceViewer
if len(self.weight_history) == 0:
raise Exception("network wasn't trained with record=True; please train again")
epochs = sorted(self.weight_history.keys())
def display_weight_history(index):
self.set_weights_from_history(index, epochs)
return function(self, epochs[index])
sv = SequenceViewer("%s Playback:" % self.name, display_weight_history, len(epochs))
return sv
[docs] def movie(self, function, movie_name=None, start=0, stop=None, step=1,
loop=0, optimize=True, duration=100, embed=False, mp4=True):
"""
Make a movie from a playback function over the set of recorded weights.
function has signature: function(network, epoch) and should return
a PIL.Image.
Example:
>>> net = Network("Movie Test", 2, 2, 1, activation="sigmoid")
>>> net.compile(error='mse', optimizer="adam")
>>> ds = [[[0, 0], [0]],
... [[0, 1], [1]],
... [[1, 0], [1]],
... [[1, 1], [0]]]
>>> net.dataset.load(ds)
>>> epochs, khistory = net.train(10, verbose=0, report_rate=1000, record=True, plot=False)
>>> img = net.movie(lambda net, epoch: net.propagate_to_image("hidden", [1, 1],
... resize=(500, 100)),
... "/tmp/movie.gif", mp4=False)
>>> img
<IPython.core.display.Image object>
"""
from IPython.display import Image
if len(self.weight_history) == 0:
raise Exception("network wasn't trained with record=True; please train again")
epochs = sorted(self.weight_history.keys())
if stop is None:
stop = len(epochs)
frames = []
indices = []
for index in range(start, stop, step):
self.set_weights_from_history(index, epochs)
frames.append(function(self, epochs[index]))
indices.append(index)
if stop - 1 not in indices:
self.set_weights_from_history(stop - 1, epochs)
frames.append(function(self, epochs[stop - 1]))
if movie_name is None:
movie_name = "%s-movie.gif" % self.name.replace(" ", "_")
if frames:
frames[0].save(movie_name, save_all=True, append_images=frames[1:],
optimize=optimize, loop=loop, duration=duration)
if mp4 is False:
return Image(url=movie_name, embed=embed)
else:
return gif2mp4(movie_name)
[docs] def picture(self, inputs=None, dynamic=False, rotate=False, scale=None,
show_errors=False, show_targets=False, format="html", class_id=None,
**kwargs):
"""
Create an SVG of the network given some inputs (optional).
>>> net = Network("Picture", 2, 2, 1)
>>> net.compile(error="mse", optimizer="adam")
>>> net.picture([.5, .5])
<IPython.core.display.HTML object>
>>> net.picture([.5, .5], dynamic=True)
<IPython.core.display.HTML object>
"""
from IPython.display import HTML
if any([(layer.kind() == "unconnected") for layer in self.layers]) or len(self.layers) == 0:
print("Network error: please add layers and connect them")
return
if not dynamic:
if class_id is not None:
print("WARNING: class_id given but ignored", file=sys.stderr)
r = random.randint(1, 1000000)
class_id = "picture-static-%s-%s" % (self.name, r)
orig_rotate = self.config["svg_rotate"]
orig_show_errors = self.config["show_errors"]
orig_show_targets = self.config["show_targets"]
orig_svg_scale = self.config["svg_scale"]
self.config["svg_rotate"] = rotate
self.config["show_errors"] = show_errors
self.config["show_targets"] = show_targets
self.config["svg_scale"] = scale
svg = self.to_svg(inputs=inputs, class_id=class_id, **kwargs)
self.config["svg_rotate"] = orig_rotate
self.config["show_errors"] = orig_show_errors
self.config["show_targets"] = orig_show_targets
self.config["svg_scale"] = orig_svg_scale
if format == "html":
return HTML(svg)
elif format == "svg":
return svg
elif format == "image":
return svg_to_image(svg)
[docs] def in_console(self, mpl_backend: str) -> bool:
"""
Return True if running connected to a console; False if connected
to notebook, or other non-console system.
Possible values:
* 'TkAgg' - console with Tk
* 'Qt5Agg' - console with Qt
* 'MacOSX' - mac console
* 'module://ipykernel.pylab.backend_inline' - default for notebook and
non-console, and when using %matplotlib inline
* 'NbAgg' - notebook, using %matplotlib notebook
Here, None means not plotting, or just use text.
Note:
If you are running ipython without a DISPLAY with the QT
background, you may wish to:
export QT_QPA_PLATFORM='offscreen'
"""
return mpl_backend not in [
'module://ipykernel.pylab.backend_inline',
'NbAgg',
]
[docs] def add(self, *layers: Layer) -> None:
"""
Add layers to the network layer connections. Order is not
important, unless calling :any:`Network.connect` without any
arguments.
Arguments:
layer: One or more layer instances.
Returns:
layer_name (str) - name of last layer added
Examples:
>>> net = Network("XOR2")
>>> net.add(Layer("input", 2))
'input'
>>> len(net.layers)
1
>>> net = Network("XOR3")
>>> net.add(Layer("input", 2))
'input'
>>> net.add(Layer("hidden", 5))
'hidden'
>>> net.add(Layer("hidden2", 5),
... Layer("hidden3", 5),
... Layer("hidden4", 5),
... Layer("hidden5", 5))
'hidden5'
>>> net.add(Layer("output", 2))
'output'
>>> len(net.layers)
7
Note:
See :any:`Network` for more information.
"""
last_name = None
for layer in layers:
if not isinstance(layer.name, str):
raise Exception("layer_name should be a string")
if layer.name in self.layer_dict:
raise Exception("duplicate layer name '%s'" % layer.name)
## Automatic layer naming by pattern:
if "%d" in layer.name:
layer_names = [layer.name for layer in self.layers]
i = 1
while (layer.name % i) in layer_names:
i += 1
layer.name = layer.name % i
if hasattr(layer, "params") and "name" in layer.params:
layer.params["name"] = layer.name
self.layers.append(layer)
self.layer_dict[layer.name] = layer
## Layers have link back to network
layer.network = self
## Finally, override any config from network.config:
self.update_layer_from_config(layer)
## Return name, for possible connections
last_name = layer.name
return last_name
def update_layer_from_config(self, layer):
if layer.name in self.config["config_layers"]:
for item in self.config["config_layers"][layer.name]:
setattr(layer, item, self.config["config_layers"][layer.name][item])
[docs] def connect(self, from_layer_name : str=None, to_layer_name : str=None):
"""
Connect two layers together if called with arguments. If
called with no arguments, then it will make a sequential
run through the layers in order added.
Arguments:
from_layer_name: Name of layer where connect begins.
to_layer_name: Name of layer where connection ends.
If both from_layer_name and to_layer_name are None, then
all of the layers are connected sequentially in the order
added.
Examples:
>>> net = Network("XOR2")
>>> net.add(Layer("input", 2))
'input'
>>> net.add(Layer("hidden", 5))
'hidden'
>>> net.add(Layer("output", 2))
'output'
>>> net.connect()
>>> [layer.name for layer in net["input"].outgoing_connections]
['hidden']
"""
if len(self.layers) == 0:
raise Exception("no layers have been added")
if from_layer_name is not None and not isinstance(from_layer_name, str):
raise Exception("from_layer_name should be a string or None")
if to_layer_name is not None and not isinstance(to_layer_name, str):
raise Exception("to_layer_name should be a string or None")
if from_layer_name is None and to_layer_name is None:
if (any([layer.outgoing_connections for layer in self.layers]) or
any([layer.incoming_connections for layer in self.layers])):
raise Exception("layers already have connections")
for i in range(len(self.layers) - 1):
self.connect(self.layers[i].name, self.layers[i+1].name)
else:
if from_layer_name == to_layer_name:
raise Exception("self connections are not allowed")
if not isinstance(from_layer_name, str):
raise Exception("from_layer_name should be a string")
if from_layer_name not in self.layer_dict:
raise Exception('unknown layer: %s' % from_layer_name)
if not isinstance(to_layer_name, str):
raise Exception("to_layer_name should be a string")
if to_layer_name not in self.layer_dict:
raise Exception('unknown layer: %s' % to_layer_name)
from_layer = self.layer_dict[from_layer_name]
to_layer = self.layer_dict[to_layer_name]
## NOTE: these could be allowed, I guess:
if to_layer in from_layer.outgoing_connections:
raise Exception("attempting to duplicate connection: %s to %s" % (from_layer_name, to_layer_name))
from_layer.outgoing_connections.append(to_layer)
if from_layer in to_layer.incoming_connections:
raise Exception("attempting to duplicate connection: %s to %s" % (to_layer_name, from_layer_name))
## Check for input going to a Dense to warn:
if from_layer.shape and len(from_layer.shape) > 1 and to_layer.CLASS.__name__ == "Dense":
print("WARNING: connected multi-dimensional input layer '%s' to layer '%s'; consider adding a FlattenLayer between them" % (
from_layer.name, to_layer.name), file=sys.stderr)
to_layer.incoming_connections.append(from_layer)
## Post connection hooks:
to_layer.on_connect("to", from_layer)
from_layer.on_connect("from", to_layer)
## Compute input/target layers:
input_layers = [layer for layer in self.layers if layer.kind() == "input"]
self.num_input_layers = len(input_layers)
self.input_bank_order = [layer.name for layer in input_layers]
target_layers = [layer for layer in self.layers if layer.kind() == "output"]
self.num_target_layers = len(target_layers)
self.output_bank_order = [layer.name for layer in target_layers]
## Set up a layer's input names, as best possible:
sequence = topological_sort(self, self.layers)
for layer in sequence:
if layer.kind() == 'input':
layer.input_names = set([layer.name])
else:
if len(layer.incoming_connections) == 1:
layer.input_names = layer.incoming_connections[0].input_names
else:
layer.input_names = set([item for sublist in
[incoming.input_names for incoming in layer.incoming_connections]
for item in sublist])
[docs] def depth(self):
"""
Find the depth of the network graph of connections.
"""
max_depth = 0
for in_layer_name in self.input_bank_order:
for out_layer_name in self.output_bank_order:
path = find_path(self, in_layer_name, out_layer_name)
if path:
max_depth = max(len(list(path)) + 1, max_depth)
return max_depth
[docs] def summary(self):
"""
Print out a summary of the network.
"""
if self.model:
self.model.summary()
else:
print("Compile network in order to see summary.")
[docs] def reset(self, clear=False, **overrides):
"""
Reset all of the weights/biases in a network.
The magnitude is based on the size of the network.
"""
self.epoch_count = 0
self.history = []
self.weight_history = {}
self.prop_from_dict = {}
if self.model:
if "seed" in overrides:
self.seed = overrides["seed"]
np.random.seed(self.seed)
del overrides["seed"]
# Compile the whole model again:
if clear:
self.compile_options = {}
self.compile_options.update(overrides)
self.compile(**self.compile_options)
[docs] def test(self, batch_size=32, show=False, tolerance=None, force=False,
show_inputs=True, show_outputs=True,
filter="all", interactive=True):
"""
Test a dataset.
"""
tolerance = tolerance if tolerance is not None else self.tolerance
if len(self.dataset.inputs) == 0:
raise Exception("nothing to test")
length = len(self.dataset.train_targets)
if self.dataset._split == 1.0: ## special case; use entire set
inputs = self.dataset._inputs
targets = self.dataset._targets
else:
## need to split; check format based on output banks:
targets = [column[:length] for column in self.dataset._targets]
inputs = [column[:length] for column in self.dataset._inputs]
if interactive:
self._test(inputs, targets, "validation dataset", batch_size, show,
tolerance, force, show_inputs, show_outputs, filter, interactive)
else:
results = self._test(inputs, targets, "validation dataset", batch_size, show,
tolerance, force, show_inputs, show_outputs, filter, interactive)
categories = {}
for i in range(length):
label = "%s (%s)" % (self.dataset.labels[i],
"correct" if results[i] else "wrong")
if not label in categories:
categories[label] = []
categories[label].append(self.dataset.inputs[i])
return sorted(categories.items())
def _test(self, inputs, targets, dataset, batch_size=32, show=False,
tolerance=None, force=False,
show_inputs=True, show_outputs=True,
filter="all", interactive=True):
"""
>>> net = Network("Playback Test", 2, 2, 1, activation="sigmoid")
>>> net.compile(error="mse", optimizer="sgd")
>>> net.dataset.load([
... [[0, 0], [0]],
... [[0, 1], [1]],
... [[1, 0], [1]],
... [[1, 1], [0]]])
>>> array = net.to_array()
>>> net.from_array(np.zeros(len(array))) ## Zero-out weights
>>> net._test(net.dataset._inputs, net.dataset._targets, "TEST")
========================================================
Testing TEST with tolerance None...
Total count: 4
correct: 0
incorrect: 4
Total percentage correct: 0.0
>>> net._test(net.dataset._inputs, net.dataset._targets, "TEST", show=True)
========================================================
Testing TEST with tolerance None...
# | inputs | targets | outputs | result
---------------------------------------
0 | [[0.00,0.00]] | [[0.00]] | [0.50] | X
1 | [[0.00,1.00]] | [[1.00]] | [0.50] | X
2 | [[1.00,0.00]] | [[1.00]] | [0.50] | X
3 | [[1.00,1.00]] | [[0.00]] | [0.50] | X
Total count: 4
correct: 0
incorrect: 4
Total percentage correct: 0.0
"""
if interactive:
print("=" * 56)
print("Testing %s with tolerance %.6s..." % (dataset, tolerance))
outputs = self.model.predict(inputs, batch_size=batch_size)
## FYI: outputs not shaped
if self.num_target_layers > 1:
correct = self.compute_correct(outputs, targets, tolerance)
else:
## Warning:
## keras returns outputs as a single column
## conx targets are always multi-column
correct = self.compute_correct([outputs], targets, tolerance)
count = len(correct)
if show:
if show_inputs:
in_formatted = self.pf_matrix(inputs, force)
count = len(in_formatted)
if show_outputs:
targ_formatted = self.pf_matrix(targets, force)
out_formatted = self.pf_matrix(outputs, force)
count = len(out_formatted)
header = "# | "
if show_inputs:
header += "inputs | "
if show_outputs:
header += "targets | outputs | "
header += "result"
print(header)
print("---------------------------------------")
for i in range(count):
show_it = ((filter == "all") or
(filter == "correct" and correct[i]) or
(filter == "incorrect" and not correct[i]))
if show_it:
line = "%d | " % i
if show_inputs:
line += "%s | " % in_formatted[i]
if show_outputs:
line += "%s | %s | " % (targ_formatted[i], out_formatted[i])
line += "correct" if correct[i] else "X"
print(line)
if interactive:
print("Total count:", len(correct))
print(" correct:", len([c for c in correct if c]))
print(" incorrect:", len([c for c in correct if not c]))
print("Total percentage correct:", list(correct).count(True)/len(correct))
else:
return list(correct)
[docs] def compute_correct(self, outputs, targets, tolerance=None):
"""
Both are np.arrays. Return [True, ...].
"""
tolerance = tolerance if tolerance is not None else self.tolerance
correct = []
for r in range(len(outputs[0])):
row = []
for c in range(len(outputs)):
row.extend(list(map(lambda v: v <= tolerance, np.abs(outputs[c][r] - targets[c][r]))))
correct.append(all(row))
return correct
[docs] def train_one(self, inputs, targets, batch_size=32, update_pictures=False):
"""
Train on one input/target pair.
Inputs should be a vector if one input bank, or a list of vectors
if more than one input bank.
Targets should be a vector if one output bank, or a list of vectors
if more than one output bank.
Alternatively, inputs and targets can each be a dictionary mapping
bank to vector.
Examples:
>>> from conx import Network, Layer, SGD, Dataset
>>> net = Network("XOR", 2, 2, 1, activation="sigmoid")
>>> net.compile(error='mean_squared_error',
... optimizer=SGD(lr=0.3, momentum=0.9))
>>> ds = [[[0, 0], [0]],
... [[0, 1], [1]],
... [[1, 0], [1]],
... [[1, 1], [0]]]
>>> net.dataset.load(ds)
>>> out, err = net.train_one({"input": [0, 0]},
... {"output": [0]})
>>> len(out)
1
>>> len(err)
1
>>> from conx import Network, Layer, SGD, Dataset
>>> net = Network("XOR2")
>>> net.add(Layer("input%d", shape=1))
'input1'
>>> net.add(Layer("input%d", shape=1))
'input2'
>>> net.add(Layer("hidden%d", shape=2, activation="sigmoid"))
'hidden1'
>>> net.add(Layer("hidden%d", shape=2, activation="sigmoid"))
'hidden2'
>>> net.add(Layer("shared-hidden", shape=2, activation="sigmoid"))
'shared-hidden'
>>> net.add(Layer("output%d", shape=1, activation="sigmoid"))
'output1'
>>> net.add(Layer("output%d", shape=1, activation="sigmoid"))
'output2'
>>> net.connect("input1", "hidden1")
>>> net.connect("input2", "hidden2")
>>> net.connect("hidden1", "shared-hidden")
>>> net.connect("hidden2", "shared-hidden")
>>> net.connect("shared-hidden", "output1")
>>> net.connect("shared-hidden", "output2")
>>> net.compile(error='mean_squared_error',
... optimizer=SGD(lr=0.3, momentum=0.9))
>>> ds = [([[0],[0]], [[0],[0]]),
... ([[0],[1]], [[1],[1]]),
... ([[1],[0]], [[1],[1]]),
... ([[1],[1]], [[0],[0]])]
>>> net.dataset.load(ds)
>>> net.compile(error='mean_squared_error',
... optimizer=SGD(lr=0.3, momentum=0.9))
>>> out, err = net.train_one({"input1": [0], "input2": [0]},
... {"output1": [0], "output2": [0]})
>>> len(out)
2
>>> len(err)
2
>>> net.dataset._num_input_banks()
2
>>> net.dataset._num_target_banks()
2
"""
if isinstance(inputs, dict):
inputs = [inputs[name] for name in self.input_bank_order]
if self.num_input_layers == 1:
inputs = inputs[0]
if isinstance(targets, dict):
targets = [targets[name] for name in self.output_bank_order]
if self.num_target_layers == 1:
targets = targets[0]
pairs = [(inputs, targets)]
if self.num_input_layers == 1:
ins = np.array([pair[0] for pair in pairs], "float32")
else:
ins = []
for i in range(len(pairs[0][0])):
ins.append(np.array([pair[0][i] for pair in pairs], "float32"))
if self.num_target_layers == 1:
targs = np.array([pair[1] for pair in pairs], "float32")
else:
targs = []
for i in range(len(pairs[0][1])):
targs.append(np.array([pair[1][i] for pair in pairs], "float32"))
history = self.model.fit(ins, targs, epochs=1, verbose=0, batch_size=batch_size)
## may need to update history?
outputs = self.propagate(inputs, batch_size=batch_size, update_pictures=update_pictures)
if len(self.output_bank_order) == 1:
errors = (np.array(outputs) - np.array(targets)).tolist()
else:
errors = []
for bank in range(len(self.output_bank_order)):
errors.append((np.array(outputs[bank]) - np.array(targets[bank])).tolist())
if update_pictures:
if self.config["show_targets"]:
if len(self.output_bank_order) == 1:
self.display_component([targets], "targets")
else:
self.display_component(targets, "targets")
if self.config["show_errors"]: ## min max is error:
if len(self.output_bank_order) == 1:
self.display_component([errors], "errors", minmax=(-1, 1))
else:
errors = []
for bank in range(len(self.output_bank_order)):
errors.append( np.array(outputs[bank]) - np.array(targets[bank]))
self.display_component(errors, "errors", minmax=(-1, 1))
return (outputs, errors)
[docs] def retrain(self, **overrides):
"""
Call network.train() again with same options as last call, unless overrides.
"""
for key in overrides:
if key not in self.train_options:
raise Exception("Unknown train option: %s" % key)
self.train_options.update(overrides)
self.train(**self.train_options)
def _compute_result_acc(self, results):
"""
Compute accuracy from results. There are no val_ items here.
"""
if "acc" in results: return results["acc"]
values = [results[key] for key in results if key.endswith("_acc")]
if len(values) > 0:
return sum(values)/len(values)
else:
raise Exception("attempting to find accuracy in results, but there aren't any")
[docs] def evaluate(self, batch_size=32):
"""
Test the network on the train and test data, returning a dict of results.
Example:
>>> net = Network("Evaluate", 2, 2, 1, activation="sigmoid")
>>> net.compile(error='mean_squared_error', optimizer="adam")
>>> ds = [[[0, 0], [0]],
... [[0, 1], [1]],
... [[1, 0], [1]],
... [[1, 1], [0]]]
>>> net.dataset.load(ds)
>>> net.evaluate() # doctest: +ELLIPSIS
{'loss': ..., 'acc': ...}
"""
if len(self.dataset.inputs) == 0:
raise Exception("no dataset loaded")
if self.model is None:
raise Exception("need to compile network")
(train_inputs, train_targets), (test_inputs, test_targets) = self.dataset._split_data()
train_metrics = self.model.evaluate(train_inputs, train_targets, batch_size=batch_size, verbose=0)
results = {k:v for k, v in zip(self.model.metrics_names, train_metrics)}
if len(test_inputs) > 0:
test_metrics = self.model.evaluate(test_inputs, test_targets, batch_size=batch_size, verbose=0)
results.update({"val_"+k: v for k, v in zip(self.model.metrics_names, test_metrics)})
return results
[docs] def test_dataset_ranges(self):
"""
Test the dataset ranges to see if in range of activation functions.
"""
if len(self.dataset.targets) == 0:
return # nothing to test
for index in range(len(self.dataset._targets)):
if len(self.dataset._targets[index].shape) > 2:
print("WARNING: network '%s' target bank #%s has a multi-dimensional shape, which is not allowed" %
(self.name, index), file=sys.stderr)
for index in range(len(self.output_bank_order)):
layer_name = self.output_bank_order[index]
if self[layer_name].activation == "linear":
continue
lmin, lmax = self[layer_name].get_act_minmax()
# test dataset min to see if in range of act output:
if not (lmin <= self.dataset._targets_range[index][0] <= lmax):
print("WARNING: output bank '%s' has activation function, '%s', that is not consistent with minimum value of targets" %
(layer_name, self[layer_name].activation), file=sys.stderr)
# test dataset min to see if in range of act output:
if not (lmin <= self.dataset._targets_range[index][1] <= lmax):
print("WARNING: output bank '%s' has activation function, '%s', that is not consistent with maximum value of targets" %
(layer_name, self[layer_name].activation), file=sys.stderr)
[docs] def train(self, epochs=1, accuracy=None, error=None, batch_size=32,
report_rate=1, verbose=1, kverbose=0, shuffle=True, tolerance=None,
class_weight=None, sample_weight=None, use_validation_to_stop=False,
plot=True, record=0, callbacks=None, save=False):
"""
Train the network.
To stop before number of epochs, give either error=VALUE, or accuracy=VALUE.
Normally, it will check training info to stop, unless you
use_validation_to_stop = True.
Arguments:
epochs (int): Maximum number of epochs (sweeps) through
training data.
accuracy (float): Value of correctness (0.0 - 1.0) to attain in
order to stop. Depends on tolerance to determine accuracy.
error (float): Error to attain in order to stop. Depends on error
function given in `Network.compile`.
batch_size (int): Size of batch to train on.
report_rate (int): Rate of feedback on learning, in epochs.
verbose (int): Level of feedback on training. verbose=0 gives no
feedback, but returns (epoch_count, result)
kverbose (int): Level of feedback from Keras.
shuffle (bool or str): Should the training data be shuffled?
'batch' shuffles in batch-sized chunks.
tolerance (float): The maximum difference between target and output
that should be considered correct.
class_weight (float):
sample_weight (float):
use_validation_to_stop (bool): If `True`, then accuracy and error will
use the validation set rather than the training set.
plot (bool): If `True`, then the feedback will be shown in graphical form.
record (int): If 'record != 0', the weights will be saved every record
number of epochs.
callbacks (list): A list of (str, function) where str is 'on_batch_begin',
'on_batch_end', 'on_epoch_begin', 'on_epoch_end', 'on_train_begin',
or 'on_train_end', and function takes a network, and other
parameters, depending on str.
save (bool): If `True`, then the network is saved at end, whether
interrupted or not.
Returns:
tuple: (epoch_count, result) if verbose == 0
None: if verbose != 0
Examples:
>>> net = Network("Train Test", 1, 3, 1)
>>> net.compile(error="mse", optimizer="rmsprop")
>>> net.dataset.append([0.0], [1.0])
>>> net.dataset.append([1.0], [0.0])
>>> net.train(plot=False) # doctest: +ELLIPSIS
Evaluating initial training metrics...
Training...
...
"""
self.train_options = {
"epochs": epochs,
"accuracy": accuracy,
"error": error,
"batch_size": batch_size,
"report_rate": report_rate,
"verbose": verbose,
"shuffle": shuffle,
"class_weight": class_weight,
"sample_weight": sample_weight,
"tolerance": tolerance,
"use_validation_to_stop": use_validation_to_stop,
"plot": plot,
"record": record,
"callbacks": callbacks,
"save": save,
}
if plot:
import matplotlib
mpl_backend = matplotlib.get_backend()
else:
mpl_backend = None
if self.model is None:
raise Exception("need to compile network")
if not isinstance(report_rate, numbers.Integral) or report_rate < 1:
raise Exception("bad report rate: %s" % (report_rate,))
if not (isinstance(batch_size, numbers.Integral) or batch_size is None):
raise Exception("bad batch size: %s" % (batch_size,))
## Test for targets in range of activation function:
self.test_dataset_ranges()
if epochs == 0: return
if len(self.dataset.inputs) == 0:
print("No training data available")
return
if use_validation_to_stop:
if (self.dataset._split == 0):
print("Attempting to use validation to stop, but Network.dataset.split() is 0")
return
elif ((accuracy is None) and (error is None)):
print("Attempting to use validation to stop, but neither accuracy nor error was set")
return
self._need_to_show_headings = True
if tolerance is not None:
if accuracy is None:
raise Exception("tolerance given but unknown accuracy")
K.set_value(self._tolerance, tolerance)
## Going to need evaluation on training set in any event:
if self.dataset._split == 1.0: ## special case; use entire set
inputs = self.dataset._inputs
targets = self.dataset._targets
else:
## need to split; check format based on output banks:
length = len(self.dataset.train_targets)
targets = [column[:length] for column in self.dataset._targets]
inputs = [column[:length] for column in self.dataset._inputs]
if len(self.history) > 0:
results = self.history[-1]
else:
if verbose > 0:
print("Evaluating initial training metrics...")
values = self.model.evaluate(inputs, targets, batch_size=batch_size, verbose=0)
if not isinstance(values, list): # if metrics is just a single value
values = [values]
results = {metric: value for metric,value in zip(self.model.metrics_names, values)}
results_acc = self._compute_result_acc(results)
## look at split, use validation subset:
if self.dataset._split == 0.0: ## None
val_results = {}
elif self.dataset._split == 1.0: ## special case; use entire set; already done!
val_results = {"val_%s" % key: results[key] for key in results}
else: # split is greater than 0, less than 1
if verbose > 0:
print("Evaluating initial validation metrics...")
## need to split; check format based on output banks:
length = len(self.dataset.test_targets)
targets = [column[-length:] for column in self.dataset._targets]
inputs = [column[-length:] for column in self.dataset._inputs]
val_values = self.model.evaluate(inputs, targets, batch_size=batch_size, verbose=0)
val_results = {"val_%s" % metric: value for metric,value in zip(self.model.metrics_names, val_values)}
if val_results:
val_results_acc = self._compute_result_acc(val_results)
if use_validation_to_stop:
if ((self.dataset._split > 0) and
((accuracy is not None) or (error is not None))):
need_to_train = True
if ((accuracy is not None) and (val_results_acc >= accuracy)):
print("No training required: validation accuracy already to desired value")
need_to_train = False
elif ((error is not None) and (val_results["loss"] <= error)):
print("No training required: validation error already to desired value")
need_to_train = False
if not need_to_train:
print("Training dataset status:")
self.report_epoch(self.epoch_count, results)
print("Validation dataset status:")
self.report_epoch(self.epoch_count, val_results)
return
else: ## regular training to stop, use_validation_to_stop is False
if ((accuracy is not None) and (results_acc >= accuracy)):
print("No training required: accuracy already to desired value")
print("Training dataset status:")
self.report_epoch(self.epoch_count, results)
return
elif ((error is not None) and (results["loss"] <= error)):
print("No training required: error already to desired value")
print("Training dataset status:")
self.report_epoch(self.epoch_count, results)
return
## Ok, now we know we need to train:
results.update(val_results)
if len(self.history) == 0:
self.history = [results]
if record:
self.weight_history[0] = self.to_array()
if verbose > 0:
print("Training...")
if self.in_console(mpl_backend) and verbose > 0:
self.report_epoch(self.epoch_count, self.history[-1])
interrupted = False
kcallbacks = [
History(),
ReportCallback(self, verbose, report_rate, mpl_backend, record),
]
if accuracy is not None:
kcallbacks.append(StoppingCriteria("acc", ">=", accuracy, use_validation_to_stop))
if error is not None:
kcallbacks.append(StoppingCriteria("loss", "<=", error, use_validation_to_stop))
if plot:
pc = PlotCallback(self, report_rate, mpl_backend)
kcallbacks.append(pc)
if callbacks is not None:
for (on_method, function) in callbacks:
kcallbacks.append(FunctionCallback(self, on_method, function))
with _InterruptHandler(self) as handler:
if self.dataset._split == 1:
result = self.model.fit(self.dataset._inputs,
self.dataset._targets,
batch_size=batch_size,
epochs=epochs,
validation_data=(self.dataset._inputs,
self.dataset._targets),
callbacks=kcallbacks,
shuffle=shuffle,
class_weight=class_weight,
sample_weight=sample_weight,
verbose=kverbose)
else:
result = self.model.fit(self.dataset._inputs,
self.dataset._targets,
batch_size=batch_size,
epochs=epochs,
validation_split=self.dataset._split,
callbacks=kcallbacks,
shuffle=shuffle,
class_weight=class_weight,
sample_weight=sample_weight,
verbose=kverbose)
if plot:
pc.on_epoch_end(-1)
if handler.interrupted:
interrupted = True
if interrupted:
if verbose:
print("Interrupted! Cleaning up...")
last_epoch = self.history[-1]
if record:
self.weight_history[self.epoch_count] = self.to_array()
assert len(self.history) == self.epoch_count+1 # +1 is for epoch 0
if verbose:
print("=" * 56)
self.report_epoch(self.epoch_count, last_epoch)
if save:
if verbose:
print("Saving network... ", end="")
self.save()
if verbose:
print("Saved!")
if interrupted:
raise KeyboardInterrupt
if verbose == 0:
return (self.epoch_count, result)
[docs] def report_epoch(self, epoch_count, results):
"""
Print out stats for the epoch.
"""
if self._need_to_show_headings:
h1 = " "
h2 = "Epochs "
h3 = "------ "
if 'loss' in results:
h1 += "| Training "
h2 += "| Error "
h3 += "| --------- "
if 'acc' in results:
h1 += "| Training "
h2 += "| Accuracy "
h3 += "| --------- "
if 'val_loss' in results:
h1 += "| Validate "
h2 += "| Error "
h3 += "| --------- "
if 'val_acc' in results:
h1 += "| Validate "
h2 += "| Accuracy "
h3 += "| --------- "
for other in sorted(results):
if other not in ["loss", "acc", "val_loss", "val_acc"]:
if not other.endswith("_loss"):
w1, w2 = other.replace("_", " ").split(" ", 1)
maxlen = max(len(w1), len(w2), 9)
h1 += "| " + (("%%%ds " % maxlen) % w1)
h2 += "| " + (("%%%ds " % maxlen) % w2)
h3 += "| %s " % ("-" * (maxlen))
print(h1)
print(h2)
print(h3)
self._need_to_show_headings = False
s = "#%5d " % (epoch_count,)
if 'loss' in results:
s += "| %9.5f " % (results['loss'],)
if 'acc' in results:
s += "| %9.5f " % (results['acc'],)
if 'val_loss' in results:
s += "| %9.5f " % (results['val_loss'],)
if 'val_acc' in results:
s += "| %9.5f " % (results['val_acc'],)
for other in sorted(results):
if other not in ["loss", "acc", "val_loss", "val_acc"]:
if not other.endswith("_loss"):
other_str = other
if other.endswith("_acc"):
other_str = other[:-4] + " accuracy"
s += "| %9.5f " % results[other]
print(s)
[docs] def set_dataset(self, dataset):
"""
Set the dataset for the network.
Examples:
>>> from conx import Dataset
>>> data = [[[0, 0], [0]],
... [[0, 1], [1]],
... [[1, 0], [1]],
... [[1, 1], [0]]]
>>> ds = Dataset()
>>> ds.load(data)
>>> net = Network("Set Dataset Test", 2, 2, 1)
>>> net.compile(error="mse", optimizer="adam")
>>> net.set_dataset(ds)
"""
if not isinstance(dataset, Dataset):
raise Exception("Network.set_dataset() takes a Dataset object")
if dataset.network is not None:
print("INFO: using dataset on a new network, replacing old network", file=sys.stderr)
self.dataset = dataset
self.dataset.network = self
self.test_dataset_ranges()
self.dataset._verify_network_dataset_match()
[docs] def set_activation(self, layer_name, activation):
"""
Swap activation function of a layer after compile.
"""
from keras.models import load_model
import keras.activations
import tempfile
if not isinstance(layer_name, str):
raise Exception("layer_name should be a string")
if not isinstance(activation, str):
activation = activation.__name__
acts = {
'relu': keras.activations.relu,
'sigmoid': keras.activations.sigmoid,
'linear': keras.activations.linear,
'softmax': keras.activations.softmax,
'tanh': keras.activations.tanh,
'elu': keras.activations.elu,
'selu': keras.activations.selu,
'softplus': keras.activations.softplus,
'softsign': keras.activations.softsign,
'hard_sigmoid': keras.activations.hard_sigmoid,
}
if self.model:
self[layer_name].keras_layer.activation = acts[activation]
self[layer_name].activation = activation
with tempfile.NamedTemporaryFile() as tf:
filename = tf.name
self.model.save(filename)
self.model = load_model(filename)
else:
raise Exception("can't change activation until after compile")
[docs] def get_weights_as_image(self, layer_name, colormap=None):
"""
Get the weights from the model.
>>> net = Network("Weight as Image Test", 2, 2, 5)
>>> net.compile(error="mse", optimizer="adam")
>>> net.get_weights_as_image("hidden") # doctest: +ELLIPSIS
<PIL.Image.Image image mode=RGBA size=2x2 at ...>
"""
from matplotlib import cm
if not isinstance(layer_name, str):
raise Exception("layer_name should be a string")
if self.model is None:
raise Exception("need to compile network")
weights = [layer.get_weights() for layer in self.model.layers
if layer_name == layer.name][0]
weights = weights[0] # get the weight matrix, not the biases
vector = scale_output_for_image(weights, (-5,5), truncate=True)
if len(vector.shape) == 1:
vector = vector.reshape((1, vector.shape[0]))
size = self.config["pixels_per_unit"]
new_width = vector.shape[0] * size # in, pixels
new_height = vector.shape[1] * size # in, pixels
if colormap is None:
colormap = get_colormap() if self[layer_name].colormap is None else self[layer_name].colormap
try:
cm_hot = cm.get_cmap(colormap)
except:
cm_hot = cm.get_cmap("RdGy")
vector = cm_hot(vector)
vector = np.uint8(vector * 255)
image = PIL.Image.fromarray(vector)
image = image.resize((new_height, new_width))
return image
[docs] def get_weights(self, layer_name=None):
"""
Get the weights from a layer, or the entire model.
Examples:
>>> net = Network("Weight Test", 2, 2, 5)
>>> net.compile(error="mse", optimizer="adam")
>>> len(net.get_weights("input"))
0
>>> len(net.get_weights("hidden"))
2
>>> shape(net.get_weights("hidden")[0]) ## weights
(2, 2)
>>> shape(net.get_weights("hidden")[1]) ## biases
(2,)
>>> len(net.get_weights("output"))
2
>>> shape(net.get_weights("output")[0]) ## weights
(2, 5)
>>> shape(net.get_weights("output")[1]) ## biases
(5,)
>>> net = Network("Weight Get Test", 2, 2, 1, activation="sigmoid")
>>> net.compile(error="mse", optimizer="sgd")
>>> len(net.get_weights())
3
See also:
* `Network.to_array`
* `Network.from_array`
* `Network.get_weights_as_image`
"""
if self.model is None:
raise Exception("need to compile network")
if layer_name is not None:
weights = [layer.get_weights() for layer in self.model.layers
if layer_name == layer.name][0]
return [m.tolist() for m in weights]
else:
weights = []
for i in range(len(self.model.layers)):
w = self.model.layers[i].get_weights()
weights.append(w)
return weights
[docs] def propagate(self, input, batch_size=32, class_id=None,
update_pictures=False, raw=False):
"""
Propagate an input (in human API) through the network.
If visualizing, the network image will be updated.
Inputs should be a vector if one input bank, or a list of vectors
if more than one input bank.
Alternatively, inputs can be a dictionary mapping
bank to vector.
>>> net = Network("Prop Test", 2, 2, 5)
>>> net.compile(error="mse", optimizer="adam")
>>> len(net.propagate([0.5, 0.5]))
5
>>> len(net.propagate({"input": [1, 1]}))
5
"""
if self.model is None:
raise Exception("Need to compile network first")
if isinstance(input, dict):
input = [input[name] for name in self.input_bank_order]
if self.num_input_layers == 1:
input = input[0]
elif isinstance(input, PIL.Image.Image):
input = image_to_array(input)
## End of input setup
if not is_array_like(input):
raise Exception("inputs should be an array")
if raw:
outputs = self.model.predict(np.array(input), batch_size=batch_size)
elif self.num_input_layers == 1:
outputs = self.model.predict(np.array([input]), batch_size=batch_size)
else:
inputs = [np.array([x], "float32") for x in input]
outputs = self.model.predict(inputs, batch_size=batch_size)
## Shape the outputs:
if raw:
pass
elif self.num_target_layers == 1:
shape = self[self.output_bank_order[0]].shape
try:
outputs = outputs[0].reshape(shape).tolist()
except:
outputs = outputs[0].tolist() # can't reshape; maybe a dynamically changing output
else:
shapes = [self[layer_name].shape for layer_name in self.output_bank_order]
## FIXME: may not be able to reshape; dynamically changing output
outputs = [outputs[i].reshape(shapes[i]).tolist() for i in range(len(self.output_bank_order))]
if update_pictures:
for layer in self.layers:
self.propagate_to(layer.name, input, batch_size, class_id=class_id,
update_pictures=update_pictures, raw=raw, update_path=False)
return outputs
[docs] def propagate_from(self, layer_name, input, output_layer_names=None,
batch_size=32, update_pictures=False, raw=False):
"""
Propagate activations from the given layer name to the output layers.
"""
if not isinstance(layer_name, str):
raise Exception("layer_name should be a string")
if layer_name not in self.layer_dict:
raise Exception("No such layer '%s'" % layer_name)
if isinstance(input, dict):
input = [input[name] for name in self.input_bank_order]
if self.num_input_layers == 1:
input = input[0]
elif isinstance(input, PIL.Image.Image):
input = image_to_array(input)
## End of input setup
if not is_array_like(input):
raise Exception("inputs should be an array")
if output_layer_names is None:
if self.num_target_layers == 1:
output_layer_names = [layer.name for layer in self.layers if layer.kind() == "output"]
else:
output_layer_names = self.output_bank_order
else:
if isinstance(output_layer_names, str):
output_layer_names = [output_layer_names]
outputs = []
for output_layer_name in output_layer_names:
if (layer_name, output_layer_name) not in self.prop_from_dict:
path = find_path(self, layer_name, output_layer_name)
# Make a new Input to start here:
if self[layer_name].shape is not None:
input_k = k = keras.layers.Input(self[layer_name].shape, name=self[layer_name].name)
else:
input_k = k = keras.layers.Input(shape(input), name=self[layer_name].name)
# So that we can display activations here:
if (layer_name, layer_name) not in self.prop_from_dict:
self.prop_from_dict[(layer_name, layer_name)] = keras.models.Model(inputs=input_k,
outputs=k)
for layer in path:
k = layer.keras_layer(k)
if (layer_name, layer.name) not in self.prop_from_dict:
self.prop_from_dict[(layer_name, layer.name)] = keras.models.Model(inputs=input_k,
outputs=k)
# Now we should be able to get the prop_from model:
prop_model = self.prop_from_dict.get((layer_name, output_layer_name), None)
if raw:
inputs = input
else:
inputs = np.array([input])
outputs.append([list(x) for x in prop_model.predict(inputs)][0])
## FYI: outputs not shaped
if update_pictures:
if not self._comm:
from ipykernel.comm import Comm
self._comm = Comm(target_name='conx_svg_control')
## Update from start to rest of graph
if self._comm.kernel:
## viz this layer:
if self[layer_name].visible:
image = self[layer_name].make_image(inputs, config=self.config)
data_uri = self._image_to_uri(image)
class_id_name = "%s_%s" % (self.name, layer_name)
if self.config["svg_rotate"]:
class_id_name += "-rotated"
if self.debug: print("propagate_from 1: class_id_name:", class_id_name)
self._comm.send({'class': class_id_name, "href": data_uri})
for output_layer_name in output_layer_names:
path = find_path(self, layer_name, output_layer_name)
if path is not None:
for layer in path:
if not layer.visible: continue
model = self.prop_from_dict[(layer_name, layer.name)]
vector = model.predict(inputs)[0]
## FYI: outputs not shaped
image = layer.make_image(vector, config=self.config)
data_uri = self._image_to_uri(image)
class_id_name = "%s_%s" % (self.name, layer.name)
if self.config["svg_rotate"]:
class_id_name += "-rotated"
if self.debug: print("propagate_from 2: class_id_name:", class_id_name)
self._comm.send({'class': class_id_name, "href": data_uri})
if raw:
return outputs
elif len(output_layer_names) == 1:
return outputs[0]
else:
return outputs
[docs] def display_component(self, vector, component, class_id=None, **opts):
"""
vector is a list, one each per output layer. component is "errors" or "targets"
"""
config = copy.copy(self.config)
config.update(opts)
output_names = self.output_bank_order
if self._comm.kernel:
for (target, layer_name) in zip(vector, output_names):
array = np.array(target)
if component == "targets":
colormap = self[layer_name].colormap
else:
colormap = get_error_colormap()
image = self[layer_name].make_image(array, colormap, config)
data_uri = self._image_to_uri(image)
if class_id is None:
class_id_name = "%s_%s" % (self.name, layer_name)
else:
class_id_name = "%s_%s" % (class_id, layer_name)
if self.debug: print("display_component: sending to class_id:", class_id_name + "_" + component)
self._comm.send({'class': class_id_name + "_" + component,
"href": data_uri})
[docs] def propagate_to(self, layer_name, inputs, batch_size=32, class_id=None,
update_pictures=False, update_path=True, raw=False):
"""
Computes activation at a layer. Side-effect: updates live SVG.
Arguments:
layer_name (str) - name of layer to propagate activations to
inputs - list of numbers, vector to propagate
batch_size (int) - size of batch
update_pictures (bool) - send images to notebook SVG images
raw (bool) - if True, don't process inputs or outputs
"""
if not isinstance(layer_name, str):
raise Exception("layer_name should be a string")
if layer_name not in self.layer_dict:
raise Exception('unknown layer: %s' % (layer_name,))
if isinstance(inputs, dict):
inputs = [inputs[name] for name in self.input_bank_order]
if self.num_input_layers == 1:
inputs = inputs[0]
elif isinstance(inputs, PIL.Image.Image):
inputs = image_to_array(inputs)
## End of input setup
if not is_array_like(inputs):
raise Exception("inputs should be an array")
if raw:
outputs = self[layer_name].model.predict(np.array(inputs), batch_size=batch_size)
elif self.num_input_layers == 1:
outputs = self[layer_name].model.predict(np.array([inputs]), batch_size=batch_size)
else:
# get just inputs for this layer, in order:
vector = [np.array([inputs[self.input_bank_order.index(name)]]) for name in
self._get_sorted_input_names(self[layer_name].input_names)]
outputs = self[layer_name].model.predict(vector, batch_size=batch_size)
## output shaped below:
if update_pictures:
if not self._comm:
from ipykernel.comm import Comm
self._comm = Comm(target_name='conx_svg_control')
if self._comm.kernel:
if update_path: ## update the whole path, from all inputs to the layer_name, if a path
## don't repeat any updates, so keep track of what you have done:
updated = set([])
for input_layer_name in self.input_bank_order:
if input_layer_name not in updated:
image = self._propagate_to_image(input_layer_name, inputs, raw=raw)
data_uri = self._image_to_uri(image)
if class_id is None:
class_id_name = "%s_%s" % (self.name, input_layer_name)
else:
class_id_name = "%s_%s" % (class_id, input_layer_name)
if self.config["svg_rotate"]:
class_id_name += "-rotated"
if self.debug: print("propagate_to 1: sending to class_id_name:", class_id_name)
self._comm.send({'class': class_id_name, "href": data_uri})
updated.add(input_layer_name)
path = find_path(self, input_layer_name, layer_name)
if path is not None:
for layer in path:
if layer.visible and layer.model is not None:
if layer.name not in updated:
image = self._propagate_to_image(layer.name, inputs, raw=raw)
data_uri = self._image_to_uri(image)
if class_id is None:
class_id_name = "%s_%s" % (self.name, layer.name)
else:
class_id_name = "%s_%s" % (class_id, layer.name)
if self.config["svg_rotate"]:
class_id_name += "-rotated"
if self.debug: print("propagate_to 2: sending to class_id_name:", class_id_name)
self._comm.send({'class': class_id_name, "href": data_uri})
updated.add(layer.name)
else: # not the whole path, just to the layer_name
image = self._propagate_to_image(layer_name, inputs, raw=raw)
data_uri = self._image_to_uri(image)
if class_id is None:
class_id_name = "%s_%s" % (self.name, layer_name)
else:
class_id_name = "%s_%s" % (class_id, layer_name)
if self.config["svg_rotate"]:
class_id_name += "-rotated"
if self.debug: print("propagate_to 3: sending to class_id_name:", class_id_name)
self._comm.send({'class': class_id_name, "href": data_uri})
## Shape the outputs:
if raw:
return outputs
shape = self[layer_name].shape
if shape and all([isinstance(v, numbers.Integral) for v in shape]):
try:
outputs = outputs[0].reshape(shape).tolist()
except:
outputs = outputs[0].tolist()
else:
outputs = outputs[0].tolist()
return outputs
def _layer_has_features(self, layer_name):
output_shape = self[layer_name].get_output_shape()
return (isinstance(output_shape, tuple) and len(output_shape) == 4)
[docs] def propagate_to_features(self, layer_name, inputs, cols=5, resize=None, scale=1.0,
html=True, size=None, display=True, class_id=None,
update_pictures=False, raw=False):
"""
if html is True, then generate HTML, otherwise send images.
"""
from IPython.display import HTML
if isinstance(inputs, dict):
inputs = [inputs[name] for name in self.input_bank_order]
if self.num_input_layers == 1:
inputs = inputs[0]
elif isinstance(inputs, PIL.Image.Image):
inputs = image_to_array(inputs)
## End of input setup
if not is_array_like(inputs):
raise Exception("inputs should be an array")
if not isinstance(layer_name, str):
raise Exception("layer_name should be a string")
output_shape = self[layer_name].get_output_shape()
retval = """<table><tr>"""
if self._layer_has_features(layer_name):
if html:
orig_feature = self[layer_name].feature
for i in range(output_shape[3]):
self[layer_name].feature = i
## This should return in proper orientation, regardless of rotate setting:
image = self.propagate_to_image(layer_name, inputs, class_id=class_id,
update_pictures=update_pictures, raw=raw)
if resize is not None:
image = image.resize(resize)
if scale != 1.0:
image = image.resize((int(image.size[0] * scale), int(image.size[1] * scale)))
data_uri = self._image_to_uri(image)
retval += """<td style="border: 1px solid black;"><img style="image-rendering: pixelated;" class="%s_%s_feature%s" src="%s"/><br/><center>Feature %s</center></td>""" % (
self.name, layer_name, i, data_uri, i)
if (i + 1) % cols == 0:
retval += """</tr><tr>"""
retval += "</tr></table>"
self[layer_name].feature = orig_feature
if display:
return HTML(retval)
else:
return retval
else:
orig_feature = self[layer_name].feature
for i in range(output_shape[3]):
self[layer_name].feature = i
## This should return in proper orientation, regardless of rotate setting:
image = self.propagate_to_image(layer_name, inputs, class_id=class_id,
update_pictures=update_pictures, raw=raw)
if resize is not None:
image = image.resize(resize)
if scale != 1.0:
image = image.resize((int(image.size[0] * scale), int(image.size[1] * scale)))
data_uri = self._image_to_uri(image)
if not self._comm:
from ipykernel.comm import Comm
self._comm = Comm(target_name='conx_svg_control')
if self._comm.kernel:
self._comm.send({'class': "%s_%s_feature%s" % (self.name, layer_name, i), "src": data_uri})
self[layer_name].feature = orig_feature
else:
raise Exception("layer '%s' has no features" % layer_name)
[docs] def propagate_to_image(self, layer_name, input, batch_size=32, resize=None, scale=1.0,
class_id=None, update_pictures=False, raw=False):
"""
Gets an image of activations at a layer. Always returns image in
proper orientation.
"""
orig_rotate = self.config["svg_rotate"]
self.config["svg_rotate"] = False
image = self._propagate_to_image(layer_name, input, batch_size, resize, scale,
class_id, update_pictures, raw)
self.config["svg_rotate"] = orig_rotate
return image
def _propagate_to_image(self, layer_name, input, batch_size=32, resize=None, scale=1.0,
class_id=None, update_pictures=False, raw=False):
"""
Internal version. Draws to whatever rotation is set.
"""
if isinstance(input, dict):
input = [input[name] for name in self.input_bank_order]
if self.num_input_layers == 1:
input = input[0]
elif isinstance(input, PIL.Image.Image):
input = image_to_array(input)
## End of input setup
if not is_array_like(input):
raise Exception("inputs should be an array")
if not isinstance(layer_name, str):
raise Exception("layer_name should be a string")
outputs = self.propagate_to(layer_name, input, batch_size, class_id=class_id,
update_pictures=update_pictures, raw=raw)
array = np.array(outputs)
image = self[layer_name].make_image(array, config=self.config)
if resize is not None:
image = image.resize(resize)
if scale != 1.0:
image = image.resize((int(image.size[0] * scale), int(image.size[1] * scale)))
return image
[docs] def plot_activation_map(self, from_layer='input', from_units=(0,1), to_layer='output',
to_unit=0, colormap=None, default_from_layer_value=0,
resolution=None, act_range=(0,1), show_values=False, title=None,
scatter=None, symbols=None, default_symbol="o",
format=None, update_pictures=False):
"""
Plot the activations at a bank/unit given two input units.
"""
# first do some error checking
assert self[from_layer] is not None, "unknown layer: %s" % (from_layer,)
assert type(from_units) in (tuple, list) and len(from_units) == 2, \
"expected a pair of ints for the %s units but got %s" % (from_layer, from_units)
ix, iy = from_units
assert 0 <= ix < self[from_layer].size, "no such %s layer unit: %d" % (from_layer, ix)
assert 0 <= iy < self[from_layer].size, "no such %s layer unit: %d" % (from_layer, iy)
assert self[to_layer] is not None, "unknown layer: %s" % (to_layer,)
assert type(to_unit) is int, "expected an int for the %s unit but got %s" % (to_layer, to_unit)
assert 0 <= to_unit < self[to_layer].size, "no such %s layer unit: %d" % (to_layer, to_unit)
if colormap is None: colormap = get_colormap()
if plt is None:
raise Exception("matplotlib was not loaded")
act_min, act_max = self[from_layer].get_act_minmax() if act_range is None else act_range
out_min, out_max = self[to_layer].get_act_minmax()
if resolution is None:
resolution = (act_max - act_min) / 50 # 50x50 pixels by default
xmin, xmax, xstep = act_min, act_max, resolution
ymin, ymax, ystep = act_min, act_max, resolution
xspan = xmax - xmin
yspan = ymax - ymin
xpixels = int(xspan/xstep)+1
ypixels = int(yspan/ystep)+1
mat = np.zeros((ypixels, xpixels))
ovector = self[from_layer].make_dummy_vector(default_from_layer_value)
for row in range(ypixels):
for col in range(xpixels):
# (x,y) corresponds to lower left corner point of pixel
x = xmin + xstep*col
y = ymin + ystep*row
vector = copy.deepcopy(ovector)
vector[ix] = x
vector[iy] = y
activations = self.propagate_from(from_layer, vector, to_layer, update_pictures=update_pictures)
mat[row,col] = activations[to_unit]
fig, ax = plt.subplots()
axim = ax.imshow(mat, origin='lower', cmap=colormap, vmin=out_min, vmax=out_max)
if scatter is not None:
if isinstance(scatter, dict):
scatter = scatter["data"]
if len(scatter) == 2 and isinstance(scatter[0], str):
scatter = [scatter]
for (label, data) in scatter:
kwargs = {}
args = []
xs = [min(vector[0], act_max - .01) * xpixels for vector in data]
ys = [min(vector[1], act_max - .01) * ypixels for vector in data]
if label:
kwargs["label"] = label
symbol = get_symbol(label, symbols, default_symbol)
if symbol:
args.append(symbol)
ax.plot(xs, ys, *args, **kwargs)
ax.legend()
if title is not None:
ax.set_title("Activation of %s[%s]: %s" % (to_layer, to_unit, title))
else:
ax.set_title("Activation of %s[%s]" % (to_layer, to_unit))
ax.set_xlabel("%s[%s]" % (from_layer, ix))
ax.set_ylabel("%s[%s]" % (from_layer, iy))
ax.xaxis.tick_bottom()
ax.set_xticks([i*(xpixels-1)/4 for i in range(5)])
ax.set_xticklabels([xmin+i*xspan/4 for i in range(5)])
ax.set_yticks([i*(ypixels-1)/4 for i in range(5)])
ax.set_yticklabels([ymin+i*yspan/4 for i in range(5)])
cbar = fig.colorbar(axim)
if format is None:
plt.show(block=False)
else:
from IPython.display import SVG
bytes = io.BytesIO()
if format == "svg":
plt.savefig(bytes, format="svg")
plt.close(fig)
img_bytes = bytes.getvalue()
return SVG(img_bytes.decode())
elif format == "image":
plt.savefig(bytes, format="png")
plt.close(fig)
bytes.seek(0)
pil_image = PIL.Image.open(bytes)
return pil_image
else:
raise Exception("format must be None, 'svg', or 'image'")
# optionally print out a table of activation values
if show_values:
s = '\n'
for y in np.linspace(act_max, act_min, 20):
for x in np.linspace(act_min, act_max, 20):
vector = [default_from_layer_value] * self[from_layer].size
vector[ix] = x
vector[iy] = y
out = self.propagate_from(from_layer, vector, to_layer)[to_unit]
s += '%4.2f ' % out
s += '\n'
separator = 100 * '-'
s += separator
print("%s\nActivation of %s[%d] as a function of %s[%d] and %s[%d]" %
(separator, to_layer, to_unit, from_layer, ix, from_layer, iy))
print("rows: %s[%d] decreasing from %.2f to %.2f" % (from_layer, iy, act_max, act_min))
print("cols: %s[%d] increasing from %.2f to %.2f" % (from_layer, ix, act_min, act_max))
print(s)
[docs] def plot_layer_weights(self, layer_name, units='all', wrange=None, wmin=None, wmax=None,
colormap='gray', vshape=None, cbar=True, ticks=5, figsize=(12,3),
format=None):
"""weight range displayed on the colorbar can be specified as wrange=(wmin, wmax),
or individually via wmin/wmax keywords. if wmin or wmax is None, the actual min/max
value of the weight matrix is used. wrange overrides provided wmin/wmax values. ticks
is the number of colorbar ticks displayed. cbar=False turns off the colorbar. units
can be a single unit index number or a list/tuple/range of indices.
"""
if self[layer_name] is None:
raise Exception("unknown layer: %s" % (layer_name,))
if units == 'all':
units = list(range(self[layer_name].size))
elif isinstance(units, numbers.Integral):
units = [units]
elif not isinstance(units, (list, tuple, range)) or len(units) == 0:
raise Exception("units: expected an int or sequence of ints, but got %s" % (units,))
for unit in units:
if not 0 <= unit < self[layer_name].size:
raise Exception("no such unit: %s" % (unit,))
W, b = self[layer_name].keras_layer.get_weights()
W = W.transpose()
to_size, from_size = W.shape
if vshape is None:
rows, cols = 1, from_size
elif not isinstance(vshape, (list, tuple)) or len(vshape) != 2 \
or not isinstance(vshape[0], numbers.Integral) \
or not isinstance(vshape[1], numbers.Integral):
raise Exception("vshape: expected a pair of ints but got %s" % (vshape,))
else:
rows, cols = vshape
if rows*cols != from_size:
raise Exception("vshape %s is incompatible with the number of incoming weights to each %s unit (%d)"
% (vshape, layer_name, from_size))
aspect_ratio = max(rows,cols)/min(rows,cols)
#print("aspect_ratio is", aspect_ratio)
if aspect_ratio > 50: # threshold may need further refinement
print("WARNING: using a visual display shape of (%d, %d), which may be hard to see."
% (rows, cols), file=sys.stderr)
print("You can use vshape=(rows, cols) to specify a different display shape.")
if not isinstance(wmin, (numbers.Number, type(None))):
raise Exception("wmin: expected a number or None but got %s" % (wmin,))
if not isinstance(wmax, (numbers.Number, type(None))):
raise Exception("wmax: expected a number or None but got %s" % (wmax,))
if wrange is None:
if wmin is None:
wmin = np.min(W)
wmin_label = '0' if wmin == 0 else '%+.2f' % (wmin,)
else:
wmin_label = r'$\leq$ 0' if wmin == 0 else r'$\leq$ %+.2f' % (wmin,)
if wmax is None:
wmax = np.max(W)
wmax_label = '0' if wmax == 0 else '%+.2f' % (wmax,)
else:
wmax_label = r'$\geq$ 0' if wmax == 0 else r'$\geq$ %+.2f' % (wmax,)
elif not isinstance(wrange, (list, tuple)) or len(wrange) != 2 \
or not isinstance(wrange[0], (numbers.Number, type(None))) \
or not isinstance(wrange[1], (numbers.Number, type(None))):
raise Exception("wrange: expected a pair of numbers but got %s" % (wrange,))
else: # wrange overrides provided wmin/wmax values
wmin, wmax = wrange
return self.plot_layer_weights(layer_name, units, None, wmin, wmax, colormap, vshape,
cbar, ticks, figsize)
if wmin >= wmax:
raise Exception("specified weight range is empty")
if not isinstance(ticks, numbers.Integral) or ticks < 2:
raise Exception("invalid number of colorbar ticks: %s" % (ticks,))
# clip weights to the range [wmin, wmax] and normalize to [0, 1]:
scaled_W = (np.clip(W, wmin, wmax) - wmin) / (wmax - wmin)
fig, axes = plt.subplots(1, len(units), figsize=figsize)
if len(units) == 1:
axes = [axes]
for unit, ax in zip(units, axes):
ax.axis('off')
ax.set_title('weights to %s[%d]' % (layer_name, unit))
im = scaled_W[unit,:].reshape((rows, cols))
axim = ax.imshow(im, cmap=colormap, vmin=0, vmax=1)
if cbar:
tick_locations = np.linspace(0, 1, ticks)
tick_values = tick_locations * (wmax - wmin) + wmin
colorbar = fig.colorbar(axim, ticks=tick_locations)
cbar_labels = ['0' if t == 0 else '%+.2f' % (t,) for t in tick_values]
cbar_labels[0] = wmin_label
cbar_labels[-1] = wmax_label
colorbar.ax.set_yticklabels(cbar_labels)
if format is None:
plt.show(block=False)
else:
from IPython.display import SVG
bytes = io.BytesIO()
if format == "svg":
plt.savefig(bytes, format="svg")
plt.close(fig)
img_bytes = bytes.getvalue()
return SVG(img_bytes.decode())
elif format == "image":
plt.savefig(bytes, format="png")
plt.close(fig)
bytes.seek(0)
pil_image = PIL.Image.open(bytes)
return pil_image
else:
raise Exception("format must be None, 'svg', or 'image'")
[docs] def show_unit_weights(self, layer_name, unit, vshape=None, ascii=False):
if self[layer_name] is None:
raise Exception("unknown layer: %s" % (layer_name,))
W, b = self[layer_name].keras_layer.get_weights()
W = W.transpose()
to_size, from_size = W.shape
if vshape is None:
rows, cols = 1, from_size
elif not isinstance(vshape, (list, tuple)) or len(vshape) != 2 \
or not isinstance(vshape[0], numbers.Integral) \
or not isinstance(vshape[1], numbers.Integral):
raise Exception("vshape: expected a pair of ints but got %s" % (vshape,))
else:
rows, cols = vshape
if rows*cols != from_size:
raise Exception("vshape %s is incompatible with the number of incoming weights to each %s unit (%d)"
% (vshape, layer_name, from_size))
weights = W[unit].reshape((rows,cols))
for r in range(rows):
for c in range(cols):
w = weights[r][c]
if ascii:
ch = ' ' if w <= 0 else '.' if w < 0.50 else 'o' if w < 0.75 else '@'
print(ch, end=" ")
else:
print('%5.2f' % (w,), end=" ")
print()
[docs] def get_metrics(self):
"""
Returns a list of the metrics available in the Network's history.
"""
metrics = set()
for epoch in self.history:
metrics = metrics.union(set(epoch.keys()))
return sorted(metrics)
[docs] def get_metric(self, metric):
"""
Returns the metric data from the network's history.
>>> net = Network("Test", 2, 2, 1)
>>> net.get_metric("loss")
[]
"""
return [epoch[metric] if metric in epoch else None for epoch in self.history]
[docs] def plot(self, metrics=None, ymin=None, ymax=None, start=0, end=None, legend='best',
label=None, symbols=None, default_symbol="-", title=None, return_fig_ax=False, fig_ax=None,
format=None):
"""Plots the current network history for the specific epoch range and
metrics. metrics is '?', 'all', a metric keyword, or a list of metric keywords.
if metrics is None, loss and accuracy are plotted on separate graphs.
>>> net = Network("Plot Test", 1, 3, 1)
>>> net.compile(error="mse", optimizer="rmsprop")
>>> net.dataset.append([0.0], [1.0])
>>> net.dataset.append([1.0], [0.0])
>>> net.train(plot=False) # doctest: +ELLIPSIS
Evaluating initial training metrics...
Training...
...
>>> net.plot('?')
Available metrics: acc, loss
"""
## https://matplotlib.org/api/markers_api.html
## https://matplotlib.org/api/colors_api.html
if isinstance(ymin, str):
raise Exception("Network.plot() should be called with a metric, or list of metrics")
if len(self.history) == 0:
print("No history available")
return
available_metrics = self.get_metrics()
if metrics is None:
metrics = ['loss']
elif metrics is '?':
print("Available metrics:", ", ".join(available_metrics))
return
elif metrics == 'all':
metrics = available_metrics
elif isinstance(metrics, str):
metrics = [metrics]
elif isinstance(metrics, (list, tuple)):
pass
else:
print("metrics: expected a list or a string but got %s" % (metrics,))
return
## Check metrics, and expand regular expressions:
proposed_metrics = metrics
metrics = []
for metric in proposed_metrics:
for available_metric in available_metrics:
if re.match(metric, available_metric) is not None:
metrics.append(available_metric)
if fig_ax:
fig, ax = fig_ax
else:
fig, ax = plt.subplots(1)
x_values = range(self.epoch_count+1)
x_values = x_values[start:end]
ax.set_xlabel('Epoch')
data_found = False
for metric in metrics:
y_values = self.get_metric(metric)
y_values = y_values[start:end]
if y_values.count(None) == len(y_values):
print("WARNING: No %s data available for the specified epochs (%s-%s)" % (metric, start, end), file=sys.stderr)
else:
next_label = label if label else metric
symbol = get_symbol(label, symbols, default_symbol)
ax.plot(x_values, y_values, symbol, label=next_label)
data_found = True
if not data_found:
if return_fig_ax:
return (fig, ax)
else:
plt.close(fig)
return
if ymin is not None:
plt.ylim(ymin=ymin)
if ymax is not None:
plt.ylim(ymax=ymax)
if legend is not None:
plt.legend(loc=legend)
if title is None:
title = self.name
plt.title(title)
if return_fig_ax:
return (fig, ax)
elif format is None:
plt.show(block=False)
else:
from IPython.display import SVG
bytes = io.BytesIO()
if format == "svg":
plt.savefig(bytes, format="svg")
plt.close(fig)
img_bytes = bytes.getvalue()
return SVG(img_bytes.decode())
elif format == "image":
plt.savefig(bytes, format="png")
plt.close(fig)
bytes.seek(0)
pil_image = PIL.Image.open(bytes)
return pil_image
else:
raise Exception("format must be None, 'svg', or 'image'")
[docs] def show_results(self, report_rate=None):
"""
Show the history of training results. If report_rate is given
use that, else, try to use the last trained report_rate.
"""
report_rate = (report_rate
if report_rate is not None
else self.train_options.get("report_rate", 1))
self._need_to_show_headings = True
for epoch_count in range(0, len(self.history), report_rate):
results = self.history[epoch_count]
self.report_epoch(epoch_count, results)
if len(self.history) > 0:
print("=" * 56)
self.report_epoch(len(self.history) - 1, self.history[-1])
[docs] def plot_results(self, callback=None, format=None):
"""plots loss and accuracy on separate graphs, ignoring any other metrics"""
#print("called on_epoch_end with epoch =", epoch)
metrics = self.get_metrics()
if callback is not None and callback.figure is not None:
# figure and axes objects have already been created
fig, loss_ax, acc_ax = callback.figure
loss_ax.clear()
if acc_ax is not None:
acc_ax.clear()
else:
# first time called, so create figure and axes objects
if 'acc' in metrics or 'val_acc' in metrics:
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(10,4))
else:
fig, loss_ax = plt.subplots(1)
acc_ax = None
if callback is not None:
callback.figure = fig, loss_ax, acc_ax
x_values = range(self.epoch_count+1)
for metric in metrics:
y_values = self.get_metric(metric)
if metric == 'loss':
loss_ax.plot(x_values, y_values, label='Training set')
elif metric == 'val_loss':
loss_ax.plot(x_values, y_values, label='Validation set')
elif metric == 'acc' and acc_ax is not None:
acc_ax.plot(x_values, y_values, label='Training set')
elif metric == 'val_acc' and acc_ax is not None:
acc_ax.plot(x_values, y_values, label='Validation set')
loss_ax.set_ylim(bottom=0)
loss_ax.set_title("%s: Error" % (self.name,))
loss_ax.set_xlabel('Epoch')
loss_ax.legend(loc='best')
if acc_ax is not None:
acc_ax.set_ylim([-0.1, 1.1])
acc_ax.set_title("%s: Accuracy" % (self.name,))
acc_ax.set_xlabel('Epoch')
acc_ax.legend(loc='best')
if (callback is not None and not callback.in_console) or format == "svg":
from IPython.display import SVG, clear_output, display
bytes = io.BytesIO()
plt.savefig(bytes, format='svg')
img_bytes = bytes.getvalue()
clear_output(wait=True)
display(SVG(img_bytes.decode()))
#return SVG(img_bytes.decode())
else: # format is None
plt.pause(0.01)
#plt.show(block=False)
[docs] def compile(self, **kwargs):
"""
Check and compile the network.
You must provide error/loss and optimizer keywords.
Possible error/loss functions are:
* 'mse' - mean_squared_error
* 'mae' - mean_absolute_error
* 'mape' - mean_absolute_percentage_error
* 'msle' - mean_squared_logarithmic_error
* 'kld' - kullback_leibler_divergence
* 'cosine' - cosine_proximity
Possible optimizers are:
* 'sgd'
* 'rmsprop'
* 'adagrad'
* 'adadelta'
* 'adam'
* 'adamax'
* 'nadam'
See https://keras.io/ `Model.compile` method for more details.
"""
## Error checking:
if len(self.layers) == 0:
raise Exception("network has no layers")
for layer in self.layers:
if layer.kind() == 'unconnected':
raise Exception("'%s' layer is unconnected" % layer.name)
if "error" in kwargs: # synonym
kwargs["loss"] = kwargs["error"]
del kwargs["error"]
if kwargs["loss"] == 'sparse_categorical_crossentropy':
raise Exception("'sparse_categorical_crossentropy' is not a valid error metric in conx; use 'categorical_crossentropy' with proper targets")
if "optimizer" not in kwargs or "loss" not in kwargs:
raise Exception("both optimizer and error/loss are required to compile a network")
if isinstance(kwargs["optimizer"], str) and kwargs["optimizer"].lower() not in self.OPTIMIZERS:
raise Exception("invalid optimizer '%s'; use valid function or one of %s" % (kwargs["optimizer"], Network.OPTIMIZERS,))
## Build an optimizer:
config = kwargs.get("config", {})
for kw in list(kwargs.keys()):
if kw not in ["loss", "metrics", "optimizer"]:
if kw != "config":
config[kw] = kwargs[kw]
del kwargs[kw]
if config != {}:
error = False
try:
kwargs["optimizer"] = keras.optimizers.get({"class_name": kwargs["optimizer"], "config": config})
except:
error = True
if error:
class_instance = keras.optimizers.get(kwargs["optimizer"])
raise Exception("invalid optimizer arguments %s(**%s); for more information type: help(cx.%s)" % (
kwargs["optimizer"], config, class_instance.__class__.__name__))
### Optimizer is an instance, if given kwargs
using_softmax = False
for layer in self.layers:
if layer.kind() == "output":
if layer.activation is not None and layer.activation == "softmax":
using_softmax = True
if "crossentropy" not in kwargs["loss"]:
print("WARNING: you are using the 'softmax' activation function on layer '%s'" % layer.name, file=sys.stderr)
print(" but not using a 'crossentropy' error measure.", file=sys.stderr)
if "crossentropy" in kwargs["loss"]:
if layer.activation is not None and layer.activation != "softmax":
print("WARNING: you are using a crossentropy error measure", file=sys.stderr)
print(" but not using the 'softmax' activation function on layer '%s'"
% layer.name, file=sys.stderr)
self._build_intermediary_models()
output_k_layers = self._get_output_ks_in_order()
input_k_layers = self._get_input_ks_in_order(self.input_bank_order)
self.model = keras.models.Model(inputs=input_k_layers, outputs=output_k_layers)
if "metrics" in kwargs and kwargs["metrics"] is not None:
pass ## ok allow override
elif using_softmax: ## let's use Keras' default acc function
kwargs['metrics'] = ["acc"] ## Keras' default
if "tolerance" in kwargs:
print("WARNING: using softmax activation function; tolerance is ignored", file=sys.stderr)
else:
kwargs['metrics'] = [self.acc] ## Conx's default
self.compile_options = copy.copy(kwargs)
self.model.compile(**kwargs)
# set each conx layer to point to corresponding keras model layer
for layer in self.layers:
layer.keras_layer = self._find_keras_layer(layer.name)
[docs] def acc(self, targets, outputs):
# This is only used on non-multi-output-bank training:
return K.mean(K.all(K.less_equal(K.abs(targets - outputs), self._tolerance), axis=-1), axis=-1)
def _find_keras_layer(self, layer_name):
"""
Find the associated keras layer.
"""
return [x for x in self.model.layers if x.name == layer_name][0]
def _delete_intermediary_models(self):
"""
Remove these, as they don't pickle.
"""
for layer in self.layers:
layer.k = None
layer.input_names = set([])
layer.model = None
[docs] def update_model(self):
"""
Useful if you change, say, an activation function after training.
"""
self._build_intermediary_models()
def _build_intermediary_models(self):
"""
Construct the layer.k, layer.input_names, and layer.model's.
"""
sequence = topological_sort(self, self.layers)
if self.debug: print("topological sort:", [l.name for l in sequence])
for layer in sequence:
if layer.kind() == 'input':
if self.debug: print("making input layer for", layer.name)
layer.k = layer.make_input_layer_k()
layer.input_names = set([layer.name])
layer.model = keras.models.Model(inputs=layer.k, outputs=layer.k) # identity
else:
if self.debug: print("making layer for", layer.name)
if len(layer.incoming_connections) == 0:
raise Exception("non-input layer '%s' with no incoming connections" % layer.name)
kfuncs = layer.make_keras_functions()
if len(layer.incoming_connections) == 1:
if self.debug: print("single input", layer.incoming_connections[0])
k = layer.incoming_connections[0].k
layer.input_names = layer.incoming_connections[0].input_names
else: # multiple inputs, some type of merge:
if self.debug: print("Merge detected!", [l.name for l in layer.incoming_connections])
if layer.handle_merge:
k = layer.make_keras_function()
else:
k = keras.layers.Concatenate()([incoming.k for incoming in layer.incoming_connections])
# flatten:
layer.input_names = set([item for sublist in
[incoming.input_names for incoming in layer.incoming_connections]
for item in sublist])
if self.debug: print("input names for", layer.name, layer.input_names)
if self.debug: print("applying k's", kfuncs)
for f in kfuncs:
k = f(k)
layer.k = k
## get the inputs to this branch, in order:
input_ks = self._get_input_ks_in_order(layer.input_names)
layer.model = keras.models.Model(inputs=input_ks, outputs=layer.k)
## IS THIS A BETTER WAY?:
#layer.model = keras.models.Model(inputs=input_ks, outputs=layer.keras_layer.output)
def _get_input_ks_in_order(self, layer_names):
"""
Get the Keras function for each of a set of layer names.
[in3, in4] sorted by input bank ordering
"""
sorted_layer_names = self._get_sorted_input_names(set(layer_names))
layer_ks = [self[layer_name].k for layer_name in sorted_layer_names]
if len(layer_ks) == 1:
layer_ks = layer_ks[0]
return layer_ks
def _get_sorted_input_names(self, layer_names):
"""
Given a set of input names, give them back in order.
"""
return [name for (index, name) in sorted([(self.input_bank_order.index(name), name)
for name in layer_names])]
def _get_output_ks_in_order(self):
"""
Get the Keras function for each output layer, in order.
"""
layer_ks = [self[layer_name].k for layer_name in self.output_bank_order]
if len(layer_ks) == 1:
layer_ks = layer_ks[0]
return layer_ks
def _image_to_uri(self, img_src):
# Convert to binary data:
b = io.BytesIO()
try:
img_src.save(b, format='gif')
except:
return ""
data = b.getvalue()
data = base64.b64encode(data)
if not isinstance(data, str):
data = data.decode("latin1")
return "data:image/gif;base64,%s" % html.escape(data)
[docs] def vshape(self, layer_name):
"""
Find the vshape of layer.
"""
layer = self[layer_name]
vshape = layer.vshape if layer.vshape else layer.shape if layer.shape else None
if vshape is None:
vshape = layer.get_output_shape()
return vshape
[docs] def build_struct(self, inputs, class_id, config):
ordering = list(reversed(self._get_level_ordering())) # list of names per level, input to output
### find max_width, image_dims, and row_height
max_width = 0
images = {}
image_dims = {}
row_height = []
# Go through and build images, compute max_width:
for level_tups in ordering: ## output to input:
# first make all images at this level
total_width = 0 # for this row
max_height = 0
for (layer_name, anchor, fname) in level_tups:
if not self[layer_name].visible or anchor: # not need to handle anchors here
continue
if inputs is not None:
v = inputs
elif len(self.dataset.inputs) > 0:
v = self.dataset.inputs[0]
else:
if self.num_input_layers > 1:
v = []
for in_name in self.input_bank_order:
v.append(self[in_name].make_dummy_vector())
else:
in_layer = [layer for layer in self.layers if layer.kind() == "input"][0]
v = in_layer.make_dummy_vector()
if self[layer_name].model:
try:
orig_svg_rotate = self.config["svg_rotate"]
self.config["svg_rotate"] = config["svg_rotate"]
image = self._propagate_to_image(layer_name, v)
self.config["svg_rotate"] = orig_svg_rotate
except:
image = self[layer_name].make_image(np.array(self[layer_name].make_dummy_vector()), config=config)
else:
image = self[layer_name].make_image(np.array(self[layer_name].make_dummy_vector()), config=config)
(width, height) = image.size
images[layer_name] = image ## little image
### Layer settings:
if self[layer_name].image_maxdim:
image_maxdim = self[layer_name].image_maxdim
else:
image_maxdim = config["image_maxdim"]
if self[layer_name].image_pixels_per_unit:
image_pixels_per_unit = self[layer_name].image_pixels_per_unit
else:
image_pixels_per_unit = config["image_pixels_per_unit"]
## First, try based on shape:
#pwidth, pheight = np.array(image.size) * image_pixels_per_unit
vshape = self.vshape(layer_name)
if vshape is None:
pass # leave it define by image
elif len(vshape) == 1:
if vshape[0] is not None:
width = vshape[0] * image_pixels_per_unit
height = image_pixels_per_unit
elif len(vshape) >= 2:
if vshape[0] is not None:
height = vshape[0] * image_pixels_per_unit
if vshape[1] is not None:
width = vshape[1] * image_pixels_per_unit
else:
if len(vshape) > 2:
if vshape[1] is not None:
height = vshape[1] * image_pixels_per_unit
width = vshape[2] * image_pixels_per_unit
elif vshape[1] is not None: # flatten
width = vshape[1] * image_pixels_per_unit
height = image_pixels_per_unit
## make sure not too small:
if width < image_pixels_per_unit:
width = image_pixels_per_unit
if height < image_pixels_per_unit:
height = image_pixels_per_unit
## make sure not too big:
if height > image_maxdim:
height = image_maxdim
if width > image_maxdim:
width = image_maxdim
image_dims[layer_name] = (width, height)
total_width += width + config["hspace"] # space between
max_height = max(max_height, height)
row_height.append(max_height)
max_width = max(max_width, total_width)
### Now that we know the dimensions:
struct = []
cheight = config["border_top"] # top border
## Display targets?
if config["show_targets"]:
# Find the spacing for row:
for (layer_name, anchor, fname) in ordering[0]:
if not self[layer_name].visible:
continue
image = images[layer_name]
(width, height) = image_dims[layer_name]
spacing = max_width / (len(ordering[0]) + 1)
# draw the row of targets:
cwidth = 0
for (layer_name, anchor, fname) in ordering[0]: ## no anchors in output
image = images[layer_name]
(width, height) = image_dims[layer_name]
cwidth += (spacing - width/2)
struct.append(["image_svg", {"name": layer_name + "_targets",
"svg_counter": self._svg_counter,
"x": cwidth,
"y": cheight,
"image": self._image_to_uri(image),
"width": width,
"height": height,
"tooltip": self[layer_name].tooltip(),
"rx": cwidth - 1, # based on arrow width
"ry": cheight - 1,
"rh": height + 2,
"rw": width + 2}])
## show a label
struct.append(["label_svg", {"x": cwidth + width + 5,
"y": cheight + height/2 + 2,
"label": "targets",
"font_size": config["font_size"],
"font_color": "black",
"font_family": config["font_family"],
"text_anchor": "start",
}])
cwidth += width/2
## Then we need to add height for output layer again, plus a little bit
cheight += row_height[0] + 10 # max height of row, plus some
## Display error?
if config["show_errors"]:
# Find the spacing for row:
for (layer_name, anchor, fname) in ordering[0]: # no anchors in output
if not self[layer_name].visible:
continue
image = images[layer_name]
(width, height) = image_dims[layer_name]
spacing = max_width / (len(ordering[0]) + 1)
# draw the row of errors:
cwidth = 0
for (layer_name, anchor, fname) in ordering[0]: # no anchors in output
image = images[layer_name]
(width, height) = image_dims[layer_name]
cwidth += (spacing - (width/2))
struct.append(["image_svg", {"name": layer_name + "_errors",
"svg_counter": self._svg_counter,
"x": cwidth,
"y": cheight,
"image": self._image_to_uri(image),
"width": width,
"height": height,
"tooltip": self[layer_name].tooltip(),
"rx": cwidth - 1, # based on arrow width
"ry": cheight - 1,
"rh": height + 2,
"rw": width + 2}])
## show a label
struct.append(["label_svg", {"x": cwidth + width + 5,
"y": cheight + height/2 + 2,
"label": "errors",
"font_size": config["font_size"],
"font_color": "black",
"font_family": config["font_family"],
"text_anchor": "start",
}])
cwidth += width/2
## Then we need to add height for output layer again, plus a little bit
cheight += row_height[0] + 10 # max height of row, plus some
# Now we go through again and build SVG:
positioning = {}
level_num = 0
for level_tups in ordering:
spacing = max_width / (len(level_tups) + 1)
cwidth = 0
# See if there are any connections up:
any_connections_up = False
for (layer_name, anchor, fname) in level_tups:
if not self[layer_name].visible or anchor:
continue
for out in self[layer_name].outgoing_connections:
if out.name not in positioning:
continue
any_connections_up = True
if any_connections_up:
cheight += config["vspace"] # for arrows
else: # give a bit of room:
## FIXME: determine if there were spaces drawn last layer
## Right now, just skip any space at all
## cheight += 5
pass
max_height = 0 # for row of images
for (layer_name, anchor, fname) in level_tups:
if not self[layer_name].visible:
continue
if anchor:
anchor_name = "%s-%s-anchor%s" % (layer_name, fname, level_num)
cwidth += spacing
positioning[anchor_name] = {"x": cwidth, "y": cheight}
x1 = cwidth
## now we are at an anchor. Is the thing that it anchors in the
## lower row? level_num is increasing
prev = [(oname, oanchor, lfname) for (oname, oanchor, lfname) in ordering[level_num - 1] if
(((layer_name == oname) and (oanchor is False)) or
((layer_name == oname) and (oanchor is True) and (fname == lfname)))]
if prev:
tooltip = html.escape(self.describe_connection_to(self[fname], self[layer_name]))
if prev[0][1]: # anchor
anchor_name2 = "%s-%s-anchor%s" % (layer_name, fname, level_num - 1)
## draw a line to this anchor point
x2 = positioning[anchor_name2]["x"]
y2 = positioning[anchor_name2]["y"]
struct.append(["line_svg", {"x1":cwidth,
"y1":cheight,
"x2":x2,
"y2":y2,
"arrow_color": config["arrow_color"],
"tooltip": tooltip
}])
else:
## draw a line to this bank
x2 = positioning[layer_name]["x"] + positioning[layer_name]["width"]/2
y2 = positioning[layer_name]["y"] + positioning[layer_name]["height"]
tootip ="TODO"
struct.append(["arrow_svg", {"x1":cwidth,
"y1":cheight,
"x2":x2,
"y2":y2,
"arrow_color": config["arrow_color"],
"tooltip": tooltip
}])
else:
print("that's weird!", layer_name, "is not in", prev)
continue
else:
image = images[layer_name]
(width, height) = image_dims[layer_name]
cwidth += (spacing - (width/2))
positioning[layer_name] = {"name": layer_name + ("-rotated" if config["svg_rotate"] else ""),
"svg_counter": self._svg_counter,
"x": cwidth,
"y": cheight,
"image": self._image_to_uri(image),
"width": width,
"height": height,
"tooltip": self[layer_name].tooltip(),
"rx": cwidth - 1, # based on arrow width
"ry": cheight - 1,
"rh": height + 2,
"rw": width + 2}
x1 = cwidth + width/2
y1 = cheight - 1
#### Background rects for arrow mouseovers
# for out in self[layer_name].outgoing_connections:
# if out.name not in positioning:
# continue
# # draw background to arrows to allow mouseover tooltips:
# x2 = positioning[out.name]["x"] + positioning[out.name]["width"]/2
# y2 = positioning[out.name]["y"] + positioning[out.name]["height"]
# rect_width = abs(x1 - x2)
# rect_extra = 0
# if rect_width < 20:
# rect_extra = 10
# tooltip = html.escape(self.describe_connection_to(self[layer_name], out))
# svg += arrow_rect.format(**{"tooltip": tooltip,
# "rx": min(x2, x1) - rect_extra,
# "ry": min(y2, y1) + 2, # bring down
# "rw": rect_width + rect_extra * 2,
# "rh": abs(y1 - y2) - 2})
# Draw all of the connections up from here:
for out in self[layer_name].outgoing_connections:
if out.name not in positioning:
continue
# draw an arrow between layers:
anchor_name = "%s-%s-anchor%s" % (out.name, layer_name, level_num - 1)
## Don't draw this error, if there is an anchor in the next level
if anchor_name in positioning:
tooltip = html.escape(self.describe_connection_to(self[layer_name], out))
x2 = positioning[anchor_name]["x"]
y2 = positioning[anchor_name]["y"]
struct.append(["line_svg", {"x1":x1,
"y1":y1,
"x2":x2,
"y2":y2,
"arrow_color": config["arrow_color"],
"tooltip": tooltip
}])
continue
else:
tooltip = html.escape(self.describe_connection_to(self[layer_name], out))
x2 = positioning[out.name]["x"] + positioning[out.name]["width"]/2
y2 = positioning[out.name]["y"] + positioning[out.name]["height"]
struct.append(["arrow_svg", {"x1":x1,
"y1":y1,
"x2":x2,
"y2":y2 + 2,
"arrow_color": config["arrow_color"],
"tooltip": tooltip
}])
struct.append(["image_svg", positioning[layer_name]])
struct.append(["label_svg", {"x": positioning[layer_name]["x"] + positioning[layer_name]["width"] + 5,
"y": positioning[layer_name]["y"] + positioning[layer_name]["height"]/2 + 2,
"label": layer_name,
"font_size": config["font_size"],
"font_color": "black",
"font_family": config["font_family"],
"text_anchor": "start",
}])
output_shape = self[layer_name].get_output_shape()
if (isinstance(output_shape, tuple) and len(output_shape) == 4 and
self[layer_name].__class__.__name__ != "ImageLayer"):
features = str(output_shape[3])
feature = str(self[layer_name].feature)
if config["svg_rotate"]:
struct.append(["label_svg", {"x": positioning[layer_name]["x"] + 5,
"y": positioning[layer_name]["y"] - 10 - 5,
"label": features,
"font_size": config["font_size"],
"font_color": "black",
"font_family": config["font_family"],
"text_anchor": "start",
}])
struct.append(["label_svg", {"x": positioning[layer_name]["x"] + positioning[layer_name]["width"] - 10,
"y": positioning[layer_name]["y"] + positioning[layer_name]["height"] + 10 + 5,
"label": feature,
"font_size": config["font_size"],
"font_color": "black",
"font_family": config["font_family"],
"text_anchor": "start",
}])
else:
struct.append(["label_svg", {"x": positioning[layer_name]["x"] + positioning[layer_name]["width"] + 5,
"y": positioning[layer_name]["y"] + 5,
"label": features,
"font_size": config["font_size"],
"font_color": "black",
"font_family": config["font_family"],
"text_anchor": "start",
}])
struct.append(["label_svg", {"x": positioning[layer_name]["x"] - (len(feature) * 7) - 5 - 5,
"y": positioning[layer_name]["y"] + positioning[layer_name]["height"] - 5,
"label": feature,
"font_size": config["font_size"],
"font_color": "black",
"font_family": config["font_family"],
"text_anchor": "start",
}])
if (self[layer_name].dropout > 0):
label = "⦻"
struct.append(["label_svg", {"x": positioning[layer_name]["x"] - len(label) * 2.0 - 5,
"y": positioning[layer_name]["y"] + 5,
"label": label,
"font_size": config["font_size"] * 2.0,
"font_color": "black",
"font_family": config["font_family"],
"text_anchor": "start",
}])
cwidth += width/2
max_height = max(max_height, height)
self._svg_counter += 1
cheight += max_height
level_num += 1
cheight += config["border_bottom"]
### DONE!
## Draw live/static sign
if (class_id is None):
label = "*" # lightning bold, dynamic image
if config["svg_rotate"]:
struct.append(["label_svg", {"x": 10,
"y": cheight - 10,
"label": label,
"font_size": config["font_size"] * 2.0,
"font_color": "red",
"font_family": config["font_family"],
"text_anchor": "middle",
}])
else:
struct.append(["label_svg", {"x": 10,
"y": 10,
"label": label,
"font_size": config["font_size"] * 2.0,
"font_color": "red",
"font_family": config["font_family"],
"text_anchor": "middle",
}])
## Draw the title:
if config["svg_rotate"]:
struct.append(["label_svg", {"x": 10, ## really border_left
"y": cheight/2,
"label": self.name,
"font_size": config["font_size"] + 3,
"font_color": "black",
"font_family": config["font_family"],
"text_anchor": "middle",
}])
else:
struct.append(["label_svg", {"x": max_width/2,
"y": config["border_top"]/2,
"label": self.name,
"font_size": config["font_size"] + 3,
"font_color": "black",
"font_family": config["font_family"],
"text_anchor": "middle",
}])
## figure out scale optimal, if scale is None
## the fraction:
if config["svg_scale"] is not None: ## scale is given:
if config["svg_rotate"]:
scale_value = (config["svg_max_width"] / cheight) * config["svg_scale"]
else:
scale_value = (config["svg_max_width"] / max_width) * config["svg_scale"]
else:
if config["svg_rotate"]:
scale_value = config["svg_max_width"] / max(cheight, max_width)
else:
scale_value = config["svg_preferred_size"] / max(cheight, max_width)
svg_scale = "%s%%" % int(scale_value * 100)
scaled_width = (max_width * scale_value)
scaled_height = (cheight * scale_value)
### Need a top-level width, height because Jupyter peeks at it
if config["svg_rotate"]:
svg_transform = """ transform="rotate(90) translate(0 -%s)" """ % scaled_height
### Swap them:
top_width = scaled_height
top_height = scaled_width
else:
svg_transform = ""
top_width = scaled_width
top_height = scaled_height
struct.append(["svg_head", {
"viewbox_width": max_width, # view port width
"viewbox_height": cheight, # view port height
"width": scaled_width, ## actual pixels of image in page
"height": scaled_height, ## actual pixels of image in page
"netname": self.name,
"top_width": top_width,
"top_height": top_height,
"arrow_color": config["arrow_color"],
"arrow_width": config["arrow_width"],
"svg_transform": svg_transform,
}])
return struct
def _initialize_javascript(self):
from IPython.display import Javascript, display
js = """
require(['base/js/namespace'], function(Jupyter) {
Jupyter.notebook.kernel.comm_manager.register_target('conx_svg_control', function(comm, msg) {
comm.on_msg(function(msg) {
var data = msg["content"]["data"];
var images = document.getElementsByClassName(data["class"]);
for (var i = 0; i < images.length; i++) {
if (data["href"]) {
images[i].setAttributeNS(null, "href", data["href"]);
}
if (data["src"]) {
images[i].setAttributeNS(null, "src", data["src"]);
}
}
});
});
});
"""
display(Javascript(js))
self._initialized_javascript = True
[docs] def to_svg(self, inputs=None, class_id=None, **kwargs):
"""
opts - temporary override of config
includes:
"font_size": 12,
"border_top": 25,
"border_bottom": 25,
"hspace": 100,
"vspace": 50,
"image_maxdim": 200
"image_pixels_per_unit": 50
See .config for all options.
"""
if any([(layer.kind() == "unconnected") for layer in self.layers]) or len(self.layers) == 0:
return None
# defaults:
config = copy.copy(self.config)
config.update(kwargs)
struct = self.build_struct(inputs, class_id, config)
### Define the SVG strings:
image_svg = """<rect x="{{rx}}" y="{{ry}}" width="{{rw}}" height="{{rh}}" style="fill:none;stroke:{border_color};stroke-width:{border_width}"/><image id="{netname}_{{name}}_{{svg_counter}}" class="{netname}_{{name}}" x="{{x}}" y="{{y}}" height="{{height}}" width="{{width}}" preserveAspectRatio="none" href="{{image}}"><title>{{tooltip}}</title></image>""".format(
**{
"netname": class_id if class_id is not None else self.name,
"border_color": config["border_color"],
"border_width": config["border_width"],
})
line_svg = """<line x1="{{x1}}" y1="{{y1}}" x2="{{x2}}" y2="{{y2}}" stroke="{{arrow_color}}" stroke-width="{arrow_width}"><title>{{tooltip}}</title></line>""".format(**config)
arrow_svg = """<line x1="{{x1}}" y1="{{y1}}" x2="{{x2}}" y2="{{y2}}" stroke="{{arrow_color}}" stroke-width="{arrow_width}" marker-end="url(#arrow)"><title>{{tooltip}}</title></line>""".format(**config)
arrow_rect = """<rect x="{rx}" y="{ry}" width="{rw}" height="{rh}" style="fill:white;stroke:none"><title>{tooltip}</title></rect>"""
label_svg = """<text x="{x}" y="{y}" font-family="{font_family}" font-size="{font_size}" text-anchor="{text_anchor}" fill="{font_color}" alignment-baseline="central" {transform}>{label}</text>"""
svg_head = """<svg id='{netname}' xmlns='http://www.w3.org/2000/svg' image-rendering="pixelated" width="{top_width}px" height="{top_height}px">
<g {svg_transform}>
<svg viewBox="0 0 {viewbox_width} {viewbox_height}" width="{width}px" height="{height}px">
<defs>
<marker id="arrow" markerWidth="10" markerHeight="10" refX="9" refY="3" orient="auto" markerUnits="strokeWidth">
<path d="M0,0 L0,6 L9,3 z" fill="{arrow_color}" />
</marker>
</defs>"""
templates = {
"image_svg": image_svg,
"line_svg": line_svg,
"arrow_svg": arrow_svg,
"arrow_rect": arrow_rect,
"label_svg": label_svg,
"svg_head": svg_head,
}
## get the header:
svg = None
for (template_name, dict) in struct:
if template_name == "svg_head":
svg = svg_head.format(**dict)
## build the rest:
for (template_name, dict) in struct:
if template_name != "svg_head" and not template_name.startswith("_"):
if template_name == "label_svg" and config["svg_rotate"]:
dict["x"] += 8
dict["text_anchor"] = "middle"
dict["transform"] = """ transform="rotate(-90 %s %s) translate(%s)" """ % (dict["x"], dict["y"], 2)
else:
dict["transform"] = ""
t = templates[template_name]
svg += t.format(**dict)
svg += """</svg></g></svg>"""
if (not self._initialized_javascript and get_ipython()):
self._initialize_javascript()
return svg
def _get_level_ordering(self):
"""
Returns a list of lists of tuples from
input to output of levels.
Each tuple contains: (layer_name, anchor?, from_name/None)
If anchor is True, this is just an anchor point.
"""
## First, get a level for all layers:
levels = {}
for layer in topological_sort(self, self.layers):
if not hasattr(layer, "model"):
continue
level = max([levels[lay.name] for lay in layer.incoming_connections] + [-1])
levels[layer.name] = level + 1
max_level = max(levels.values())
ordering = []
for i in range(max_level + 1): # input to output
layer_names = [layer.name for layer in self.layers if levels[layer.name] == i]
ordering.append([(name, False, None) for name in layer_names]) # (going_to/layer_name, anchor, coming_from)
## promote all output banks to last row:
for level in range(len(ordering)): # input to output
tuples = ordering[level]
for (name, anchor, none) in tuples[:]: # go through copy
if self[name].kind() == "output":
## move it to last row
## find it and remove
index = tuples.index((name, anchor, None))
ordering[-1].append(tuples.pop(index))
## insert anchor points for any in next level
## that doesn't go to a bank in this level
for level in range(len(ordering)): # input to output
tuples = ordering[level]
for (name, anchor, fname) in tuples:
if anchor:
## is this in next? if not add it
next_level = [(n, hfname) for (n, anchor, hfname) in ordering[level + 1]]
if (name, None) not in next_level and (name, fname) not in next_level:
ordering[level + 1].append((name, True, fname)) # add anchor point
else:
pass ## finally!
else:
## if next level doesn't contain an outgoing
## connection, add it to next level as anchor point
for layer in self[name].outgoing_connections:
next_level = [(n,fname) for (n, anchor, fname) in ordering[level + 1]]
if (layer.name, None) not in next_level:
ordering[level + 1].append((layer.name, True, name)) # add anchor point
## replace level with sorted level:
def input_index(name):
return min([self.input_bank_order.index(iname) for iname in self[name].input_names])
lev = sorted([(input_index(fname if anchor else name), name, anchor, fname) for (name, anchor, fname) in ordering[level]])
ordering[level] = [(name, anchor, fname) for (index, name, anchor, fname) in lev]
return ordering
[docs] def describe_connection_to(self, layer1, layer2):
"""
Returns a textual description of the weights for the SVG tooltip.
"""
retval = "Weights from %s to %s" % (layer1.name, layer2.name)
if self.model is None:
return retval
for klayer in self.model.layers:
if klayer.name == layer2.name:
weights = klayer.get_weights()
for w in range(len(klayer.weights)):
retval += "\n %s has shape %s" % (
klayer.weights[w].name, weights[w].shape)
return retval
[docs] def saved(self, dir=None):
"""
Return True if network has been saved.
"""
if dir is None:
dir = "%s.conx" % self.name.replace(" ", "_")
return (os.path.isdir(dir) and
os.path.isfile("%s/network.pickle" % dir) and
os.path.isfile("%s/model.h5" % dir) and
os.path.isfile("%s/weights.h5" % dir))
[docs] def delete(self, dir=None):
"""
Delete network save folder.
"""
if dir is None:
dir = "%s.conx" % self.name.replace(" ", "_")
import shutil
if os.path.isdir(dir):
shutil.rmtree(dir)
else:
print("Nothing to delete.")
[docs] def load(self, dir=None):
"""
Load the model and the weights/history into an existing conx network.
"""
import pickle
if self is None:
raise Exception("Network.load() requires a directory name")
elif isinstance(self, str):
dir = self
with open("%s/network.pickle" % (("%s.conx" % self.name.replace(" ", "_"))
if dir is None else dir), "rb") as fp:
network = pickle.load(fp)
network.load(dir)
return network
else:
self.load_model(dir)
self.load_weights(dir)
self.load_config(dir)
[docs] def save(self, dir=None):
"""
Save the model and the weights/history (if compiled) to a dir.
"""
if self.model:
self.save_model(dir)
self.save_weights(dir)
self.save_config(dir)
with open("%s/network.pickle" % (("%s.conx" % self.name.replace(" ", "_"))
if dir is None else dir), "wb") as fp:
pickle.dump(self, fp)
else:
raise Exception("need to compile network before saving")
[docs] def load_model(self, dir=None, filename=None):
"""
Load a model from a dir/filename.
"""
from keras.models import load_model
if dir is None:
dir = "%s.conx" % self.name.replace(" ", "_")
if filename is None:
filename = "model.h5"
self.model = load_model(os.path.join(dir, filename))
if self.compile_options:
self.reset()
[docs] def save_model(self, dir=None, filename=None):
"""
Save a model (if compiled) to a dir/filename.
"""
if self.model:
if dir is None:
dir = "%s.conx" % self.name.replace(" ", "_")
if filename is None:
filename = "model.h5"
if not os.path.isdir(dir):
os.makedirs(dir)
self.model.save(os.path.join(dir, filename))
else:
raise Exception("need to compile network before saving")
[docs] def load_history(self, dir=None, filename=None):
"""
Load the history from a dir/file.
network.load_history()
"""
if dir is None:
dir = "%s.conx" % self.name.replace(" ", "_")
if filename is None:
filename = "history.pickle"
full_filename = os.path.join(dir, filename)
if os.path.isfile(full_filename):
with open(os.path.join(dir, filename), "rb") as fp:
self.history = pickle.load(fp)
self.weight_history = pickle.load(fp)
self.epoch_count = (len(self.history) - 1) if self.history else 0
else:
print("WARNING: no such history file '%s'" % full_filename, file=sys.stderr)
[docs] def save_history(self, dir=None, filename=None):
"""
Save the history to a file.
network.save_history()
"""
if dir is None:
dir = "%s.conx" % self.name.replace(" ", "_")
if filename is None:
filename = "history.pickle"
if not os.path.isdir(dir):
os.makedirs(dir)
with open(os.path.join(dir, filename), "wb") as fp:
pickle.dump(self.history, fp)
pickle.dump(self.weight_history, fp)
[docs] def load_weights(self, dir=None, filename=None):
"""
Load the network weights and history from dir/files.
network.load_weights()
"""
if self.model:
if dir is None:
dir = "%s.conx" % self.name.replace(" ", "_")
if filename is None:
filename = "weights.h5"
self.model.load_weights(os.path.join(dir, filename))
self.load_history(dir)
else:
raise Exception("need to compile network before loading weights")
[docs] def save_weights(self, dir=None, filename=None):
"""
Save the network weights and history to dir/files.
network.save_weights()
"""
if self.model:
if dir is None:
dir = "%s.conx" % self.name.replace(" ", "_")
if filename is None:
filename = "weights.h5"
if not os.path.isdir(dir):
os.makedirs(dir)
self.model.save_weights(os.path.join(dir, filename))
self.save_history(dir)
else:
raise Exception("need to compile network before saving weights")
[docs] def dashboard(self, width="95%", height="550px", play_rate=0.5):
"""
Build the dashboard for Jupyter widgets. Requires running
in a notebook/jupyterlab.
"""
from .widgets import Dashboard
return Dashboard(self, width, height, play_rate)
[docs] def pp(self, *args, **opts):
"""
Pretty-print a vector.
"""
if isinstance(args[0], str):
label = args[0]
vector = args[1]
else:
label = ""
vector = args[0]
print(label + self.pf(vector[:20], **opts))
[docs] def pf_matrix(self, matrix, force=False, **opts):
"""
Pretty-fromat a matrix. If a list, then that implies multi-bank.
"""
if isinstance(matrix, list): ## multiple output banks
rows = []
for r in range(len(matrix[0])):
row = []
for c in range(len(matrix)):
row.append(self.pf(matrix[c][r], **opts))
if c > 99 and not force:
row.append("...")
rows.append("[" + (",".join(row)) + "]")
if r > 99 and not force:
rows.append("...")
break
return rows
else:
rows = []
for r in range(len(matrix)):
rows.append(self.pf(matrix[r], **opts))
if r > 99 and not force:
rows.append("...")
break
return rows
[docs] def pf(self, vector, **opts):
"""
Pretty-format a vector. Returns string.
Arguments:
vector (list): The first parameter.
precision (int): Number of decimal places to show for each
value in vector.
Returns:
str: Returns the vector formatted as a short string.
Examples:
These examples demonstrate the net.pf formatting function:
>>> import conx
>>> net = Network("Test")
>>> net.pf([1.01])
'[1.01]'
>>> net.pf(range(10), precision=2)
'[0,1,2,3,4,5,6,7,8,9]'
>>> net.pf([0]*10000) # doctest: +ELLIPSIS
'[0,0,0,...]'
"""
if isinstance(vector, collections.Iterable):
vector = list(vector)
if isinstance(vector, (list, tuple)):
vector = np.array(vector)
config = copy.copy(self.config)
config.update(opts)
precision = "{0:.%df}" % config["precision"]
return np.array2string(
vector,
formatter={'float_kind': precision.format},
separator=",",
max_line_width=79).replace("\n", "")
[docs] def set_weights(self, weights, layer_name=None):
"""
Set the model's weights, or a particular layer's weights.
>>> net = Network("Weight Set Test", 2, 2, 1, activation="sigmoid")
>>> net.compile(error="mse", optimizer="sgd")
>>> net.set_weights(net.get_weights())
>>> hw = net.get_weights("hidden")
>>> net.set_weights(hw, "hidden")
"""
if self.model is None:
raise Exception("need to compile network")
if layer_name is None:
for i in range(len(self.model.layers)):
self.model.layers[i].set_weights(weights[i])
else:
for i in range(len(self.model.layers)):
if self.model.layers[i].name == layer_name:
w = [np.array(x) for x in
self.model.layers[i].get_weights()]
self.model.layers[i].set_weights(w)
[docs] def to_array(self) -> list:
"""
Get the weights of a network as a flat, one-dimensional list.
Example:
>>> from conx import Network
>>> net = Network("Deep", 3, 4, 5, 2, 3, 4, 5)
>>> net.compile(optimizer="adam", error="mse")
>>> array = net.to_array()
>>> len(array)
103
Returns:
All of weights and biases of the network in a single, flat list.
"""
if self.model is None:
raise Exception("need to compile network")
array = []
for layer in self.model.layers:
for weight in layer.get_weights():
array.extend(weight.flatten())
return array
[docs] def from_array(self, array: list):
"""
Load the weights from a list.
Arguments:
array: a sequence (e.g., list, np.array) of numbers
Example:
>>> from conx import Network
>>> net = Network("Deep", 3, 4, 5, 2, 3, 4, 5)
>>> net.compile(optimizer="adam", error="mse")
>>> net.from_array([0] * 103)
>>> array = net.to_array()
>>> len(array)
103
"""
if self.model is None:
raise Exception("need to compile network")
position = 0
for layer in self.model.layers:
weights = layer.get_weights()
new_weights = []
for i in range(len(weights)):
w = weights[i]
size = reduce(operator.mul, w.shape)
new_w = np.array(array[position:position + size]).reshape(w.shape)
new_weights.append(new_w)
position += size
layer.set_weights(new_weights)
### Config methods:
[docs] def load_config(self, datadir=None, config_file=None):
"""
"""
if datadir is None:
datadir = "%s.conx" % self.name.replace(" ", "_")
if config_file is None:
config_file = "config.json"
datadir = os.path.expanduser(datadir)
if not os.path.exists(datadir):
## second try, here
datadir = os.path.join('/tmp', datadir)
full_config_file = os.path.join(datadir, config_file)
if os.path.isfile(full_config_file):
with open(full_config_file) as fp:
config_data = json.load(fp)
self.update_config(config_data)
## give up, fail silently
[docs] def save_config(self, datadir=None, config_file=None):
"""
"""
if datadir is None:
datadir = "%s.conx" % self.name.replace(" ", "_")
if config_file is None:
config_file = "config.json"
if not os.path.exists(datadir):
try:
os.makedirs(datadir)
except:
datadir = os.path.join('/tmp', datadir)
os.makedirs(datadir)
full_config_file = os.path.join(datadir, config_file)
self.rebuild_config()
with open(full_config_file, "w") as fp:
json.dump(self.config, fp, indent=" ")
[docs] def update_config(self, config):
"""
"""
self.config.update(config)
for layer in self.layers:
self.update_layer_from_config(layer)
[docs] def rebuild_config(self):
"""
"""
self.config["config_layers"].clear()
for layer in self.layers:
d = {}
self.config["config_layers"][layer.name] = d
for item in ["visible",
"minmax",
"vshape",
"image_maxdim",
"image_pixels_per_unit",
"colormap",
"feature",
"max_draw_units"]:
d[item] = getattr(layer, item)
[docs] def update_layer_from_config(self, layer):
"""
"""
if layer.name in self.config["config_layers"]:
for item in self.config["config_layers"][layer.name]:
setattr(layer, item, self.config["config_layers"][layer.name][item])
class _InterruptHandler():
"""
Class for handling interrupts so that state is not left
in inconsistant situation.
"""
def __init__(self, network, sig=signal.SIGINT):
self.network = network
self.sig = sig
self.interrupted = None
self.released = None
self.original_handler = None
def __enter__(self):
self.interrupted = False
self.released = False
self.original_handler = signal.getsignal(self.sig)
def handler(signum, frame):
self._release()
if self.interrupted:
raise KeyboardInterrupt
print("\nStopping at end of epoch... (^C again to quit now)...")
self.interrupted = True
self.network.model.stop_training = True
signal.signal(self.sig, handler)
return self
def __exit__(self, type, value, tb):
self._release()
def _release(self):
if self.released:
return False
signal.signal(self.sig, self.original_handler)
self.released = True
return True