# -*coding: utf-8 -*-
from ..kernel import Kernel
from ..graph import Node, Parameter
from ..containers import Tensor, CircleBuffer
from ..core import MPEnums
from .kernel_utils import extract_nested_data
import numpy as np
[docs]
class RunningAverageKernel(Kernel):
"""
Kernel to calculate running average across multiple trials in a session.
Trials are automatically included into the next running average
calculation.
Parameters
----------
inA : Tensor or Scalar
Single Trial input data to the RunningAverageKernel; should be a 2D Tensor or Scalar object
outA : Tensor or Scalar
Output Tensor to store output of mean trial calculation; should be the same size
of the input tensor or a scalar.
running_average_len : int
Indicates the maximum number of trials that the running average kernel will be used to compute.
Used to preallocate tensor to store previous trial data
axis : None or 0:
Axis by which to calculate running average. Currently only supports mean across trials when axis = 0
(ie. Average Tensor layer values), or single value mean, axis = None
flush_on_init : bool, default = False
If true, flushes the buffer on initialization.
"""
def __init__(self, graph, inA, outA, running_average_len, axis = 0, flush_on_init = False):
""" Init """
super().__init__('RunningAverage',MPEnums.INIT_FROM_NONE,graph)
self.inputs = [inA]
self.outputs = [outA]
self._running_average_len = running_average_len
self._flush_on_init = flush_on_init
self._axis = axis
self._data_buff = None
def _verify(self):
"""
Verify the inputs and outputs are appropriately sized
"""
# need to set the CB here to ensure that inA size is available TODO, kinda hacky, might need to require that size is defined by user
self._data_buff = CircleBuffer.create(self.session, self._running_average_len,
Tensor.create(self.session, self.inputs[0].shape))
def _initialize(self, init_inputs, init_outputs, labels):
"""
This kernel has no internal state to be initialized. Call initialization_execution
if downstream nodes are missing training data.
Parameters
----------
init_inputs: Tensor or Scalar
Input data
init_outputs: Tensor or Scalar
Output data
"""
init_in = init_inputs[0]
init_out = init_outputs[0]
if self._flush_on_init:
self._data_buff.flush()
if init_in is not None:
# check that the input is a tensor or array
accepted_inputs = (MPEnums.ARRAY,MPEnums.TENSOR,MPEnums.CIRCLE_BUFFER)
if init_in.mp_type not in accepted_inputs:
raise TypeError("Running Average Kernel: Initialization input must be a Tensor or Circle Buffer")
# extract the data fron the input and place it into the buffer
for i in range(init_in.num_elements):
data = extract_nested_data(init_in,i)
self._data_buff.enqueue(data)
if init_out is not None:
if init_out.virtual:
init_out.shape = self.outputs[0].shape
# extract the data from the buffer
X = extract_nested_data(self._data_buff)
init_out.data = np.mean(X,axis=self._axis)
def _process_data(self, inputs, outputs):
"""
Add input data to data object storing previous trials and process the stacked data
Parameters
----------
inputs: list of Tensors or Scalars
Input data container, list of length 1
outputs: list of Tensors or Scalars
Output data container, list of length 1
"""
# enqueue the input data
self._data_buff.enqueue(inputs[0])
# extract the data from the buffer
X = extract_nested_data(self._data_buff)
outputs[0].data = np.mean(X,axis=self._axis)
[docs]
@classmethod
def add_to_graph(cls, graph, inA, outA, running_average_len, axis=0, flush_on_init=False, init_input=None, init_labels=None):
"""
Factory method to create running average node and add it to the specified graph
Parameters
----------
graph : Graph
The graph where the node object should be added
inA : Tensor or Scalar
Single Trial input data to the RunningAverageKernel; should be a 2D Tensor or Scalar object
outA : Tensor or Scalar
Output Tensor to store output of mean trial calculation; should be the same size
of the input tensor or a scalar.
running_average_len : int
Indicates the maximum number of trials that the running average kernel will be used to compute.
Used to preallocate tensor to store previous trial data
axis : None or 0:
Axis by which to calculate running average. Currently only supports mean across trials when axis = 0
(ie. Average Tensor layer values), or single value mean, axis = None
flush_on_init : bool
If true, flushes the buffer on initialization.
Returns
-------
node : Node
The node object that was added to the graph containing the running average kernel
"""
kernel = cls(graph, inA, outA, running_average_len, axis, flush_on_init)
params = (Parameter(inA,MPEnums.INPUT),
Parameter(outA,MPEnums.OUTPUT))
node = Node(graph, kernel, params)
graph.add_node(node)
# if initialization data is provided, add it to the node
if init_input is not None:
node.add_initialization_data([init_input], init_labels)
return node