# -*- coding: utf-8 -*-
"""Convolutional-recurrent layers.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from .. import backend as K
from .. import activations
from .. import initializers
from .. import regularizers
from .. import constraints
from .recurrent import _generate_dropout_mask
from .recurrent import _standardize_args
import numpy as np
import warnings
from ..engine.base_layer import InputSpec, Layer
from ..utils import conv_utils
from ..legacy import interfaces
from ..legacy.layers import Recurrent, ConvRecurrent2D
from .recurrent import RNN
from ..utils.generic_utils import has_arg
from ..utils.generic_utils import to_list
from ..utils.generic_utils import transpose_shape
class ConvRNN2D(RNN):
"""Base class for convolutional-recurrent layers.
# Arguments
cell: A RNN cell instance. A RNN cell is a class that has:
- a `call(input_at_t, states_at_t)` method, returning
`(output_at_t, states_at_t_plus_1)`. The call method of the
cell can also take the optional argument `constants`, see
section "Note on passing external constants" below.
- a `state_size` attribute. This can be a single integer (single state)
in which case it is the number of channels of the recurrent state
(which should be the same as the number of channels of the cell
output). This can also be a list/tuple of integers
(one size per state). In this case, the first entry (`state_size[0]`)
should be the same as the size of the cell output.
return_sequences: Boolean. Whether to return the last output.
in the output sequence, or the full sequence.
return_state: Boolean. Whether to return the last state
in addition to the output.
go_backwards: Boolean (default False).
If True, process the input sequence backwards and return the
reversed sequence.
stateful: Boolean (default False). If True, the last state
for each sample at index i in a batch will be used as initial
state for the sample of index i in the following batch.
input_shape: Use this argument to specify the shape of the
input when this layer is the first one in a model.
# Input shape
5D tensor with shape:
`(samples, timesteps, channels, rows, cols)` if data_format='channels_first'
or 5D tensor with shape:
`(samples, timesteps, rows, cols, channels)` if data_format='channels_last'.
# Output shape
- if `return_state`: a list of tensors. The first tensor is
the output. The remaining tensors are the last states,
each 5D tensor with shape:
`(samples, timesteps,
filters, new_rows, new_cols)` if data_format='channels_first'
or 5D tensor with shape:
`(samples, timesteps,
new_rows, new_cols, filters)` if data_format='channels_last'.
`rows` and `cols` values might have changed due to padding.
- if `return_sequences`: 5D tensor with shape:
`(samples, timesteps,
filters, new_rows, new_cols)` if data_format='channels_first'
or 5D tensor with shape:
`(samples, timesteps,
new_rows, new_cols, filters)` if data_format='channels_last'.
- else, 4D tensor with shape:
`(samples, filters, new_rows, new_cols)` if data_format='channels_first'
or 4D tensor with shape:
`(samples, new_rows, new_cols, filters)` if data_format='channels_last'.
# Masking
This layer supports masking for input data with a variable number
of timesteps. To introduce masks to your data,
use an [Embedding](embeddings.md) layer with the `mask_zero` parameter
set to `True`.
# Note on using statefulness in RNNs
You can set RNN layers to be 'stateful', which means that the states
computed for the samples in one batch will be reused as initial states
for the samples in the next batch. This assumes a one-to-one mapping
between samples in different successive batches.
To enable statefulness:
- specify `stateful=True` in the layer constructor.
- specify a fixed batch size for your model, by passing
- if sequential model:
`batch_input_shape=(...)` to the first layer in your model.
- if functional model with 1 or more Input layers:
`batch_shape=(...)` to all the first layers in your model.
This is the expected shape of your inputs
*including the batch size*.
It should be a tuple of integers, e.g. `(32, 10, 100, 100, 32)`.
Note that the number of rows and columns should be specified too.
- specify `shuffle=False` when calling fit().
To reset the states of your model, call `.reset_states()` on either
a specific layer, or on your entire model.
# Note on specifying the initial state of RNNs
You can specify the initial state of RNN layers symbolically by
calling them with the keyword argument `initial_state`. The value of
`initial_state` should be a tensor or list of tensors representing
the initial state of the RNN layer.
You can specify the initial state of RNN layers numerically by
calling `reset_states` with the keyword argument `states`. The value of
`states` should be a numpy array or list of numpy arrays representing
the initial state of the RNN layer.
# Note on passing external constants to RNNs
You can pass "external" constants to the cell using the `constants`
keyword argument of `RNN.__call__` (as well as `RNN.call`) method. This
requires that the `cell.call` method accepts the same keyword argument
`constants`. Such constants can be used to condition the cell
transformation on additional static inputs (not changing over time),
a.k.a. an attention mechanism.
"""
def __init__(self, cell,
return_sequences=False,
return_state=False,
go_backwards=False,
stateful=False,
unroll=False,
**kwargs):
if unroll:
raise TypeError('Unrolling isn\'t possible with '
'convolutional RNNs.')
if isinstance(cell, (list, tuple)):
# The StackedConvRNN2DCells isn't implemented yet.
raise TypeError('It is not possible at the moment to'
'stack convolutional cells.')
super(ConvRNN2D, self).__init__(cell,
return_sequences,
return_state,
go_backwards,
stateful,
unroll,
**kwargs)
self.input_spec = [InputSpec(ndim=5)]
def compute_output_shape(self, input_shape):
if isinstance(input_shape, list):
input_shape = input_shape[0]
cell = self.cell
if cell.data_format == 'channels_first':
rows = input_shape[3]
cols = input_shape[4]
elif cell.data_format == 'channels_last':
rows = input_shape[2]
cols = input_shape[3]
rows = conv_utils.conv_output_length(rows,
cell.kernel_size[0],
padding=cell.padding,
stride=cell.strides[0],
dilation=cell.dilation_rate[0])
cols = conv_utils.conv_output_length(cols,
cell.kernel_size[1],
padding=cell.padding,
stride=cell.strides[1],
dilation=cell.dilation_rate[1])
output_shape = input_shape[:2] + (rows, cols, cell.filters)
output_shape = transpose_shape(output_shape, cell.data_format,
spatial_axes=(2, 3))
if not self.return_sequences:
output_shape = output_shape[:1] + output_shape[2:]
if self.return_state:
output_shape = [output_shape]
base = (input_shape[0], rows, cols, cell.filters)
base = transpose_shape(base, cell.data_format, spatial_axes=(1, 2))
output_shape += [base[:] for _ in range(2)]
return output_shape
def build(self, input_shape):
# Note input_shape will be list of shapes of initial states and
# constants if these are passed in __call__.
if self._num_constants is not None:
constants_shape = input_shape[-self._num_constants:]
else:
constants_shape = None
if isinstance(input_shape, list):
input_shape = input_shape[0]
batch_size = input_shape[0] if self.stateful else None
self.input_spec[0] = InputSpec(shape=(batch_size, None) + input_shape[2:5])
# allow cell (if layer) to build before we set or validate state_spec
if isinstance(self.cell, Layer):
step_input_shape = (input_shape[0],) + input_shape[2:]
if constants_shape is not None:
self.cell.build([step_input_shape] + constants_shape)
else:
self.cell.build(step_input_shape)
# set or validate state_spec
if hasattr(self.cell.state_size, '__len__'):
state_size = list(self.cell.state_size)
else:
state_size = [self.cell.state_size]
if self.state_spec is not None:
# initial_state was passed in call, check compatibility
if self.cell.data_format == 'channels_first':
ch_dim = 1
elif self.cell.data_format == 'channels_last':
ch_dim = 3
if not [spec.shape[ch_dim] for spec in self.state_spec] == state_size:
raise ValueError(
'An initial_state was passed that is not compatible with '
'`cell.state_size`. Received `state_spec`={}; '
'However `cell.state_size` is '
'{}'.format([spec.shape for spec in self.state_spec],
self.cell.state_size))
else:
if self.cell.data_format == 'channels_first':
self.state_spec = [InputSpec(shape=(None, dim, None, None))
for dim in state_size]
elif self.cell.data_format == 'channels_last':
self.state_spec = [InputSpec(shape=(None, None, None, dim))
for dim in state_size]
if self.stateful:
self.reset_states()
self.built = True
def get_initial_state(self, inputs):
# (samples, timesteps, rows, cols, filters)
initial_state = K.zeros_like(inputs)
# (samples, rows, cols, filters)
initial_state = K.sum(initial_state, axis=1)
shape = list(self.cell.kernel_shape)
shape[-1] = self.cell.filters
initial_state = self.cell.input_conv(initial_state,
K.zeros(tuple(shape)),
padding=self.cell.padding)
# Fix for Theano because it needs
# K.int_shape to work in call() with initial_state.
keras_shape = list(K.int_shape(inputs))
keras_shape.pop(1)
if K.image_data_format() == 'channels_first':
indices = 2, 3
else:
indices = 1, 2
for i, j in enumerate(indices):
keras_shape[j] = conv_utils.conv_output_length(
keras_shape[j],
shape[i],
padding=self.cell.padding,
stride=self.cell.strides[i],
dilation=self.cell.dilation_rate[i])
initial_state._keras_shape = keras_shape
if hasattr(self.cell.state_size, '__len__'):
return [initial_state for _ in self.cell.state_size]
else:
return [initial_state]
def __call__(self, inputs, initial_state=None, constants=None, **kwargs):
inputs, initial_state, constants = _standardize_args(
inputs, initial_state, constants, self._num_constants)
if initial_state is None and constants is None:
return super(ConvRNN2D, self).__call__(inputs, **kwargs)
# If any of `initial_state` or `constants` are specified and are Keras
# tensors, then add them to the inputs and temporarily modify the
# input_spec to include them.
additional_inputs = []
additional_specs = []
if initial_state is not None:
kwargs['initial_state'] = initial_state
additional_inputs += initial_state
self.state_spec = []
for state in initial_state:
try:
shape = K.int_shape(state)
# Fix for Theano
except TypeError:
shape = tuple(None for _ in range(K.ndim(state)))
self.state_spec.append(InputSpec(shape=shape))
additional_specs += self.state_spec
if constants is not None:
kwargs['constants'] = constants
additional_inputs += constants
self.constants_spec = [InputSpec(shape=K.int_shape(constant))
for constant in constants]
self._num_constants = len(constants)
additional_specs += self.constants_spec
# at this point additional_inputs cannot be empty
for tensor in additional_inputs:
if K.is_keras_tensor(tensor) != K.is_keras_tensor(additional_inputs[0]):
raise ValueError('The initial state or constants of an RNN'
' layer cannot be specified with a mix of'
' Keras tensors and non-Keras tensors')
if K.is_keras_tensor(additional_inputs[0]):
# Compute the full input spec, including state and constants
full_input = [inputs] + additional_inputs
full_input_spec = self.input_spec + additional_specs
# Perform the call with temporarily replaced input_spec
original_input_spec = self.input_spec
self.input_spec = full_input_spec
output = super(ConvRNN2D, self).__call__(full_input, **kwargs)
self.input_spec = original_input_spec
return output
else:
return super(ConvRNN2D, self).__call__(inputs, **kwargs)
def call(self,
inputs,
mask=None,
training=None,
initial_state=None,
constants=None):
# note that the .build() method of subclasses MUST define
# self.input_spec and self.state_spec with complete input shapes.
if isinstance(inputs, list):
inputs = inputs[0]
if initial_state is not None:
pass
elif self.stateful:
initial_state = self.states
else:
initial_state = self.get_initial_state(inputs)
if isinstance(mask, list):
mask = mask[0]
if len(initial_state) != len(self.states):
raise ValueError('Layer has ' + str(len(self.states)) +
' states but was passed ' +
str(len(initial_state)) +
' initial states.')
timesteps = K.int_shape(inputs)[1]
kwargs = {}
if has_arg(self.cell.call, 'training'):
kwargs['training'] = training
if constants:
if not has_arg(self.cell.call, 'constants'):
raise ValueError('RNN cell does not support constants')
def step(inputs, states):
constants = states[-self._num_constants:]
states = states[:-self._num_constants]
return self.cell.call(inputs, states, constants=constants,
**kwargs)
else:
def step(inputs, states):
return self.cell.call(inputs, states, **kwargs)
last_output, outputs, states = K.rnn(step,
inputs,
initial_state,
constants=constants,
go_backwards=self.go_backwards,
mask=mask,
input_length=timesteps)
if self.stateful:
updates = []
for i in range(len(states)):
updates.append((self.states[i], states[i]))
self.add_update(updates, inputs)
if self.return_sequences:
output = outputs
else:
output = last_output
# Properly set learning phase
if getattr(last_output, '_uses_learning_phase', False):
output._uses_learning_phase = True
if self.return_state:
states = to_list(states, allow_tuple=True)
return [output] + states
else:
return output
def reset_states(self, states=None):
if not self.stateful:
raise AttributeError('Layer must be stateful.')
input_shape = self.input_spec[0].shape
state_shape = self.compute_output_shape(input_shape)
if self.return_state:
state_shape = state_shape[0]
if self.return_sequences:
state_shape = state_shape[:1] + state_shape[2:]
if None in state_shape:
raise ValueError('If a RNN is stateful, it needs to know '
'its batch size. Specify the batch size '
'of your input tensors: \n'
'- If using a Sequential model, '
'specify the batch size by passing '
'a `batch_input_shape` '
'argument to your first layer.\n'
'- If using the functional API, specify '
'the time dimension by passing a '
'`batch_shape` argument to your Input layer.\n'
'The same thing goes for the number of rows '
'and columns.')
# helper function
def get_tuple_shape(nb_channels):
result = list(state_shape)
if self.cell.data_format == 'channels_first':
result[1] = nb_channels
elif self.cell.data_format == 'channels_last':
result[3] = nb_channels
else:
raise KeyError
return tuple(result)
# initialize state if None
if self.states[0] is None:
if hasattr(self.cell.state_size, '__len__'):
self.states = [K.zeros(get_tuple_shape(dim))
for dim in self.cell.state_size]
else:
self.states = [K.zeros(get_tuple_shape(self.cell.state_size))]
elif states is None:
if hasattr(self.cell.state_size, '__len__'):
for state, dim in zip(self.states, self.cell.state_size):
K.set_value(state, np.zeros(get_tuple_shape(dim)))
else:
K.set_value(self.states[0],
np.zeros(get_tuple_shape(self.cell.state_size)))
else:
states = to_list(states, allow_tuple=True)
if len(states) != len(self.states):
raise ValueError('Layer ' + self.name + ' expects ' +
str(len(self.states)) + ' states, '
'but it received ' + str(len(states)) +
' state values. Input received: ' +
str(states))
for index, (value, state) in enumerate(zip(states, self.states)):
if hasattr(self.cell.state_size, '__len__'):
dim = self.cell.state_size[index]
else:
dim = self.cell.state_size
if value.shape != get_tuple_shape(dim):
raise ValueError('State ' + str(index) +
' is incompatible with layer ' +
self.name + ': expected shape=' +
str(get_tuple_shape(dim)) +
', found shape=' + str(value.shape))
# TODO: consider batch calls to `set_value`.
K.set_value(state, value)
class ConvLSTM2DCell(Layer):
"""Cell class for the ConvLSTM2D layer.
# Arguments
filters: Integer, the dimensionality of the output space
(i.e. the number of output filters in the convolution).
kernel_size: An integer or tuple/list of n integers, specifying the
dimensions of the convolution window.
strides: An integer or tuple/list of n integers,
specifying the strides of the convolution.
Specifying any stride value != 1 is incompatible with specifying
any `dilation_rate` value != 1.
padding: One of `"valid"` or `"same"` (case-insensitive).
data_format: A string,
one of `"channels_last"` (default) or `"channels_first"`.
It defaults to the `image_data_format` value found in your
Keras config file at `~/.keras/keras.json`.
If you never set it, then it will be `"channels_last"`.
dilation_rate: An integer or tuple/list of n integers, specifying
the dilation rate to use for dilated convolution.
Currently, specifying any `dilation_rate` value != 1 is
incompatible with specifying any `strides` value != 1.
activation: Activation function to use
(see [activations](../activations.md)).
If you don't specify anything, no activation is applied
(ie. "linear" activation: `a(x) = x`).
recurrent_activation: Activation function to use
for the recurrent step
(see [activations](../activations.md)).
use_bias: Boolean, whether the layer uses a bias vector.
kernel_initializer: Initializer for the `kernel` weights matrix,
used for the linear transformation of the inputs.
(see [initializers](../initializers.md)).
recurrent_initializer: Initializer for the `recurrent_kernel`
weights matrix,
used for the linear transformation of the recurrent state.
(see [initializers](../initializers.md)).
bias_initializer: Initializer for the bias vector
(see [initializers](../initializers.md)).
unit_forget_bias: Boolean.
If True, add 1 to the bias of the forget gate at initialization.
Use in combination with `bias_initializer="zeros"`.
This is recommended in [Jozefowicz et al.]
(http://www.jmlr.org/proceedings/papers/v37/jozefowicz15.pdf).
kernel_regularizer: Regularizer function applied to
the `kernel` weights matrix
(see [regularizer](../regularizers.md)).
recurrent_regularizer: Regularizer function applied to
the `recurrent_kernel` weights matrix
(see [regularizer](../regularizers.md)).
bias_regularizer: Regularizer function applied to the bias vector
(see [regularizer](../regularizers.md)).
kernel_constraint: Constraint function applied to
the `kernel` weights matrix
(see [constraints](../constraints.md)).
recurrent_constraint: Constraint function applied to
the `recurrent_kernel` weights matrix
(see [constraints](../constraints.md)).
bias_constraint: Constraint function applied to the bias vector
(see [constraints](../constraints.md)).
dropout: Float between 0 and 1.
Fraction of the units to drop for
the linear transformation of the inputs.
recurrent_dropout: Float between 0 and 1.
Fraction of the units to drop for
the linear transformation of the recurrent state.
"""
def __init__(self, filters,
kernel_size,
strides=(1, 1),
padding='valid',
data_format=None,
dilation_rate=(1, 1),
activation='tanh',
recurrent_activation='hard_sigmoid',
use_bias=True,
kernel_initializer='glorot_uniform',
recurrent_initializer='orthogonal',
bias_initializer='zeros',
unit_forget_bias=True,
kernel_regularizer=None,
recurrent_regularizer=None,
bias_regularizer=None,
kernel_constraint=None,
recurrent_constraint=None,
bias_constraint=None,
dropout=0.,
recurrent_dropout=0.,
**kwargs):
super(ConvLSTM2DCell, self).__init__(**kwargs)
self.filters = filters
self.kernel_size = conv_utils.normalize_tuple(kernel_size, 2, 'kernel_size')
self.strides = conv_utils.normalize_tuple(strides, 2, 'strides')
self.padding = conv_utils.normalize_padding(padding)
self.data_format = K.normalize_data_format(data_format)
self.dilation_rate = conv_utils.normalize_tuple(dilation_rate, 2,
'dilation_rate')
self.activation = activations.get(activation)
self.recurrent_activation = activations.get(recurrent_activation)
self.use_bias = use_bias
self.kernel_initializer = initializers.get(kernel_initializer)
self.recurrent_initializer = initializers.get(recurrent_initializer)
self.bias_initializer = initializers.get(bias_initializer)
self.unit_forget_bias = unit_forget_bias
self.kernel_regularizer = regularizers.get(kernel_regularizer)
self.recurrent_regularizer = regularizers.get(recurrent_regularizer)
self.bias_regularizer = regularizers.get(bias_regularizer)
self.kernel_constraint = constraints.get(kernel_constraint)
self.recurrent_constraint = constraints.get(recurrent_constraint)
self.bias_constraint = constraints.get(bias_constraint)
if K.backend() == 'theano' and (dropout or recurrent_dropout):
warnings.warn(
'RNN dropout is no longer supported with the Theano backend '
'due to technical limitations. '
'You can either set `dropout` and `recurrent_dropout` to 0, '
'or use the TensorFlow backend.')
dropout = 0.
recurrent_dropout = 0.
self.dropout = min(1., max(0., dropout))
self.recurrent_dropout = min(1., max(0., recurrent_dropout))
self.state_size = (self.filters, self.filters)
self._dropout_mask = None
self._recurrent_dropout_mask = None
def build(self, input_shape):
if self.data_format == 'channels_first':
channel_axis = 1
else:
channel_axis = -1
if input_shape[channel_axis] is None:
raise ValueError('The channel dimension of the inputs '
'should be defined. Found `None`.')
input_dim = input_shape[channel_axis]
kernel_shape = self.kernel_size + (input_dim, self.filters * 4)
self.kernel_shape = kernel_shape
recurrent_kernel_shape = self.kernel_size + (self.filters, self.filters * 4)
self.kernel = self.add_weight(shape=kernel_shape,
initializer=self.kernel_initializer,
name='kernel',
regularizer=self.kernel_regularizer,
constraint=self.kernel_constraint)
self.recurrent_kernel = self.add_weight(
shape=recurrent_kernel_shape,
initializer=self.recurrent_initializer,
name='recurrent_kernel',
regularizer=self.recurrent_regularizer,
constraint=self.recurrent_constraint)
if self.use_bias:
if self.unit_forget_bias:
def bias_initializer(_, *args, **kwargs):
return K.concatenate([
self.bias_initializer((self.filters,), *args, **kwargs),
initializers.Ones()((self.filters,), *args, **kwargs),
self.bias_initializer((self.filters * 2,), *args, **kwargs),
])
else:
bias_initializer = self.bias_initializer
self.bias = self.add_weight(shape=(self.filters * 4,),
name='bias',
initializer=bias_initializer,
regularizer=self.bias_regularizer,
constraint=self.bias_constraint)
else:
self.bias = None
self.kernel_i = self.kernel[:, :, :, :self.filters]
self.recurrent_kernel_i = self.recurrent_kernel[:, :, :, :self.filters]
self.kernel_f = self.kernel[:, :, :, self.filters: self.filters * 2]
self.recurrent_kernel_f = (
self.recurrent_kernel[:, :, :, self.filters: self.filters * 2])
self.kernel_c = self.kernel[:, :, :, self.filters * 2: self.filters * 3]
self.recurrent_kernel_c = (
self.recurrent_kernel[:, :, :, self.filters * 2: self.filters * 3])
self.kernel_o = self.kernel[:, :, :, self.filters * 3:]
self.recurrent_kernel_o = self.recurrent_kernel[:, :, :, self.filters * 3:]
if self.use_bias:
self.bias_i = self.bias[:self.filters]
self.bias_f = self.bias[self.filters: self.filters * 2]
self.bias_c = self.bias[self.filters * 2: self.filters * 3]
self.bias_o = self.bias[self.filters * 3:]
else:
self.bias_i = None
self.bias_f = None
self.bias_c = None
self.bias_o = None
self.built = True
def call(self, inputs, states, training=None):
if 0 < self.dropout < 1 and self._dropout_mask is None:
self._dropout_mask = _generate_dropout_mask(
K.ones_like(inputs),
self.dropout,
training=training,
count=4)
if (0 < self.recurrent_dropout < 1 and
self._recurrent_dropout_mask is None):
self._recurrent_dropout_mask = _generate_dropout_mask(
K.ones_like(states[1]),
self.recurrent_dropout,
training=training,
count=4)
# dropout matrices for input units
dp_mask = self._dropout_mask
# dropout matrices for recurrent units
rec_dp_mask = self._recurrent_dropout_mask
h_tm1 = states[0] # previous memory state
c_tm1 = states[1] # previous carry state
if 0 < self.dropout < 1.:
inputs_i = inputs * dp_mask[0]
inputs_f = inputs * dp_mask[1]
inputs_c = inputs * dp_mask[2]
inputs_o = inputs * dp_mask[3]
else:
inputs_i = inputs
inputs_f = inputs
inputs_c = inputs
inputs_o = inputs
if 0 < self.recurrent_dropout < 1.:
h_tm1_i = h_tm1 * rec_dp_mask[0]
h_tm1_f = h_tm1 * rec_dp_mask[1]
h_tm1_c = h_tm1 * rec_dp_mask[2]
h_tm1_o = h_tm1 * rec_dp_mask[3]
else:
h_tm1_i = h_tm1
h_tm1_f = h_tm1
h_tm1_c = h_tm1
h_tm1_o = h_tm1
x_i = self.input_conv(inputs_i, self.kernel_i, self.bias_i,
padding=self.padding)
x_f = self.input_conv(inputs_f, self.kernel_f, self.bias_f,
padding=self.padding)
x_c = self.input_conv(inputs_c, self.kernel_c, self.bias_c,
padding=self.padding)
x_o = self.input_conv(inputs_o, self.kernel_o, self.bias_o,
padding=self.padding)
h_i = self.recurrent_conv(h_tm1_i,
self.recurrent_kernel_i)
h_f = self.recurrent_conv(h_tm1_f,
self.recurrent_kernel_f)
h_c = self.recurrent_conv(h_tm1_c,
self.recurrent_kernel_c)
h_o = self.recurrent_conv(h_tm1_o,
self.recurrent_kernel_o)
i = self.recurrent_activation(x_i + h_i)
f = self.recurrent_activation(x_f + h_f)
c = f * c_tm1 + i * self.activation(x_c + h_c)
o = self.recurrent_activation(x_o + h_o)
h = o * self.activation(c)
if 0 < self.dropout + self.recurrent_dropout:
if training is None:
h._uses_learning_phase = True
return h, [h, c]
def input_conv(self, x, w, b=None, padding='valid'):
conv_out = K.conv2d(x, w, strides=self.strides,
padding=padding,
data_format=self.data_format,
dilation_rate=self.dilation_rate)
if b is not None:
conv_out = K.bias_add(conv_out, b,
data_format=self.data_format)
return conv_out
def recurrent_conv(self, x, w):
conv_out = K.conv2d(x, w, strides=(1, 1),
padding='same',
data_format=self.data_format)
return conv_out
def get_config(self):
config = {'filters': self.filters,
'kernel_size': self.kernel_size,
'strides': self.strides,
'padding': self.padding,
'data_format': self.data_format,
'dilation_rate': self.dilation_rate,
'activation': activations.serialize(self.activation),
'recurrent_activation':
activations.serialize(self.recurrent_activation),
'use_bias': self.use_bias,
'kernel_initializer':
initializers.serialize(self.kernel_initializer),
'recurrent_initializer':
initializers.serialize(self.recurrent_initializer),
'bias_initializer': initializers.serialize(self.bias_initializer),
'unit_forget_bias': self.unit_forget_bias,
'kernel_regularizer':
regularizers.serialize(self.kernel_regularizer),
'recurrent_regularizer':
regularizers.serialize(self.recurrent_regularizer),
'bias_regularizer': regularizers.serialize(self.bias_regularizer),
'kernel_constraint':
constraints.serialize(self.kernel_constraint),
'recurrent_constraint':
constraints.serialize(self.recurrent_constraint),
'bias_constraint': constraints.serialize(self.bias_constraint),
'dropout': self.dropout,
'recurrent_dropout': self.recurrent_dropout}
base_config = super(ConvLSTM2DCell, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
class ConvLSTM2D(ConvRNN2D):
"""Convolutional LSTM.
It is similar to an LSTM layer, but the input transformations
and recurrent transformations are both convolutional.
# Arguments
filters: Integer, the dimensionality of the output space
(i.e. the number output of filters in the convolution).
kernel_size: An integer or tuple/list of n integers, specifying the
dimensions of the convolution window.
strides: An integer or tuple/list of n integers,
specifying the strides of the convolution.
Specifying any stride value != 1 is incompatible with specifying
any `dilation_rate` value != 1.
padding: One of `"valid"` or `"same"` (case-insensitive).
data_format: A string,
one of `"channels_last"` (default) or `"channels_first"`.
The ordering of the dimensions in the inputs.
`"channels_last"` corresponds to inputs with shape
`(batch, time, ..., channels)`
while `"channels_first"` corresponds to
inputs with shape `(batch, time, channels, ...)`.
It defaults to the `image_data_format` value found in your
Keras config file at `~/.keras/keras.json`.
If you never set it, then it will be `"channels_last"`.
dilation_rate: An integer or tuple/list of n integers, specifying
the dilation rate to use for dilated convolution.
Currently, specifying any `dilation_rate` value != 1 is
incompatible with specifying any `strides` value != 1.
activation: Activation function to use
(see [activations](../activations.md)).
If you don't specify anything, no activation is applied
(ie. "linear" activation: `a(x) = x`).
recurrent_activation: Activation function to use
for the recurrent step
(see [activations](../activations.md)).
use_bias: Boolean, whether the layer uses a bias vector.
kernel_initializer: Initializer for the `kernel` weights matrix,
used for the linear transformation of the inputs.
(see [initializers](../initializers.md)).
recurrent_initializer: Initializer for the `recurrent_kernel`
weights matrix,
used for the linear transformation of the recurrent state.
(see [initializers](../initializers.md)).
bias_initializer: Initializer for the bias vector
(see [initializers](../initializers.md)).
unit_forget_bias: Boolean.
If True, add 1 to the bias of the forget gate at initialization.
Use in combination with `bias_initializer="zeros"`.
This is recommended in [Jozefowicz et al.]
(http://www.jmlr.org/proceedings/papers/v37/jozefowicz15.pdf).
kernel_regularizer: Regularizer function applied to
the `kernel` weights matrix
(see [regularizer](../regularizers.md)).
recurrent_regularizer: Regularizer function applied to
the `recurrent_kernel` weights matrix
(see [regularizer](../regularizers.md)).
bias_regularizer: Regularizer function applied to the bias vector
(see [regularizer](../regularizers.md)).
activity_regularizer: Regularizer function applied to
the output of the layer (its "activation").
(see [regularizer](../regularizers.md)).
kernel_constraint: Constraint function applied to
the `kernel` weights matrix
(see [constraints](../constraints.md)).
recurrent_constraint: Constraint function applied to
the `recurrent_kernel` weights matrix
(see [constraints](../constraints.md)).
bias_constraint: Constraint function applied to the bias vector
(see [constraints](../constraints.md)).
return_sequences: Boolean. Whether to return the last output
in the output sequence, or the full sequence.
go_backwards: Boolean (default False).
If True, process the input sequence backwards.
stateful: Boolean (default False). If True, the last state
for each sample at index i in a batch will be used as initial
state for the sample of index i in the following batch.
dropout: Float between 0 and 1.
Fraction of the units to drop for
the linear transformation of the inputs.
recurrent_dropout: Float between 0 and 1.
Fraction of the units to drop for
the linear transformation of the recurrent state.
# Input shape
- if data_format='channels_first'
5D tensor with shape:
`(samples, time, channels, rows, cols)`
- if data_format='channels_last'
5D tensor with shape:
`(samples, time, rows, cols, channels)`
# Output shape
- if `return_sequences`
- if data_format='channels_first'
5D tensor with shape:
`(samples, time, filters, output_row, output_col)`
- if data_format='channels_last'
5D tensor with shape:
`(samples, time, output_row, output_col, filters)`
- else
- if data_format='channels_first'
4D tensor with shape:
`(samples, filters, output_row, output_col)`
- if data_format='channels_last'
4D tensor with shape:
`(samples, output_row, output_col, filters)`
where o_row and o_col depend on the shape of the filter and
the padding
# Raises
ValueError: in case of invalid constructor arguments.
# References
- [Convolutional LSTM Network: A Machine Learning Approach for
Precipitation Nowcasting](http://arxiv.org/abs/1506.04214v1)
The current implementation does not include the feedback loop on the
cells output
"""
@interfaces.legacy_convlstm2d_support
def __init__(self, filters,
kernel_size,
strides=(1, 1),
padding='valid',
data_format=None,
dilation_rate=(1, 1),
activation='tanh',
recurrent_activation='hard_sigmoid',
use_bias=True,
kernel_initializer='glorot_uniform',
recurrent_initializer='orthogonal',
bias_initializer='zeros',
unit_forget_bias=True,
kernel_regularizer=None,
recurrent_regularizer=None,
bias_regularizer=None,
activity_regularizer=None,
kernel_constraint=None,
recurrent_constraint=None,
bias_constraint=None,
return_sequences=False,
go_backwards=False,
stateful=False,
dropout=0.,
recurrent_dropout=0.,
**kwargs):
cell = ConvLSTM2DCell(filters=filters,
kernel_size=kernel_size,
strides=strides,
padding=padding,
data_format=data_format,
dilation_rate=dilation_rate,
activation=activation,
recurrent_activation=recurrent_activation,
use_bias=use_bias,
kernel_initializer=kernel_initializer,
recurrent_initializer=recurrent_initializer,
bias_initializer=bias_initializer,
unit_forget_bias=unit_forget_bias,
kernel_regularizer=kernel_regularizer,
recurrent_regularizer=recurrent_regularizer,
bias_regularizer=bias_regularizer,
kernel_constraint=kernel_constraint,
recurrent_constraint=recurrent_constraint,
bias_constraint=bias_constraint,
dropout=dropout,
recurrent_dropout=recurrent_dropout)
super(ConvLSTM2D, self).__init__(cell,
return_sequences=return_sequences,
go_backwards=go_backwards,
stateful=stateful,
**kwargs)
self.activity_regularizer = regularizers.get(activity_regularizer)
def call(self, inputs, mask=None, training=None, initial_state=None):
return super(ConvLSTM2D, self).call(inputs,
mask=mask,
training=training,
initial_state=initial_state)
@property
def filters(self):
return self.cell.filters
@property
def kernel_size(self):
return self.cell.kernel_size
@property
def strides(self):
return self.cell.strides
@property
def padding(self):
return self.cell.padding
@property
def data_format(self):
return self.cell.data_format
@property
def dilation_rate(self):
return self.cell.dilation_rate
@property
def activation(self):
return self.cell.activation
@property
def recurrent_activation(self):
return self.cell.recurrent_activation
@property
def use_bias(self):
return self.cell.use_bias
@property
def kernel_initializer(self):
return self.cell.kernel_initializer
@property
def recurrent_initializer(self):
return self.cell.recurrent_initializer
@property
def bias_initializer(self):
return self.cell.bias_initializer
@property
def unit_forget_bias(self):
return self.cell.unit_forget_bias
@property
def kernel_regularizer(self):
return self.cell.kernel_regularizer
@property
def recurrent_regularizer(self):
return self.cell.recurrent_regularizer
@property
def bias_regularizer(self):
return self.cell.bias_regularizer
@property
def kernel_constraint(self):
return self.cell.kernel_constraint
@property
def recurrent_constraint(self):
return self.cell.recurrent_constraint
@property
def bias_constraint(self):
return self.cell.bias_constraint
@property
def dropout(self):
return self.cell.dropout
@property
def recurrent_dropout(self):
return self.cell.recurrent_dropout
def get_config(self):
config = {'filters': self.filters,
'kernel_size': self.kernel_size,
'strides': self.strides,
'padding': self.padding,
'data_format': self.data_format,
'dilation_rate': self.dilation_rate,
'activation': activations.serialize(self.activation),
'recurrent_activation':
activations.serialize(self.recurrent_activation),
'use_bias': self.use_bias,
'kernel_initializer':
initializers.serialize(self.kernel_initializer),
'recurrent_initializer':
initializers.serialize(self.recurrent_initializer),
'bias_initializer': initializers.serialize(self.bias_initializer),
'unit_forget_bias': self.unit_forget_bias,
'kernel_regularizer':
regularizers.serialize(self.kernel_regularizer),
'recurrent_regularizer':
regularizers.serialize(self.recurrent_regularizer),
'bias_regularizer': regularizers.serialize(self.bias_regularizer),
'activity_regularizer':
regularizers.serialize(self.activity_regularizer),
'kernel_constraint':
constraints.serialize(self.kernel_constraint),
'recurrent_constraint':
constraints.serialize(self.recurrent_constraint),
'bias_constraint': constraints.serialize(self.bias_constraint),
'dropout': self.dropout,
'recurrent_dropout': self.recurrent_dropout}
base_config = super(ConvLSTM2D, self).get_config()
del base_config['cell']
return dict(list(base_config.items()) + list(config.items()))
@classmethod
def from_config(cls, config):
return cls(**config)