From fae3ea1cda0b1462b57e431a503a8719bbee703d Mon Sep 17 00:00:00 2001
From: Jannik Luboeinski <33398515+jlubo@users.noreply.github.com>
Date: Thu, 25 Apr 2024 09:03:49 +0200
Subject: [PATCH 1/2] ATRLIF neuron model (#846)
* added process and CPU process models of ATRLIF neuron; added a Jupyter notebook to demonstrate the properties of the ATRLIF neuron
* testing of the tutorial
* tests for process and model added; copyright notes added; cleanup
* codacy-related fixed
* comment formatting and copyright notices adjusted
---
.gitignore | 2 +
src/lava/proc/atrlif/models.py | 267 +++++++
src/lava/proc/atrlif/process.py | 138 ++++
tests/lava/proc/atrlif/__init__.py | 0
tests/lava/proc/atrlif/test_models.py | 696 ++++++++++++++++++
tests/lava/proc/atrlif/test_process.py | 43 ++
tests/lava/tutorials/test_tutorials.py | 8 +-
.../tutorial12_adaptive_neurons.ipynb | 329 +++++++++
8 files changed, 1482 insertions(+), 1 deletion(-)
create mode 100644 src/lava/proc/atrlif/models.py
create mode 100644 src/lava/proc/atrlif/process.py
create mode 100644 tests/lava/proc/atrlif/__init__.py
create mode 100644 tests/lava/proc/atrlif/test_models.py
create mode 100644 tests/lava/proc/atrlif/test_process.py
create mode 100644 tutorials/in_depth/tutorial12_adaptive_neurons.ipynb
diff --git a/.gitignore b/.gitignore
index cd5d1d84b..7d71837a3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -145,3 +145,5 @@ dmypy.json
.idea/
.vscode/
.history/
+.flakeheaven_cache/
+tutorials/in_depth/results/
diff --git a/src/lava/proc/atrlif/models.py b/src/lava/proc/atrlif/models.py
new file mode 100644
index 000000000..bb2aa1c9d
--- /dev/null
+++ b/src/lava/proc/atrlif/models.py
@@ -0,0 +1,267 @@
+# Copyright (C) 2024 Intel Corporation
+# Copyright (C) 2024 Jannik Luboeinski
+# SPDX-License-Identifier: BSD-3-Clause
+# See: https://spdx.org/licenses/
+
+import numpy as np
+from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
+from lava.magma.core.model.py.ports import PyInPort, PyOutPort
+from lava.magma.core.model.py.type import LavaPyType
+from lava.magma.core.resources import CPU
+from lava.magma.core.decorator import implements, requires, tag
+from lava.magma.core.model.py.model import PyLoihiProcessModel
+
+from lava.proc.atrlif.process import ATRLIF
+
+
+@implements(proc=ATRLIF, protocol=LoihiProtocol)
+@requires(CPU)
+@tag("floating_pt")
+class PyATRLIFModelFloat(PyLoihiProcessModel):
+ """
+ Implementation of Adaptive Threshold and Refractoriness Leaky-Integrate-
+ and-Fire neuron process in floating-point precision. This short and simple
+ ProcessModel can be used for quick algorithmic prototyping, without
+ engaging with the nuances of a fixed-point implementation.
+ """
+ a_in: PyInPort = LavaPyType(PyInPort.VEC_DENSE, float)
+ s_out = None
+ i: np.ndarray = LavaPyType(np.ndarray, float)
+ v: np.ndarray = LavaPyType(np.ndarray, float)
+ theta: np.ndarray = LavaPyType(np.ndarray, float)
+ r: np.ndarray = LavaPyType(np.ndarray, float)
+ s: np.ndarray = LavaPyType(np.ndarray, bool)
+ bias_mant: np.ndarray = LavaPyType(np.ndarray, float)
+ bias_exp: np.ndarray = LavaPyType(np.ndarray, float)
+ delta_i: float = LavaPyType(float, float)
+ delta_v: float = LavaPyType(float, float)
+ delta_theta: float = LavaPyType(float, float)
+ delta_r: float = LavaPyType(float, float)
+ theta_0: float = LavaPyType(float, float)
+ theta_step: float = LavaPyType(float, float)
+ s_out: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, float)
+
+ def __init__(self, proc_params):
+ super(PyATRLIFModelFloat, self).__init__(proc_params)
+
+ def subthr_dynamics(self, activation_in: np.ndarray):
+ """
+ Sub-threshold dynamics for the model:
+ i[t] = (1-delta_i)*i[t-1] + x[t]
+ v[t] = (1-delta_v)*v[t-1] + i[t] + bias_mant
+ theta[t] = (1-delta_theta)*(theta[t-1] - theta_0) + theta_0
+ r[t] = (1-delta_r)*r[t-1]
+ """
+ self.i[:] = (1 - self.delta_i) * self.i + activation_in
+ self.v[:] = (1 - self.delta_v) * self.v + self.i + self.bias_mant
+ self.theta[:] = (1 - self.delta_theta) * (self.theta - self.theta_0) \
+ + self.theta_0
+ self.r[:] = (1 - self.delta_r) * self.r
+
+ def post_spike(self, spike_vector: np.ndarray):
+ """
+ Post spike/refractory behavior:
+ r[t] = r[t] + 2*theta[t]
+ theta[t] = theta[t] + theta_step
+ """
+ # For spiking neurons, set new values for refractory state and
+ # threshold
+ r_spiking = self.r[spike_vector]
+ theta_spiking = self.theta[spike_vector]
+ self.r[spike_vector] = r_spiking + 2 * theta_spiking
+ self.theta[spike_vector] = theta_spiking + self.theta_step
+
+ def run_spk(self):
+ """
+ The run function that performs the actual computation. Processes spike
+ events that occur if (v[t] - r[t]) >= theta[t].
+ """
+ # Receive synaptic input
+ a_in_data = self.a_in.recv()
+
+ # Perform the sub-threshold and spike computations
+ self.subthr_dynamics(activation_in=a_in_data)
+ self.s[:] = (self.v - self.r) >= self.theta
+ self.post_spike(spike_vector=self.s)
+ self.s_out.send(self.s)
+
+
+@implements(proc=ATRLIF, protocol=LoihiProtocol)
+@requires(CPU)
+@tag("bit_accurate_loihi", "fixed_pt")
+class PyATRLIFModelFixed(PyLoihiProcessModel):
+ """
+ Implementation of Adaptive Threshold and Refractoriness Leaky-Integrate-
+ and-Fire neuron process in fixed-point precision, bit-by-bit mimicking the
+ fixed-point computation behavior of Loihi 2.
+ """
+ a_in: PyInPort = LavaPyType(PyInPort.VEC_DENSE, np.int16, precision=16)
+ i: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24)
+ v: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24)
+ theta: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24)
+ r: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24)
+ s: np.ndarray = LavaPyType(np.ndarray, bool)
+ bias_mant: np.ndarray = LavaPyType(np.ndarray, np.int16, precision=13)
+ bias_exp: np.ndarray = LavaPyType(np.ndarray, np.int16, precision=3)
+ delta_i: int = LavaPyType(int, np.uint16, precision=12)
+ delta_v: int = LavaPyType(int, np.uint16, precision=12)
+ delta_theta: int = LavaPyType(int, np.uint16, precision=12)
+ delta_r: int = LavaPyType(int, np.uint16, precision=12)
+ theta_0: int = LavaPyType(int, np.uint16, precision=12)
+ theta_step: int = LavaPyType(int, np.uint16, precision=12)
+ s_out: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, np.int32, precision=24)
+
+ def __init__(self, proc_params):
+ super(PyATRLIFModelFixed, self).__init__(proc_params)
+
+ # The `ds_offset` constant enables setting decay constant values to
+ # exact 4096 = 2**12. Without it, the range of 12-bit unsigned
+ # `delta_i` is 0 to 4095.
+ self.ds_offset = 1
+ self.isthrscaled = False
+ self.effective_bias = 0
+ # State variables i and v are 24 bits wide
+ self.iv_bitwidth = 24
+ self.max_iv_val = 2**(self.iv_bitwidth - 1)
+ # Decays need an MSB alignment by 12 bits
+ self.decay_shift = 12
+ self.decay_unity = 2**self.decay_shift
+ # Threshold and incoming activation are MSB-aligned by 6 bits
+ self.theta_unity = 2**6
+ self.act_unity = 2**6
+
+ def subthr_dynamics(self, activation_in: np.ndarray):
+ """
+ Sub-threshold dynamics for the model:
+ i[t] = (1-delta_i)*i[t-1] + x[t]
+ v[t] = (1-delta_v)*v[t-1] + i[t] + bias_mant
+ theta[t] = (1-delta_theta)*(theta[t-1] - theta_0) + theta_0
+ r[t] = (1-delta_r)*r[t-1]
+ """
+ # Update current
+ # --------------
+ # Multiplication is done for left shifting, offset is added
+ decay_const_i = self.delta_i * self.decay_unity + self.ds_offset
+ # Below, i is promoted to int64 to avoid overflow of the product
+ # between i and decay constant beyond int32.
+ # Subsequent right shift by 12 brings us back within 24-bits (and
+ # hence, within 32-bits).
+ i_decayed = np.int64(self.i * (self.decay_unity - decay_const_i))
+ i_decayed = np.sign(i_decayed) * np.right_shift(
+ np.abs(i_decayed), self.decay_shift
+ )
+ # Multiplication is done for left shifting (to account for MSB
+ # alignment done by the hardware).
+ activation_in = activation_in * self.act_unity
+ # Add synaptic input to decayed current
+ i_updated = np.int32(i_decayed + activation_in)
+ # Check if value of current is within bounds of 24-bit. Overflows are
+ # handled by wrapping around modulo.
+ # 2 ** 23. E.g., (2 ** 23) + k becomes k and -(2**23 + k) becomes -k
+ wrapped_curr = np.where(
+ i_updated > self.max_iv_val,
+ i_updated - 2 * self.max_iv_val,
+ i_updated,
+ )
+ wrapped_curr = np.where(
+ wrapped_curr <= -self.max_iv_val,
+ i_updated + 2 * self.max_iv_val,
+ wrapped_curr,
+ )
+ self.i[:] = wrapped_curr
+
+ # Update voltage (proceeding similar to current update)
+ # -----------------------------------------------------
+ decay_const_v = self.delta_v * self.decay_unity
+ neg_voltage_limit = -np.int32(self.max_iv_val) + 1
+ pos_voltage_limit = np.int32(self.max_iv_val) - 1
+ v_decayed = np.int64(self.v) * np.int64(self.decay_unity
+ - decay_const_v)
+ v_decayed = np.sign(v_decayed) * np.right_shift(
+ np.abs(v_decayed), self.decay_shift
+ )
+ v_updated = np.int32(v_decayed + self.i + self.effective_bias)
+ self.v[:] = np.clip(v_updated, neg_voltage_limit, pos_voltage_limit)
+
+ # Update threshold (proceeding similar to current update)
+ # -------------------------------------------------------
+ decay_const_theta = self.delta_theta * self.decay_unity
+ theta_diff_decayed = np.int64(self.theta - self.theta_0) * \
+ np.int64(self.decay_unity - decay_const_theta)
+ theta_diff_decayed = np.sign(theta_diff_decayed) * np.right_shift(
+ np.abs(theta_diff_decayed), self.decay_shift
+ )
+ self.theta[:] = np.int32(theta_diff_decayed) + self.theta_0
+ # TODO do clipping here?
+
+ # Update refractoriness (decaying similar to current)
+ # ---------------------------------------------------
+ decay_const_r = self.delta_r * self.decay_unity
+ r_decayed = np.int64(self.r) * np.int64(self.decay_unity
+ - decay_const_r)
+ r_decayed = np.sign(r_decayed) * np.right_shift(
+ np.abs(r_decayed), self.decay_shift
+ )
+ self.r[:] = np.int32(r_decayed)
+ # TODO do clipping here?
+
+ def scale_bias(self):
+ """
+ Scale bias with bias exponent by taking into account sign of the
+ exponent.
+ """
+ # Create local copy of `bias_mant` with promoted dtype to prevent
+ # overflow when applying shift of `bias_exp`.
+ bias_mant = self.bias_mant.copy().astype(np.int32)
+ self.effective_bias = np.where(
+ self.bias_exp >= 0,
+ np.left_shift(bias_mant, self.bias_exp),
+ np.right_shift(bias_mant, -self.bias_exp),
+ )
+
+ def scale_threshold(self):
+ """
+ Scale threshold according to the way Loihi hardware scales it. In Loihi
+ hardware, threshold is left-shifted by 6-bits to MSB-align it with
+ other state variables of higher precision.
+ """
+ # Multiplication is done for left shifting
+ self.theta_0 = np.int32(self.theta_0 * self.theta_unity)
+ self.theta = np.full(self.theta.shape, self.theta_0)
+ self.theta_step = np.int32(self.theta_step * self.theta_unity)
+ self.isthrscaled = True
+
+ def post_spike(self, spike_vector: np.ndarray):
+ """
+ Post spike/refractory behavior:
+ r[t] = r[t] + 2*theta[t]
+ theta[t] = theta[t] + theta_step
+ """
+ # For spiking neurons, set new values for refractory state and
+ # threshold.
+ r_spiking = self.r[spike_vector]
+ theta_spiking = self.theta[spike_vector]
+ self.r[spike_vector] = r_spiking + 2 * theta_spiking
+ self.theta[spike_vector] = theta_spiking + self.theta_step
+
+ def run_spk(self):
+ """
+ The run function that performs the actual computation. Processes spike
+ events that occur if (v[t] - r[t]) >= theta[t].
+ """
+ # Receive synaptic input
+ a_in_data = self.a_in.recv()
+
+ # Compute effective bias
+ self.scale_bias()
+
+ # Compute scaled threshold-related variables only once, not every
+ # timestep (has to be done once after object construction).
+ if not self.isthrscaled:
+ self.scale_threshold()
+
+ # Perform the sub-threshold and spike computations
+ self.subthr_dynamics(activation_in=a_in_data)
+ self.s[:] = (self.v - self.r) >= self.theta
+ self.post_spike(spike_vector=self.s)
+ self.s_out.send(self.s)
diff --git a/src/lava/proc/atrlif/process.py b/src/lava/proc/atrlif/process.py
new file mode 100644
index 000000000..1f5bf8152
--- /dev/null
+++ b/src/lava/proc/atrlif/process.py
@@ -0,0 +1,138 @@
+# Copyright (C) 2024 Intel Corporation
+# Copyright (C) 2024 Jannik Luboeinski
+# SPDX-License-Identifier: BSD-3-Clause
+# See: https://spdx.org/licenses/
+
+import numpy as np
+import typing as ty
+
+from lava.magma.core.process.process import AbstractProcess, LogConfig
+from lava.magma.core.process.variable import Var
+from lava.magma.core.process.ports.ports import InPort, OutPort
+
+
+class ATRLIF(AbstractProcess):
+ """
+ Adaptive Threshold and Refractoriness Leaky-Integrate-and-Fire Process.
+ With activation input port `a_in` and spike output port `s_out`.
+
+ Note that non-integer parameter values are supported, but can lead to
+ deviating results in models that employ fixed-point computation.
+
+ Dynamics (cf. https://github.com/lava-nc/lava-dl/blob/main/src/lava/lib/dl/
+ slayer/neuron/alif.py,
+ https://github.com/lava-nc/lava-dl/blob/main/tutorials/lava/
+ lib/dl/slayer/neuron_dynamics/dynamics.ipynb):
+ i[t] = (1-delta_i)*i[t-1] + x[t]
+ v[t] = (1-delta_v)*v[t-1] + i[t] + bias
+ theta[t] = (1-delta_theta)*(theta[t-1] - theta_0) + theta_0
+ r[t] = (1-delta_r)*r[t-1]
+
+ Spike event:
+ s[t] = (v[t] - r[t]) >= theta[t]
+
+ Post spike event:
+ r[t] = r[t] + 2*theta[t]
+ theta[t] = theta[t] + theta_step
+
+ Parameters
+ ----------
+ shape : tuple(int)
+ Number and topology of LIF neurons.
+ i : float, list, numpy.ndarray, optional
+ Initial value of the neuron's current.
+ v : float, list, numpy.ndarray, optional
+ Initial value of the neuron's voltage (membrane potential).
+ theta : float, list, numpy.ndarray, optional
+ Initial value of the threshold
+ r : float, list, numpy.ndarray, optional
+ Initial value of the refractory state
+ s : bool, list, numpy.ndarray, optional
+ Initial spike state
+ delta_i : float, optional
+ Decay constant for current i.
+ delta_v : float, optional
+ Decay constant for voltage v.
+ delta_theta : float, optional
+ Decay constant for threshold theta.
+ delta_r : float, optional
+ Decay constant for refractory state r.
+ theta_0 : float, optional
+ Initial/baselien value of threshold theta.
+ theta_step : float, optional
+ Increase of threshold theta upon the occurrence of a spike.
+ bias_mant : float, list, numpy.ndarray, optional
+ Mantissa part of the neuron's bias.
+ bias_exp : float, list, numpy.ndarray, optional
+ Exponent part of the neuron's bias, if needed. Mostly for fixed-point
+ implementations. Ignored for floating-point implementations.
+
+ Example
+ -------
+ >>> atrlif = ATRLIF(shape=(200, 15), decay_theta=10, decay_v=5)
+ This will create 200x15 ATRLIF neurons that all have the same threshold
+ decay of 10 and voltage decay of 5.
+ """
+
+ def __init__(
+ self,
+ *,
+ shape: ty.Tuple[int, ...],
+ i: ty.Optional[ty.Union[float, list, np.ndarray]] = 0,
+ v: ty.Optional[ty.Union[float, list, np.ndarray]] = 0,
+ theta: ty.Optional[ty.Union[float, list, np.ndarray]] = 5,
+ r: ty.Optional[ty.Union[float, list, np.ndarray]] = 0,
+ s: ty.Optional[ty.Union[bool, list, np.ndarray]] = 0,
+ delta_i: ty.Optional[float] = 0.4,
+ delta_v: ty.Optional[float] = 0.4,
+ delta_theta: ty.Optional[float] = 0.2,
+ delta_r: ty.Optional[float] = 0.2,
+ theta_0: ty.Optional[float] = 5,
+ theta_step: ty.Optional[float] = 3.75,
+ bias_mant: ty.Optional[ty.Union[float, list, np.ndarray]] = 0,
+ bias_exp: ty.Optional[ty.Union[float, list, np.ndarray]] = 0,
+ name: ty.Optional[str] = None,
+ log_config: ty.Optional[LogConfig] = None
+ ) -> None:
+
+ super().__init__(
+ shape=shape,
+ i=i,
+ v=v,
+ theta=theta,
+ r=r,
+ s=s,
+ delta_i=delta_i,
+ delta_v=delta_v,
+ delta_theta=delta_theta,
+ delta_r=delta_r,
+ theta_0=theta_0,
+ theta_step=theta_step,
+ bias_mant=bias_mant,
+ bias_exp=bias_exp,
+ name=name,
+ log_config=log_config
+ )
+
+ # Ports
+ self.a_in = InPort(shape=shape)
+ self.s_out = OutPort(shape=shape)
+
+ # Bias
+ self.bias_mant = Var(shape=shape, init=bias_mant)
+ self.bias_exp = Var(shape=shape, init=bias_exp)
+
+ # Variables
+ self.i = Var(shape=shape, init=i)
+ self.v = Var(shape=shape, init=v)
+ self.theta = Var(shape=shape, init=theta)
+ self.r = Var(shape=shape, init=r)
+ self.s = Var(shape=shape, init=s)
+
+ # Parameters
+ self.delta_i = Var(shape=(1,), init=delta_i)
+ self.delta_v = Var(shape=(1,), init=delta_v)
+ self.delta_theta = Var(shape=(1,), init=delta_theta)
+ self.delta_r = Var(shape=(1,), init=delta_r)
+ self.theta_0 = Var(shape=(1,), init=theta_0)
+ self.theta_step = Var(shape=(1,), init=theta_step)
diff --git a/tests/lava/proc/atrlif/__init__.py b/tests/lava/proc/atrlif/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/lava/proc/atrlif/test_models.py b/tests/lava/proc/atrlif/test_models.py
new file mode 100644
index 000000000..6db21b91f
--- /dev/null
+++ b/tests/lava/proc/atrlif/test_models.py
@@ -0,0 +1,696 @@
+# Copyright (C) 2024 Intel Corporation
+# Copyright (C) 2024 Jannik Luboeinski
+# SPDX-License-Identifier: BSD-3-Clause
+# See: https://spdx.org/licenses/
+
+import unittest
+import numpy as np
+
+from lava.magma.core.decorator import implements, requires, tag
+from lava.magma.core.model.py.model import PyLoihiProcessModel
+from lava.magma.core.model.py.ports import PyOutPort, PyInPort
+from lava.magma.core.model.py.type import LavaPyType
+from lava.magma.core.process.ports.ports import OutPort, InPort
+from lava.magma.core.process.process import AbstractProcess
+from lava.magma.core.process.variable import Var
+from lava.magma.core.resources import CPU
+from lava.magma.core.run_configs import RunConfig
+from lava.magma.core.run_conditions import RunSteps
+from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
+from lava.proc.atrlif.process import ATRLIF
+
+
+class AtrlifRunConfig(RunConfig):
+ """Run configuration selects appropriate ATRLIF ProcessModel based on tag:
+ floating point precision or Loihi bit-accurate fixed point precision"""
+ def __init__(self, custom_sync_domains=None, select_tag='fixed_pt'):
+ super().__init__(custom_sync_domains=custom_sync_domains)
+ self.select_tag = select_tag
+
+ def select(self, proc, proc_models):
+ for pm in proc_models:
+ if self.select_tag in pm.tags:
+ return pm
+ raise AssertionError("No legal ProcessModel found.")
+
+
+class VecSendProcess(AbstractProcess):
+ """
+ Process of a user-defined shape that sends an arbitrary vector
+
+ Parameters
+ ----------
+ shape: tuple, shape of the process
+ vec_to_send: np.ndarray, vector of spike values to send
+ send_at_times: np.ndarray, vector bools. Send the `vec_to_send` at times
+ when there is a True
+ """
+ def __init__(self, **kwargs):
+ super().__init__()
+ shape = kwargs.pop("shape", (1,))
+ vec_to_send = kwargs.pop("vec_to_send")
+ send_at_times = kwargs.pop("send_at_times")
+ num_steps = kwargs.pop("num_steps", 1)
+ self.shape = shape
+ self.num_steps = num_steps
+ self.vec_to_send = Var(shape=shape, init=vec_to_send)
+ self.send_at_times = Var(shape=(num_steps,), init=send_at_times)
+ self.s_out = OutPort(shape=shape)
+
+
+class VecRecvProcess(AbstractProcess):
+ """
+ Process that receives arbitrary vectors
+
+ Parameters
+ ----------
+ shape: tuple, shape of the process
+ """
+ def __init__(self, **kwargs):
+ super().__init__()
+ shape = kwargs.get("shape", (1,))
+ self.shape = shape
+ self.s_in = InPort(shape=(shape[1],))
+ self.spk_data = Var(shape=shape, init=0) # This Var expands with time
+
+
+@implements(proc=VecSendProcess, protocol=LoihiProtocol)
+@requires(CPU)
+# Following tag is needed to discover the ProcessModel using AtrlifRunConfig
+@tag('floating_pt')
+class PyVecSendModelFloat(PyLoihiProcessModel):
+ s_out: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, float)
+ vec_to_send: np.ndarray = LavaPyType(np.ndarray, float)
+ send_at_times: np.ndarray = LavaPyType(np.ndarray, bool, precision=1)
+
+ def run_spk(self):
+ """
+ Send `spikes_to_send` if current time-step requires it
+ """
+ if self.send_at_times[self.time_step - 1]:
+ self.s_out.send(self.vec_to_send)
+ else:
+ self.s_out.send(np.zeros_like(self.vec_to_send))
+
+
+@implements(proc=VecSendProcess, protocol=LoihiProtocol)
+@requires(CPU)
+# Following tag is needed to discover the ProcessModel using AtrlifRunConfig
+@tag('fixed_pt')
+class PyVecSendModelFixed(PyLoihiProcessModel):
+ s_out: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, np.int16, precision=16)
+ vec_to_send: np.ndarray = LavaPyType(np.ndarray, np.int16, precision=16)
+ send_at_times: np.ndarray = LavaPyType(np.ndarray, bool, precision=1)
+
+ def run_spk(self):
+ """
+ Send `spikes_to_send` if current time-step requires it
+ """
+ if self.send_at_times[self.time_step - 1]:
+ self.s_out.send(self.vec_to_send)
+ else:
+ self.s_out.send(np.zeros_like(self.vec_to_send))
+
+
+@implements(proc=VecRecvProcess, protocol=LoihiProtocol)
+@requires(CPU)
+# Following tag is needed to discover the ProcessModel using AtrlifRunConfig
+@tag('floating_pt')
+class PySpkRecvModelFloat(PyLoihiProcessModel):
+ s_in: PyInPort = LavaPyType(PyInPort.VEC_DENSE, bool, precision=1)
+ spk_data: np.ndarray = LavaPyType(np.ndarray, float)
+
+ def run_spk(self):
+ """Receive spikes and store in an internal variable"""
+ spk_in = self.s_in.recv()
+ self.spk_data[self.time_step - 1, :] = spk_in
+
+
+@implements(proc=VecRecvProcess, protocol=LoihiProtocol)
+@requires(CPU)
+# Following tag is needed to discover the ProcessModel using AtrlifRunConfig
+@tag('fixed_pt')
+class PySpkRecvModelFixed(PyLoihiProcessModel):
+ s_in: PyInPort = LavaPyType(PyInPort.VEC_DENSE, bool, precision=1)
+ spk_data: np.ndarray = LavaPyType(np.ndarray, int, precision=1)
+
+ def run_spk(self):
+ """Receive spikes and store in an internal variable"""
+ spk_in = self.s_in.recv()
+ self.spk_data[self.time_step - 1, :] = spk_in
+
+
+class TestATRLIFProcessModelsFloat(unittest.TestCase):
+ """
+ Tests for floating point ProcessModels of ATRLIF, resembling the
+ existing tests for the LIF process.
+ """
+ def test_float_pm_no_decay(self):
+ """
+ Tests floating point ATRLIF ProcessModel with no current or voltage
+ decay and neurons driven by internal biases.
+ """
+ shape = (10,)
+ num_steps = 50
+ # Set up external input to 0
+ sps = VecSendProcess(shape=shape, num_steps=num_steps,
+ vec_to_send=np.zeros(shape, dtype=float),
+ send_at_times=np.ones((num_steps,), dtype=bool))
+ # `delta_i` and `delta_v` = 0 => bias driven neurons spike first after
+ # `theta_0 / bias` time steps, then less often due to the refractor-
+ # iness. For the test implementation below, `theta_0` has to be a
+ # multiple of `bias`.
+ bias = 2
+ theta_0 = 4
+ neur = ATRLIF(shape=shape,
+ delta_i=0.,
+ delta_v=0.,
+ delta_theta=0.,
+ delta_r=0.,
+ theta_0=theta_0,
+ theta=theta_0,
+ theta_step=0.,
+ bias_mant=bias * np.ones(shape, dtype=float))
+ # Receive neuron spikes
+ spr = VecRecvProcess(shape=(num_steps, shape[0]))
+ sps.s_out.connect(neur.a_in)
+ neur.s_out.connect(spr.s_in)
+ # Configure execution and run
+ rcnd = RunSteps(num_steps=num_steps)
+ rcfg = AtrlifRunConfig(select_tag='floating_pt')
+ neur.run(condition=rcnd, run_cfg=rcfg)
+ # Gather spike data and stop
+ spk_data_through_run = spr.spk_data.get()
+ neur.stop()
+ # Compute the number of time steps until the first spike
+ t_spike_0 = theta_0 // bias
+ # Compute the following number of time steps until the second spike
+ # (according to `bias * (t_spike_0 + t_spike_refr) - 2 * theta_0 >=
+ # theta_0`)
+ t_spike_refr = 3 * theta_0 // bias - t_spike_0
+ # Gold standard for the test
+ expected_spk_data = np.zeros((t_spike_0 + t_spike_refr + 1, shape[0]))
+ expected_spk_data[t_spike_0 - 1:t_spike_0 + t_spike_refr + 1:
+ t_spike_refr, :] = 1.
+ spk_data_through_run_needed = \
+ spk_data_through_run[0:t_spike_0 + t_spike_refr + 1, :]
+ self.assertTrue(np.all(expected_spk_data
+ == spk_data_through_run_needed))
+
+ def test_float_pm_impulse_delta_i(self):
+ """
+ Tests floating point ATRLIF ProcessModel's impulse response with no
+ voltage decay and input activation at the very first time-step.
+ """
+ # Use a single neuron
+ shape = (1,)
+ num_steps = 8
+ # Send activation of 128. at timestep = 1
+ sps = VecSendProcess(shape=shape, num_steps=num_steps,
+ vec_to_send=(2 ** 7) * np.ones(shape,
+ dtype=float),
+ send_at_times=np.array([True, False, False,
+ False, False, False,
+ False, False]))
+ # Set up no bias, no voltage decay. Current decay = 0.5.
+ # Set up high constant threshold, such that there are no output spikes.
+ neur = ATRLIF(shape=shape,
+ delta_i=0.5,
+ delta_v=0.,
+ delta_theta=0.,
+ delta_r=0.,
+ theta_0=256.,
+ theta=256.,
+ theta_step=0.,
+ bias_mant=np.zeros(shape, dtype=float))
+ spr = VecRecvProcess(shape=(num_steps, shape[0]))
+ sps.s_out.connect(neur.a_in)
+ neur.s_out.connect(spr.s_in)
+ # Configure to run 1 step at a time
+ rcnd = RunSteps(num_steps=1)
+ rcfg = AtrlifRunConfig(select_tag='floating_pt')
+ neur_i = []
+ # Run 1 timestep at a time and collect state variable i
+ for _ in range(num_steps):
+ neur.run(condition=rcnd, run_cfg=rcfg)
+ neur_i.append(neur.i.get()[0])
+ neur.stop()
+ # Gold standard for testing: current decay of 0.5 should halve the
+ # current every time-step
+ expected_i_timeseries = [2. ** (7 - j) for j in range(8)]
+ self.assertListEqual(expected_i_timeseries, neur_i)
+
+ def test_float_pm_impulse_delta_v(self):
+ """
+ Tests floating point ATRLIF ProcessModel's impulse response with no
+ current decay and input activation at the very first time-step.
+ """
+ # Use a single neuron
+ shape = (1,)
+ num_steps = 8
+ # Send activation of 128. at timestep = 1
+ sps = VecSendProcess(shape=shape, num_steps=num_steps,
+ vec_to_send=(2 ** 7) * np.ones(shape,
+ dtype=float),
+ send_at_times=np.array([True, False, False,
+ False, False, False,
+ False, False]))
+ # Set up no bias, no current decay. Voltage decay = 0.5.
+ # Set up high constant threshold, such that there are no output spikes.
+ neur = ATRLIF(shape=shape,
+ delta_i=0.,
+ delta_v=0.5,
+ delta_theta=0.,
+ delta_r=0.,
+ theta_0=256.,
+ theta=256.,
+ theta_step=0.,
+ bias_mant=np.zeros(shape, dtype=float))
+ spr = VecRecvProcess(shape=(num_steps, shape[0]))
+ sps.s_out.connect(neur.a_in)
+ neur.s_out.connect(spr.s_in)
+ # Configure to run 1 step at a time
+ rcnd = RunSteps(num_steps=1)
+ rcfg = AtrlifRunConfig(select_tag='floating_pt')
+ neur_v = []
+ # Run 1 timestep at a time and collect state variable v
+ for _ in range(num_steps):
+ neur.run(condition=rcnd, run_cfg=rcfg)
+ neur_v.append(neur.v.get()[0])
+ neur.stop()
+ # Gold standard for testing: voltage decay of 0.5 should integrate
+ # the voltage from 128. to 255., with steps of 64., 32., 16., etc.
+ expected_v_timeseries = [128., 192., 224., 240.,
+ 248., 252., 254., 255.]
+ self.assertListEqual(expected_v_timeseries, neur_v)
+
+ def test_float_pm_instant_theta_decay(self):
+ """
+ Tests floating point ATRLIF ProcessModel's behavior for instant decay
+ of the threshold variable in the presence of constant bias.
+ """
+ # Use a single neuron
+ shape = (1,)
+ num_steps = 20
+ # Set up external input to 0
+ sps = VecSendProcess(shape=shape, num_steps=num_steps,
+ vec_to_send=np.zeros(shape, dtype=float),
+ send_at_times=np.ones((num_steps,), dtype=bool))
+ # `delta_i` and `delta_v` = 0 => bias driven neurons spike first after
+ # `theta_0 / bias` time steps, then less often due to the refractor-
+ # iness. For the test implementation below, `theta_0` has to be a
+ # multiple of `bias`. Following a spike, the threshold `theta` is
+ # increased tremendously (by 10.), but this remains without effect
+ # due to the instant decay (`delta_theta=1.`).
+ bias = 2
+ theta_0 = 4
+ neur = ATRLIF(shape=shape,
+ delta_i=0.,
+ delta_v=0.,
+ delta_theta=1.,
+ delta_r=0.,
+ theta_0=theta_0,
+ theta=theta_0,
+ theta_step=10.,
+ bias_mant=bias * np.ones(shape, dtype=float))
+ # Receive neuron spikes
+ spr = VecRecvProcess(shape=(num_steps, shape[0]))
+ sps.s_out.connect(neur.a_in)
+ neur.s_out.connect(spr.s_in)
+ # Configure execution and run
+ rcnd = RunSteps(num_steps=num_steps)
+ rcfg = AtrlifRunConfig(select_tag='floating_pt')
+ neur.run(condition=rcnd, run_cfg=rcfg)
+ # Gather spike data and stop
+ spk_data_through_run = spr.spk_data.get()
+ neur.stop()
+ # Compute the number of time steps until the first spike
+ t_spike_0 = theta_0 // bias
+ # Compute the following number of time steps until the second spike
+ # (according to `bias * (t_spike_0 + t_spike_refr) - 2 * theta_0 >=
+ # theta_0`)
+ t_spike_refr = 3 * theta_0 // bias - t_spike_0
+ # Gold standard for the test
+ expected_spk_data = np.zeros((t_spike_0 + t_spike_refr + 1, shape[0]))
+ expected_spk_data[t_spike_0 - 1:t_spike_0 + t_spike_refr + 1:
+ t_spike_refr, :] = 1.
+ spk_data_through_run_needed = \
+ spk_data_through_run[0:t_spike_0 + t_spike_refr + 1, :]
+ self.assertTrue(np.all(expected_spk_data
+ == spk_data_through_run_needed))
+
+ def test_float_pm_instant_r_decay(self):
+ """
+ Tests floating point ATRLIF ProcessModel's behavior for instant decay
+ of the refractory variable in the presence of constant bias.
+ """
+ # Use a single neuron
+ shape = (1,)
+ num_steps = 20
+ # Set up external input to 0
+ sps = VecSendProcess(shape=shape, num_steps=num_steps,
+ vec_to_send=np.zeros(shape, dtype=float),
+ send_at_times=np.ones((num_steps,), dtype=bool))
+ # `delta_i` and `delta_v` = 0 => bias driven neurons spike first after
+ # `theta_0 / bias` time steps. Following a spike, the threshold `theta`
+ # is automatically increased by `2 * theta`, but this remains without
+ # effect due to the instant decay (`delta_r=1.`).
+ bias = 8
+ theta_0 = 16
+ neur = ATRLIF(shape=shape,
+ delta_i=0.,
+ delta_v=0.,
+ delta_theta=0.,
+ delta_r=1.,
+ theta_0=theta_0,
+ theta=theta_0,
+ theta_step=0.,
+ bias_mant=bias * np.ones(shape, dtype=float))
+ # Receive neuron spikes
+ spr = VecRecvProcess(shape=(num_steps, shape[0]))
+ sps.s_out.connect(neur.a_in)
+ neur.s_out.connect(spr.s_in)
+ # Configure execution and run
+ rcnd = RunSteps(num_steps=num_steps)
+ rcfg = AtrlifRunConfig(select_tag='floating_pt')
+ neur.run(condition=rcnd, run_cfg=rcfg)
+ # Gather spike data and stop
+ spk_data_through_run = spr.spk_data.get()
+ neur.stop()
+ # Compute the number of time steps until the first spike
+ t_spike_0 = theta_0 // bias
+ # Compute the following number of time steps until the second spike
+ # (according to `bias * (t_spike_0 + t_spike_refr) >= theta_0`)
+ t_spike_refr = theta_0 // bias - t_spike_0 + 1
+ # Gold standard for the test
+ expected_spk_data = np.zeros((t_spike_0 + t_spike_refr + 1, shape[0]))
+ expected_spk_data[t_spike_0 - 1:t_spike_0 + t_spike_refr + 1:
+ t_spike_refr, :] = 1.
+ spk_data_through_run_needed = \
+ spk_data_through_run[0:t_spike_0 + t_spike_refr + 1, :]
+ self.assertTrue(np.all(expected_spk_data
+ == spk_data_through_run_needed))
+
+
+class TestATRLIFProcessModelsFixed(unittest.TestCase):
+ """
+ Tests for fixed point ProcessModels of ATRLIF (which are bit-accurate
+ with Loihi hardware), resembling the existing tests for the LIF process.
+ """
+ def test_bitacc_pm_no_decay(self):
+ """
+ Tests fixed point ATRLIF ProcessModel (bit-accurate
+ with Loihi hardware) with no current or voltage
+ decay and neurons driven by internal biases.
+ """
+ shape = (10,)
+ num_steps = 50
+ # Set up external input to 0
+ sps = VecSendProcess(shape=shape, num_steps=num_steps,
+ vec_to_send=np.zeros(shape, dtype=np.int16),
+ send_at_times=np.ones((num_steps,), dtype=bool))
+ # Set up bias = 2 * 2**6 = 128 and threshold = 8<<6
+ # `delta_i` and `delta_v` = 0 => bias driven neurons spike first after
+ # `theta_0 / bias` time steps, then less often due to the refractor-
+ # iness. For the test implementation below, `theta_0` has to be a
+ # multiple of `bias`.
+ bias = 4
+ theta_0 = 8
+ neur = ATRLIF(shape=shape,
+ delta_i=0,
+ delta_v=0,
+ delta_theta=0,
+ delta_r=0,
+ theta_0=theta_0,
+ theta=theta_0,
+ theta_step=0,
+ bias_mant=bias * np.ones(shape, dtype=np.int32),
+ bias_exp=6 * np.ones(shape, dtype=np.int32))
+ # Receive neuron spikes
+ spr = VecRecvProcess(shape=(num_steps, shape[0]))
+ sps.s_out.connect(neur.a_in)
+ neur.s_out.connect(spr.s_in)
+ # Configure execution and run
+ rcnd = RunSteps(num_steps=num_steps)
+ rcfg = AtrlifRunConfig(select_tag='fixed_pt')
+ neur.run(condition=rcnd, run_cfg=rcfg)
+ # Gather spike data and stop
+ spk_data_through_run = spr.spk_data.get()
+ neur.stop()
+ # Compute the number of time steps until the first spike
+ t_spike_0 = theta_0 // bias
+ # Compute the following number of time steps until the second spike
+ # (according to `bias * (t_spike_0 + t_spike_refr) - 2 * theta_0 >=
+ # theta_0`)
+ t_spike_refr = 3 * theta_0 // bias - t_spike_0
+ # Gold standard for the test
+ expected_spk_data = np.zeros((t_spike_0 + t_spike_refr + 1, shape[0]))
+ expected_spk_data[t_spike_0 - 1:t_spike_0 + t_spike_refr + 1:
+ t_spike_refr, :] = 1.
+ spk_data_through_run_needed = \
+ spk_data_through_run[0:t_spike_0 + t_spike_refr + 1, :]
+ self.assertTrue(np.all(expected_spk_data
+ == spk_data_through_run_needed))
+
+ def test_bitacc_pm_impulse_delta_i(self):
+ """
+ Tests fixed point ATRLIF ProcessModel's impulse response with no
+ voltage decay and input activation at the very first time-step.
+ """
+ # Use a single neuron
+ shape = (1,)
+ num_steps = 8
+ # Send activation of 128. at timestep = 1
+ sps = VecSendProcess(shape=shape, num_steps=num_steps,
+ vec_to_send=128 * np.ones(shape, dtype=np.int32),
+ send_at_times=np.array([True, False, False,
+ False, False, False,
+ False, False]))
+ # Set up no bias, no voltage decay. Current decay is a 12-bit
+ # unsigned variable in Loihi hardware. Therefore, 2**-12 is the
+ # equivalent of 1. The subtracted 1 is added by default in the
+ # hardware via the `ds_offset` setting, thereby finally giving
+ # `delta_i = 2048 = 0.5 * 2**12`.
+ # Set up threshold high, such that there are no output spikes. By
+ # default the threshold value here is left-shifted by 6.
+ neur = ATRLIF(shape=shape,
+ delta_i=0.5 - (2**-12),
+ delta_v=0,
+ delta_theta=0,
+ delta_r=0,
+ theta_0=256 * np.ones(shape, dtype=np.int32),
+ theta=256 * np.ones(shape, dtype=np.int32),
+ theta_step=0,
+ bias_mant=np.zeros(shape, dtype=np.int16),
+ bias_exp=np.ones(shape, dtype=np.int16))
+ spr = VecRecvProcess(shape=(num_steps, shape[0]))
+ sps.s_out.connect(neur.a_in)
+ neur.s_out.connect(spr.s_in)
+ # Configure to run 1 step at a time
+ rcnd = RunSteps(num_steps=1)
+ rcfg = AtrlifRunConfig(select_tag='fixed_pt')
+ neur_i = []
+ # Run 1 timestep at a time and collect state variable i
+ for _ in range(num_steps):
+ neur.run(condition=rcnd, run_cfg=rcfg)
+ neur_i.append(neur.i.get().astype(np.int32)[0])
+ neur.stop()
+ # Gold standard for testing: current decay of 0.5 should halve the
+ # current every time-step.
+ expected_i_timeseries = [1 << (13 - j) for j in range(8)]
+ # Gold standard for floating point equivalent of the current,
+ # which would be all Loihi-bit-accurate values right shifted by 6 bits
+ expected_float_i = [1 << (7 - j) for j in range(8)]
+ self.assertListEqual(expected_i_timeseries, neur_i)
+ self.assertListEqual(expected_float_i, np.right_shift(np.array(
+ neur_i), 6).tolist())
+
+ def test_bitacc_pm_impulse_delta_v(self):
+ """
+ Tests fixed point ATRLIF ProcessModel's impulse response with no
+ current decay and input activation at the very first time-step.
+ """
+ # Use a single neuron
+ shape = (1,)
+ num_steps = 8
+ # Send activation of 128. at timestep = 1
+ sps = VecSendProcess(shape=shape, num_steps=num_steps,
+ vec_to_send=128 * np.ones(shape, dtype=np.int32),
+ send_at_times=np.array([True, False, False,
+ False, False, False,
+ False, False]))
+ # Set up no bias, no current decay.
+ # Set up threshold high, such that there are no output spikes.
+ # Threshold provided here is left-shifted by 6-bits.
+ neur = ATRLIF(shape=shape,
+ delta_i=0,
+ delta_v=0.5,
+ delta_theta=0,
+ delta_r=0,
+ theta_0=256 * np.ones(shape, dtype=np.int32),
+ theta=256 * np.ones(shape, dtype=np.int32),
+ theta_step=0,
+ bias_mant=np.zeros(shape, dtype=np.int16),
+ bias_exp=np.ones(shape, dtype=np.int16))
+ spr = VecRecvProcess(shape=(num_steps, shape[0]))
+ sps.s_out.connect(neur.a_in)
+ neur.s_out.connect(spr.s_in)
+ # Configure to run 1 step at a time
+ rcnd = RunSteps(num_steps=1)
+ rcfg = AtrlifRunConfig(select_tag='fixed_pt')
+ neur_v = []
+ # Run 1 timestep at a time and collect state variable u
+ for _ in range(num_steps):
+ neur.run(condition=rcnd, run_cfg=rcfg)
+ neur_v.append(neur.v.get().astype(np.int32)[0])
+ neur.stop()
+ # Gold standard for testing: with a voltage decay of 2048, voltage
+ # should integrate from 128<<6 to 255<<6. But it is slightly smaller,
+ # because current decay is not exactly 0. Due to the default
+ # ds_offset = 1 setting in the hardware, current decay = 1. So
+ # voltage is slightly smaller than 128<<6 to 255<<6.
+ expected_v_timeseries = [8192, 12286, 14331, 15351, 15859, 16111,
+ 16235, 16295]
+ # Gold standard for floating point equivalent of the voltage,
+ # which would be all Loihi-bit-accurate values right shifted by 6 bits
+ expected_float_v = [128, 192, 224, 240, 248, 252, 254, 255]
+ neur_v_float = np.right_shift(np.array(neur_v), 6)
+ neur_v_float[1:] += 1 # This compensates the drift caused by ds_offset
+ self.assertListEqual(expected_v_timeseries, neur_v)
+ self.assertListEqual(expected_float_v, neur_v_float.tolist())
+
+ def test_bitacc_pm_scaling_of_bias(self):
+ """
+ Tests fixed point ATRLIF ProcessModel's scaling of threshold.
+ """
+ bias_mant = 2 ** 12 - 1
+ bias_exp = 5
+ # Set up high threshold and high bias current to check for potential
+ # overflow in effective bias in single neuron.
+ neur = ATRLIF(shape=(1,),
+ delta_i=0,
+ delta_v=0.5,
+ delta_theta=0,
+ delta_r=0,
+ theta_0=2 ** 17,
+ theta=2 ** 17,
+ theta_step=0,
+ bias_mant=bias_mant,
+ bias_exp=bias_exp)
+
+ rcnd = RunSteps(num_steps=1)
+ rcfg = AtrlifRunConfig(select_tag='fixed_pt')
+
+ neur.run(condition=rcnd, run_cfg=rcfg)
+ neur_v = neur.v.get()[0]
+ neur.stop()
+
+ # Check if neur_v has correct value.
+ self.assertEqual(neur_v, bias_mant * 2 ** bias_exp)
+
+ def test_fixed_pm_instant_theta_decay(self):
+ """
+ Tests fixed point ATRLIF ProcessModel's behavior for instant decay
+ of the threshold variable in the presence of constant bias.
+ """
+ # Use a single neuron
+ shape = (1,)
+ num_steps = 20
+ # Set up external input to 0
+ sps = VecSendProcess(shape=shape, num_steps=num_steps,
+ vec_to_send=np.zeros(shape, dtype=float),
+ send_at_times=np.ones((num_steps,), dtype=bool))
+ # `delta_i` and `delta_v` = 0 => bias driven neurons spike first after
+ # `theta_0 / bias` time steps, then less often due to the refractor-
+ # iness. For the test implementation below, `theta_0` has to be a
+ # multiple of `bias`. Following a spike, the threshold `theta` is
+ # increased tremendously (by 10.), but this remains without effect
+ # due to the instant decay (`delta_theta=1.`).
+ bias = 2
+ theta_0 = 4
+ neur = ATRLIF(shape=shape,
+ delta_i=0,
+ delta_v=0,
+ delta_theta=1,
+ delta_r=0,
+ theta_0=theta_0,
+ theta=theta_0,
+ theta_step=10,
+ bias_mant=bias * np.ones(shape, dtype=float))
+ # Receive neuron spikes
+ spr = VecRecvProcess(shape=(num_steps, shape[0]))
+ sps.s_out.connect(neur.a_in)
+ neur.s_out.connect(spr.s_in)
+ # Configure execution and run
+ rcnd = RunSteps(num_steps=num_steps)
+ rcfg = AtrlifRunConfig(select_tag='floating_pt')
+ neur.run(condition=rcnd, run_cfg=rcfg)
+ # Gather spike data and stop
+ spk_data_through_run = spr.spk_data.get()
+ neur.stop()
+ # Compute the number of time steps until the first spike
+ t_spike_0 = theta_0 // bias
+ # Compute the following number of time steps until the second spike
+ # (according to `bias * (t_spike_0 + t_spike_refr) - 2 * theta_0 >=
+ # theta_0`)
+ t_spike_refr = 3 * theta_0 // bias - t_spike_0
+ # Gold standard for the test
+ expected_spk_data = np.zeros((t_spike_0 + t_spike_refr + 1, shape[0]))
+ expected_spk_data[t_spike_0 - 1:t_spike_0 + t_spike_refr + 1:
+ t_spike_refr, :] = 1.
+ spk_data_through_run_needed = \
+ spk_data_through_run[0:t_spike_0 + t_spike_refr + 1, :]
+ self.assertTrue(np.all(expected_spk_data
+ == spk_data_through_run_needed))
+
+ def test_fixed_pm_instant_r_decay(self):
+ """
+ Tests fixed point ATRLIF ProcessModel's behavior for instant decay
+ of the refractory variable in the presence of constant bias.
+ """
+ # Use a single neuron
+ shape = (1,)
+ num_steps = 20
+ # Set up external input to 0
+ sps = VecSendProcess(shape=shape, num_steps=num_steps,
+ vec_to_send=np.zeros(shape, dtype=float),
+ send_at_times=np.ones((num_steps,), dtype=bool))
+ # `delta_i` and `delta_v` = 0 => bias driven neurons spike first after
+ # `theta_0 / bias` time steps. Following a spike, the threshold `theta`
+ # is automatically increased by `2 * theta`, but this remains without
+ # effect due to the instant decay (`delta_r=1`).
+ bias = 8
+ theta_0 = 16
+ neur = ATRLIF(shape=shape,
+ delta_i=0,
+ delta_v=0,
+ delta_theta=0,
+ delta_r=1,
+ theta_0=theta_0,
+ theta=theta_0,
+ theta_step=0,
+ bias_mant=bias * np.ones(shape, dtype=float))
+ # Receive neuron spikes
+ spr = VecRecvProcess(shape=(num_steps, shape[0]))
+ sps.s_out.connect(neur.a_in)
+ neur.s_out.connect(spr.s_in)
+ # Configure execution and run
+ rcnd = RunSteps(num_steps=num_steps)
+ rcfg = AtrlifRunConfig(select_tag='floating_pt')
+ neur.run(condition=rcnd, run_cfg=rcfg)
+ # Gather spike data and stop
+ spk_data_through_run = spr.spk_data.get()
+ neur.stop()
+ # Compute the number of time steps until the first spike
+ t_spike_0 = theta_0 // bias
+ # Compute the following number of time steps until the second spike
+ # (according to `bias * (t_spike_0 + t_spike_refr) >= theta_0`)
+ t_spike_refr = theta_0 // bias - t_spike_0 + 1
+ # Gold standard for the test
+ expected_spk_data = np.zeros((t_spike_0 + t_spike_refr + 1, shape[0]))
+ expected_spk_data[t_spike_0 - 1:t_spike_0 + t_spike_refr + 1:
+ t_spike_refr, :] = 1.
+ spk_data_through_run_needed = \
+ spk_data_through_run[0:t_spike_0 + t_spike_refr + 1, :]
+ self.assertTrue(np.all(expected_spk_data
+ == spk_data_through_run_needed))
diff --git a/tests/lava/proc/atrlif/test_process.py b/tests/lava/proc/atrlif/test_process.py
new file mode 100644
index 000000000..c0a16e94a
--- /dev/null
+++ b/tests/lava/proc/atrlif/test_process.py
@@ -0,0 +1,43 @@
+# Copyright (C) 2024 Intel Corporation
+# Copyright (C) 2024 Jannik Luboeinski
+# SPDX-License-Identifier: BSD-3-Clause
+# See: https://spdx.org/licenses/
+
+import unittest
+import numpy as np
+from lava.proc.atrlif.process import ATRLIF
+
+
+class TestATRLIFProcess(unittest.TestCase):
+ """Tests for ATRLIF class"""
+ def test_init(self):
+ """Tests instantiation of ATRLIF neuron"""
+ N = 100
+ delta_i = 0.6
+ delta_v = 0.6
+ delta_theta = 0.4
+ delta_r = 0.4
+ theta_0 = 4
+ theta_step = 2
+ bias_mant = 2 * np.ones((N,), dtype=float)
+ bias_exp = np.ones((N,), dtype=float)
+ name = "ATRLIF"
+
+ neur = ATRLIF(shape=(N,),
+ delta_i=delta_i,
+ delta_v=delta_v,
+ delta_theta=delta_theta,
+ delta_r=delta_r,
+ theta_0=theta_0,
+ theta=theta_0,
+ theta_step=theta_step,
+ bias_mant=bias_mant,
+ bias_exp=bias_exp,
+ name=name)
+
+ self.assertEqual(neur.proc_params["shape"], (N,))
+ self.assertEqual(neur.delta_i.init, delta_i)
+ self.assertEqual(neur.delta_v.init, delta_v)
+ self.assertListEqual(neur.bias_mant.init.tolist(), bias_mant.tolist())
+ self.assertListEqual(neur.bias_exp.init.tolist(), bias_exp.tolist())
+ self.assertEqual(neur.name, name)
diff --git a/tests/lava/tutorials/test_tutorials.py b/tests/lava/tutorials/test_tutorials.py
index 3f752c081..53996e692 100644
--- a/tests/lava/tutorials/test_tutorials.py
+++ b/tests/lava/tutorials/test_tutorials.py
@@ -1,4 +1,5 @@
-# Copyright (C) 2022 Intel Corporation
+# Copyright (C) 2022-2024 Intel Corporation
+# Copyright (C) 2024 Jannik Luboeinski
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/
@@ -285,6 +286,11 @@ def test_in_depth_11_serialization(self):
"""Test tutorial serialization."""
self._run_notebook("tutorial11_serialization.ipynb")
+ @unittest.skipIf(system_name != "linux", "Tests work on linux")
+ def test_in_depth_12_adaptive_neurons(self):
+ """Test tutorial adaptive_neurons."""
+ self._run_notebook("tutorial12_adaptive_neurons.ipynb")
+
@unittest.skipIf(system_name != "linux", "Tests work on linux")
def test_in_depth_clp_01(self):
"""Test tutorial CLP 01."""
diff --git a/tutorials/in_depth/tutorial12_adaptive_neurons.ipynb b/tutorials/in_depth/tutorial12_adaptive_neurons.ipynb
new file mode 100644
index 000000000..6dc80a662
--- /dev/null
+++ b/tutorials/in_depth/tutorial12_adaptive_neurons.ipynb
@@ -0,0 +1,329 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "*Copyright (C) 2024 Jannik Luboeinski*
\n",
+ "*SPDX-License-Identifier: BSD-3-Clause*
\n",
+ "*See: https://spdx.org/licenses/*\n",
+ "\n",
+ "---\n",
+ "\n",
+ "# ATRLIF neuron in different implementations\n",
+ "_A Leaky Integrate-and-Fire neuron with adaptive threshold and adaptive refractoriness._"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Imports"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Import general modules\n",
+ "import numpy as np\n",
+ "import matplotlib.pyplot as plt\n",
+ "import os\n",
+ "\n",
+ "# Import Lava core modules\n",
+ "from lava.magma.core.run_configs import Loihi2SimCfg, Loihi2HwCfg\n",
+ "from lava.magma.core.run_conditions import RunSteps\n",
+ "\n",
+ "# Import Lava monitors\n",
+ "from lava.proc.monitor.process import Monitor\n",
+ "\n",
+ "# Import ATRLIF process\n",
+ "from lava.proc.atrlif.process import *"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Simulation test function"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def simulation_test(run_config, label, bias_mant=3, bias_exp=0, **kwargs):\n",
+ " '''\n",
+ " Function to simulate and monitor a population of ATRLIF neurons.\n",
+ " \n",
+ " Parameters\n",
+ " ----------\n",
+ " run_config : `AbstractLoihiSimRunCfg`\n",
+ " Run configuratrion object for Lava.\n",
+ " label : `str`\n",
+ " Label for the current simulation.\n",
+ " bias_mant : `float`, optional\n",
+ " Mantissa part of neuron's bias. Equals `bias` for floating-point implementation.\n",
+ " bias_exp : `int`, optional\n",
+ " Exponent part of neuron's bias, if needed. Ignored in floating-point implementation.\n",
+ " **kwargs : `dict`\n",
+ " Additional keyword arguments (cf. 'atrlif_process.py').\n",
+ " '''\n",
+ "\n",
+ " # Initialization\n",
+ " n_neurons = 1 # the number of neurons\n",
+ " num_steps = 20 # the number of timesteps\n",
+ " output_period = 1 # the sampling period (in timesteps)\n",
+ " num_samples = num_steps // output_period + 1 # the number of samples\n",
+ " atrlif = ATRLIF(shape=(n_neurons,), bias_mant=bias_mant, bias_exp=bias_exp, **kwargs)\n",
+ " \n",
+ " # Set monitors for the different variables\n",
+ " voltage_mon = Monitor()\n",
+ " voltage_mon.probe(atrlif.v, num_samples)\n",
+ " refractory_mon = Monitor()\n",
+ " refractory_mon.probe(atrlif.r, num_samples)\n",
+ " threshold_mon = Monitor()\n",
+ " threshold_mon.probe(atrlif.theta, num_samples)\n",
+ " spike_mon = Monitor()\n",
+ " spike_mon.probe(atrlif.s_out, num_samples)\n",
+ " \n",
+ " # Run the simulation\n",
+ " atrlif.run(condition=RunSteps(num_steps=num_samples, blocking=True), run_cfg=run_config)\n",
+ " \n",
+ " # Retrieve process name\n",
+ " process_name = atrlif.v.process.name\n",
+ " \n",
+ " # Collect samples\n",
+ " data_v = voltage_mon.get_data()[process_name]['v']\n",
+ " data_r = refractory_mon.get_data()[process_name]['r']\n",
+ " data_theta = threshold_mon.get_data()[process_name]['theta']\n",
+ " data_s = spike_mon.get_data()[process_name]['s_out']\n",
+ " \n",
+ " # Stop the simulation\n",
+ " atrlif.stop()\n",
+ " \n",
+ " # Normalize data from fixed-point implementation (right shifting by `bias_exp`)\n",
+ " if bias_exp > 0:\n",
+ " data_v = data_v * 2**(-bias_exp)\n",
+ " data_r = data_r * 2**(-bias_exp)\n",
+ " data_theta = data_theta * 2**(-bias_exp)\n",
+ " \n",
+ " # Save voltage and spike data\n",
+ " header = \"t\"\n",
+ " for var in [\"v\", \"r\", \"theta\", \"s\"]:\n",
+ " for n in range(n_neurons):\n",
+ " header += f\"\\t{var}_{n}\"\n",
+ " times = np.arange(0, len(data_v))\n",
+ " os.makedirs(\"./results\", exist_ok=True)\n",
+ " np.savetxt(f\"./results/atrlif_v_{label}.txt\",\n",
+ " np.column_stack([times, data_v, data_r, data_theta, data_s]), \n",
+ " fmt=\"%.0f\"+n_neurons*\"\\t%.4f\\t%.4f\\t%.4f\"+n_neurons*\"\\t%.0f\", \n",
+ " header=header)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Plotting function"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def plot(data_stacked_1, data_stacked_2, framework_name_1, framework_name_2, X_cols, store_path = \"figure.svg\"):\n",
+ " '''\n",
+ " Function to plot the results from two different paradigms.\n",
+ " Based on https://github.com/jlubo/memory-consolidation-stc/blob/main/analysis/plotSimResultsComparisonMeanSEM.py.\n",
+ " \n",
+ " Parameters\n",
+ " ----------\n",
+ " data_stacked_1 : `numpy.ndarray`\n",
+ " Data array of first paradigm. First column contains the time, the other columns contain the data specified via `X_cols`.\n",
+ " data_stacked_2 : `numpy.ndarray`\n",
+ " Data array of second paradigm. First column contains the time, the other columns contain the data specified via `X_cols`.\n",
+ " framework_name_1 : `str`\n",
+ " Name of the first framework/paradigm.\n",
+ " framework_name_2 : `str`\n",
+ " Name of the second framework/paradigm.\n",
+ " X_cols : `dict` of `int`\n",
+ " Dictionary specifying the data columns to be plotted.\n",
+ " store_path : `str`, optional\n",
+ " Path to store the resulting graphics file.\n",
+ " '''\n",
+ " \n",
+ " # Figure setting\n",
+ " fig, axes = plt.subplots(nrows=2, ncols=1, sharex=True, figsize=(8, 8), height_ratios=[1.0, 0.4])\n",
+ " \n",
+ " # Plot membrane voltage, effective voltage, and threshold dynamics\n",
+ " axes[0].set_ylabel(f\"Membrane dynamics (mV)\")\n",
+ " axes[0].plot(data_stacked_1[:,0], data_stacked_1[:,X_cols[\"theta\"]], color=\"#aaeeaa\", label=f\"Threshold {framework_name_1}\", marker='None', zorder=9)\n",
+ " axes[0].plot(data_stacked_2[:,0], data_stacked_2[:,X_cols[\"theta\"]], color=\"#556655\", linestyle='dotted', label=f\"Threshold {framework_name_2}\", marker='None', zorder=10)\n",
+ " axes[0].legend()\n",
+ " axes[0].plot(data_stacked_1[:,0], (data_stacked_1[:,X_cols[\"voltage\"]]-data_stacked_1[:,X_cols[\"ref\"]]), color=\"#ff0000\", label=f\"Effective voltage {framework_name_1}\", marker='None', zorder=9)\n",
+ " axes[0].plot(data_stacked_2[:,0], (data_stacked_2[:,X_cols[\"voltage\"]]-data_stacked_2[:,X_cols[\"ref\"]]), color=\"#330000\", linestyle='dashed', label=f\"Effective voltage {framework_name_2}\", marker='None', zorder=10)\n",
+ " axes[0].legend()\n",
+ " # Set x-ticks (to integer values only)\n",
+ " axes[0].set_xticks(np.arange(min(data_stacked_1[:,0]), np.ceil(max(data_stacked_1[:,0])) + 1, step=2))\n",
+ " axes[0].set_xticks(np.arange(min(data_stacked_1[:,0]), np.ceil(max(data_stacked_1[:,0])) + 1, step=1), minor=True)\n",
+ "\n",
+ " # Plot spikes\n",
+ " axes[1].set_xlabel(\"Time (steps)\")\n",
+ " axes[1].set_ylabel(f\"Spikes\")\n",
+ " # Get logical masks\n",
+ " mask_1 = data_stacked_1[:,X_cols[\"spike\"]] > 0.5\n",
+ " mask_2 = data_stacked_2[:,X_cols[\"spike\"]] > 0.5\n",
+ " axes[1].plot(data_stacked_1[:,0][mask_1], data_stacked_1[:,X_cols[\"spike\"]][mask_1], color=\"#ff0000\", marker='o', \n",
+ " linestyle='none', label=framework_name_1, zorder=9)\n",
+ " axes[1].plot(data_stacked_2[:,0][mask_2], data_stacked_2[:,X_cols[\"spike\"]][mask_2], color=\"#330000\", marker='o', markerfacecolor='none', \n",
+ " linestyle='none', label=framework_name_2, zorder=10)\n",
+ " axes[1].tick_params(left = False, labelleft = False)\n",
+ " axes[1].legend()\n",
+ "\n",
+ " # Save figure as vector graphics\n",
+ " fig.savefig(store_path)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Running the simulations"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "PyATRLIFModelFloat initialized\n",
+ "PyATRLIFModelFixed initialized\n",
+ "PyATRLIFModelFloat initialized\n",
+ "PyATRLIFModelFixed initialized\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Simulate with default values in floating- and fixed-point implementation\n",
+ "simulation_test(Loihi2SimCfg(select_tag=\"floating_pt\"),\n",
+ " \"cpu-float\")\n",
+ "simulation_test(Loihi2SimCfg(select_tag=\"fixed_pt\"),\n",
+ " \"cpu-fixed\",\n",
+ " bias_exp=6)\n",
+ "\n",
+ "# Simulate with constant threshold dynamics in floating- and fixed-point implementation\n",
+ "simulation_test(Loihi2SimCfg(select_tag=\"floating_pt\"),\n",
+ " \"cpu-float_theta_const\",\n",
+ " delta_theta=0,\n",
+ " theta_step=0)\n",
+ "simulation_test(Loihi2SimCfg(select_tag=\"fixed_pt\"),\n",
+ " \"cpu-fixed_theta_const\",\n",
+ " delta_theta=0,\n",
+ " theta_step=0,\n",
+ " bias_exp=6)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Plotting the results from default setting\n",
+ "_The membrane voltage minus the value of the refractory state yields the effective voltage, which is compared against the threshold value to determine spiking._\n",
+ "\n",
+ "_The dynamics of the threshold hampers spiking if the last spike occurred not too long ago._"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/png": "",
+ "text/plain": [
+ "