Module openrtdynamics2.lang.standard_library
Expand source code
import math
import typing as t
from . import lang as dy
from .signal_interface import SignalUserTemplate
import numpy as np
from .core_blocks import generic_subsystem, const, gain, convert, add, operator1, logic_and, logic_or, logic_xor, bitwise_and, bitwise_or, bitwise_shift_left, bitwise_shift_right, comparison, switchNto1, conditional_overwrite, sqrt, sin, cos, tan, atan, asin, acos, abs, logic_not, bitwise_not, atan2, pow, fmod, generic_cpp_static, flipflop, memory, memory_read, delay__, cpp_allocate_class, cpp_call_class_member_function
#
# constants
#
def int32(value):
"""Cast anything to DataTypeInt32
Convert the input value to a signal.
Parameters
----------
value : SignalUserTemplate, int
the signal or a constant value to convert to a signal
Returns
-------
SignalUserTemplate
the signal of type int32
"""
if isinstance( value, SignalUserTemplate ):
# already a singal
return value
else:
# create a new constant
return dy.const(value, dy.DataTypeInt32(1) )
def float64(value):
"""Cast anything to DataTypeFloat64
Convert the input value to a signal.
Parameters
----------
value : SignalUserTemplate, float
the signal or a constant value to convert to a signal
Returns
-------
SignalUserTemplate
the signal of type float64
"""
if isinstance( value, SignalUserTemplate ):
# already a singal
return value
else:
# create a new constant
return dy.const(value, dy.DataTypeFloat64(1) )
def boolean(value):
"""Cast anything to DataTypeBoolean
Convert the input value to a signal.
Parameters
----------
value : SignalUserTemplate, int
the signal or a constant value to convert to a signal
Returns
-------
SignalUserTemplate
the signal of type boolean
"""
if isinstance( value, SignalUserTemplate ):
# already a singal
return value
else:
# create a new constant
return dy.const(value, dy.DataTypeBoolean(1) )
#
#
#
def initial_event():
"""Emits an event on the first sampling instant after the reset of the system
Returns
-------
SignalUserTemplate
the signal of type boolean containing the event
"""
# TODO: introduce caching like done for counter()
return dy.counter() == int32(0)
#
# Delay - the basis for all dynamic elements
#
def delay(u , initial_state = None):
"""Unit delay
Delay the input u by one sampling instant:
y[k+1] = u[k], y[0] = initial_state
Parameters
----------
u : SignalUserTemplate
the input signal to delay
initial_state : SignalUserTemplate
the initial state (signal or constant value)
Returns
-------
SignalUserTemplate
the one-step delayed input
"""
if not isinstance( initial_state, SignalUserTemplate ):
return dy.delay__( u, initial_state )
else:
event_on_first_sample = initial_event()
delayed_input = dy.delay__( u, None )
delayed_input = dy.conditional_overwrite( delayed_input, event_on_first_sample, initial_state )
return delayed_input
def sample_and_hold(u, event, initial_state = None):
"""Sample & hold
Samples the input when event is true and hold this value for the proceeding time instants.
Parameters
----------
u : SignalUserTemplate
the input to sample
event : SignalUserTemplate
the event on which sampling of the input is performed
initial_state : SignalUserTemplate
the initial output
Returns
-------
SignalUserTemplate
the sampled input
"""
# NOTE: this could be implemented in a more comp. efficient way directly in C in block_prototypes.py
y = dy.signal()
delayed_y = delay( y, initial_state )
y << dy.conditional_overwrite( delayed_y, event, u )
return y
#
# static functions
#
def unwrap_angle(angle, normalize_around_zero = False):
"""Unwrap an angle
Unwrap and normalize the input angle to the range
[0, 2*pi[ in case normalize_around_zero == false
or [-pi, pi] in case normalize_around_zero == true
Parameters
----------
angle : SignalUserTemplate
the input signal (angle in radians)
Returns
-------
SignalUserTemplate
the output signal
"""
def normalize_around_zero(angle):
"""
Normalize an angle
Normalize an angle to a range [-pi, pi]
Important: the assumed range for the input is - 2*pi <= angle <= 2*p
"""
tmp = angle + dy.conditional_overwrite( dy.float64(0), angle <= float64(-math.pi), 2*math.pi )
normalized_angle = tmp + dy.conditional_overwrite( dy.float64(0), angle > float64(math.pi), -2*math.pi )
return normalized_angle
#
#
angle_ = dy.fmod(angle, dy.float64(2*math.pi) )
unwrapped_angle = angle_ + dy.conditional_overwrite( dy.float64(0), angle_ < float64(0), 2*math.pi )
if normalize_around_zero:
return normalize_around_zero(unwrapped_angle)
else:
return unwrapped_angle
def saturate(u, lower_limit = None, upper_limit = None):
"""Saturation
The output is the saturated input
Parameters
----------
lower_limit : SignalUserTemplate
lower bound for the output
upper_limit : SignalUserTemplate
upper bound for the output
Returns
-------
SignalUserTemplate
the integer output signal
Details
-------
{ lower_limit for u < lower_limit
y = { u otherwise
{ upper_limit for u > upper_limit
"""
y = u
if lower_limit is not None:
y = dy.conditional_overwrite( y, y < float64(lower_limit), lower_limit )
if upper_limit is not None:
y = dy.conditional_overwrite( y, y > float64(upper_limit), upper_limit )
return y
def rate_limit( u, Ts, lower_limit, upper_limit, initial_state = 0 ):
"""Rate limiter
Parameters
----------
Ts : SignalUserTemplate
sampling time (constant)
lower_limit : SignalUserTemplate
lower rate limit
upper_limit : SignalUserTemplate
upper rate limit
Returns
-------
SignalUserTemplate
the output signal
"""
Ts_ = float64(Ts)
y = dy.signal()
omega = u - y
omega_sat = saturate(omega, float64(lower_limit) * Ts_, float64(upper_limit) * Ts_)
y << euler_integrator( omega_sat, 1, initial_state=initial_state)
return y
#
# Counters
#
# TODO: mark as private
class __Counter():
"""
This class is meant to store the counter output signal as it might be used
by more than one destination block. The instance of this class is per simulation
and will be stored in the components property of the current get_system_context()
"""
def __init__(self, counter_signal : SignalUserTemplate):
self.counter_signal_ = counter_signal
self.hits = 0
@property
def output(self):
self.hits += 1
# print("counter cache hits ***: " + str(self.hits) )
return self.counter_signal_
#
# dynamic functions
#
def counter():
"""Basic counter - the sampling index k
The integer output is increasing with each sampling instant by 1.
Counting starts at zero.
Returns
-------
SignalUserTemplate
the integer output signal describing the sampling index k
"""
if not 'counter' in dy.get_system_context().components:
# no counter has been defined in this system so far. Hence, create one.
increase = dy.const(1, dy.DataTypeInt32(1) )
cnt = dy.signal()
tmp = dy.delay(cnt + increase)
cnt << tmp
tmp.set_name('shared_counter')
# store the output signal of the counter as it might be used again.
dy.get_system_context().components['counter'] = __Counter(tmp)
else:
# use the output of an already created counter
tmp = dy.get_system_context().components['counter'].output
return tmp
def counter_triggered( upper_limit, stepwidth=None, initial_state = 0, reset=None, reset_on_limit:bool=False, start_trigger=None, pause_trigger=None, auto_start:bool=True, no_delay:bool=False ):
"""A generic counter
Features:
- upper limit
- triggerable start/pause
- resetable
- dynamic adjustable step-size
Parameters
----------
upper_limit : int
the upper limit of the counter
initial_state : int
the state after reset
reset : SignalUserTemplate
reset the counter
reset_on_limit : bool
reset counter once the upper limit is reached
start_trigger : SignalUserTemplate
event to start counting
pause_trigger : SignalUserTemplate
event to pause counting
auto_start : bool
start counting automatically
no_delay : bool
when True the new value of the counter is returned without delay
Returns
-------
SignalUserTemplate
the boolean output signal
SignalUserTemplate
event that fires when the upper limit of the counter is reached
"""
if stepwidth is None:
stepwidth = dy.int32(1)
counter = dy.signal()
reached_upper_limit = counter >= dy.int32(upper_limit)
if start_trigger is None:
start_trigger = dy.boolean(0)
#
if pause_trigger is not None:
activate_trigger = dy.logic_or(reached_upper_limit, pause_trigger)
else:
if not auto_start:
activate_trigger = reached_upper_limit
else:
# when auto_start is active, continue counting after reset on reached_upper_limit
activate_trigger = dy.boolean(0)
# state for pause/counting
paused = dy.flipflop(activate_trigger=activate_trigger, disable_trigger=start_trigger, initial_state = not auto_start, no_delay=True).set_name('paused')
# prevent counter increase
stepwidth = dy.conditional_overwrite(stepwidth, paused, 0).set_name('stepwidth')
# increase the counter until the end is reached
new_counter = counter + dy.conditional_overwrite(stepwidth, reached_upper_limit, 0)
if reset is not None:
# reset in case this is requested
new_counter = dy.conditional_overwrite(new_counter, reset, initial_state)
if reset_on_limit:
new_counter = dy.conditional_overwrite(new_counter, reached_upper_limit, initial_state)
# introduce a state variable for the counter
counter << dy.delay( new_counter, initial_state=initial_state )
if not no_delay:
return counter, reached_upper_limit
else:
return new_counter, reached_upper_limit
def toggle(trigger, initial_state=False, no_delay=False):
"""Toggle a state based on an event
Parameters
----------
period : SignalUserTemplate
the signal to trigger a state change
initial_state : int
the initial state
no_delay : bool
when true the toggle immediately reacts to a trigger (default: false)
Returns
-------
SignalUserTemplate
the boolean state signal
SignalUserTemplate
the event for activation
SignalUserTemplate
the event for deactivation
"""
state = dy.signal()
activate = dy.logic_and( dy.logic_not( state ), trigger )
deactivate = dy.logic_and( trigger , state)
state_ = dy.flipflop( activate, deactivate,
initial_state = 0,
no_delay=no_delay )
if not no_delay:
state << state_
else:
state << dy.delay(state_)
return state_, activate, deactivate
#
# signal generators
#
def signal_square(period, phase):
"""Square wave signal generator
Parameters
----------
period : SignalUserTemplate
singal or constant describing the period in samples at which the edges of the square are placed
phase : SignalUserTemplate
singal or constant describing the phase in samples at which the edges of the square are placed
Returns
-------
SignalUserTemplate
the output signal
"""
trigger = signal_periodic_impulse(period, phase)
# k, trigger = counter_triggered( upper_limit=dy.int32(period) - dy.int32(1), reset_on_limit=True )
state, activate, deactivate = toggle(trigger, no_delay=True)
return state, activate, deactivate
def signal_sinus(N_period : int = 100, phi = None):
"""Sine wave generator
Parameters
----------
N_period : SignalUserTemplate
period in sampling instants (type: constant integer)
phi : SignalUserTemplate
phase shift (signal)
Returns
-------
SignalUserTemplate
the output signal
Details
-------
The output is computed as follows:
y = sin( k * (1 / N_period * 2 * pi) + phi )
k - is the sampling index
"""
if N_period <= 0:
raise BaseException('N_period <= 0')
if phi is None:
phi = dy.float64(0.0)
i, _ = dy.counter_triggered( upper_limit=N_period-1, reset_on_limit=True )
y = dy.sin( i * dy.float64(1/N_period * 2*math.pi) + phi )
return y
def signal_step(k_step):
"""Signal generator for a step signal
Parameters
----------
k_step : SignalUserTemplate
the sampling index as returned by counter() at which the step appears.
Returns
-------
SignalUserTemplate
the output signal
"""
k = dy.counter()
y = dy.int32(k_step) <= k
return y
def signal_ramp(k_start):
"""Signal generator for a ramp signal
Parameters
----------
k_start : SignalUserTemplate
the sampling index as returned by counter() at which the ramp starts increasing.
Returns
-------
SignalUserTemplate
the output signal
Details
-------
y[k] = { 0 for k < k_start
{ k-k_start for k >= k_start
"""
k = dy.counter()
active = dy.int32(k_start) <= k
linearRise = dy.convert( (k - dy.int32(k_start) ), dy.DataTypeFloat64(1) )
activation = dy.convert( active, dy.DataTypeFloat64(1) )
return activation * linearRise
def signal_impulse(k_event):
"""Pulse signal generator
Generates a unique pulse at the sampling index k_event.
Parameters
----------
k_event : SignalUserTemplate
the sampling index at which the pulse appears
Returns
-------
SignalUserTemplate
the output signal
"""
if k_event < 0:
raise BaseException('The sampling index for the event is invalid (k_event < 0)')
k = dy.counter()
pulse_signal = dy.int32(k_event) == k
return pulse_signal
def signal_periodic_impulse(period, phase):
"""Signal generator for periodic pulses
Parameters
----------
period : SignalUserTemplate
singal or constant describing the period in samples at which the pulses are generated
phase : SignalUserTemplate
singal or constant describing the phase in samples at which the pulses are generated
Returns
-------
SignalUserTemplate
the output signal
"""
k, trigger = counter_triggered( upper_limit=dy.int32(period) - dy.int32(1), reset_on_limit=True )
pulse_signal = dy.int32(phase) == k
return pulse_signal
def signal_step_wise_sequence( time_instance_indices, values, time_scale=None, counter=None, reset=None ):
"""Signal generator for a step-wise changeing signal
Parameters
----------
time_instance_indices : List[int]
an array of sampling instants at which the signal changes its values
values : List[float]
an array of values; must have one more element than time_instance_indices
time_scale : SignalUserTemplate
multiplies all elements of time_instance_indices by the given factor (optional)
counter : SignalUserTemplate
an alternative sample counter (optional), default: counter=dy.counter()
reset : SignalUserTemplate
boolean signal to reset the sequence (optional)
Returns
-------
SignalUserTemplate
the output signal
Example
-------
time_instance_indices = [ 50, 100, 150, 250, 300, 350, 400, 450, 500 ]
values = [ 0, -1.0, 0, 1.0, 0, -1.0, 0, 0.2, -0.2, 0 ]
v = step_wise_sequence( time_instance_indices, values )
"""
if len(values) - 1 != len(time_instance_indices):
raise BaseException( "len(values) - 1 != len(time_instance_indices)" )
if counter is None:
counter = dy.counter()
indices_mem = dy.memory(datatype=dy.DataTypeInt32(1), constant_array=time_instance_indices )
values_mem = dy.memory(datatype=dy.DataTypeFloat64(1), constant_array=values )
current_index = dy.signal()
current_time_index_to_check = dy.memory_read( indices_mem, current_index )
# scale time
if time_scale is not None:
index_to_check = time_scale * current_time_index_to_check
else:
index_to_check = current_time_index_to_check
# check wether to step to the next sample
increase_index = dy.int32(0)
increase_index = dy.conditional_overwrite(increase_index, counter >= index_to_check, dy.int32(1) )
cnt_, _ = dy.counter_triggered(upper_limit=len(time_instance_indices), stepwidth=increase_index, reset=reset )
current_index << cnt_
val = dy.memory_read(values_mem, current_index)
return val
def play( sequence_array, stepwidth=None, initial_state = 0, reset=None, reset_on_end:bool=False, start_trigger=None, pause_trigger=None, auto_start:bool=True, no_delay:bool=False ):
"""Play a given sequence of samples
Parameters
----------
sequence_array : list[float]
the sequence given as a list of values
reset : SignalUserTemplate
reset playback and start from the beginning
reset_on_end : SignalUserTemplate
reset playback once the end is reached (repetitive playback)
start_trigger : SignalUserTemplate
event to start playback
pause_trigger : SignalUserTemplate
event to pause playback
auto_start : bool
start playback automatically
Returns
-------
SignalUserTemplate
the value obtained from sequence_array
SignalUserTemplate
the current position of playback (index of the currently issued sequence element)
"""
sequence_array_storage = dy.memory(datatype=dy.DataTypeFloat64(1), constant_array=sequence_array )
# if prevent_initial_playback:
# initial_counter_state = np.size(sequence_array)
# else:
playback_index, _ = counter_triggered(
upper_limit=np.size(sequence_array)-1,
stepwidth=stepwidth, initial_state=initial_state,
reset=reset, reset_on_limit=reset_on_end,
start_trigger=start_trigger, pause_trigger=pause_trigger,
auto_start=auto_start,
no_delay=no_delay
)
# sample the given data
sample = dy.memory_read(sequence_array_storage, playback_index)
return sample, playback_index
#
# Filters
#
def diff(u : SignalUserTemplate, initial_state = None):
"""Discrete difference
Parameters
----------
u : SignalUserTemplate
the input signal
initial_state : float, SignalUserTemplate
the initial state
Returns
-------
SignalUserTemplate
the output signal of the filter
Details:
--------
y = u[k] - u[k-1]
initial state
u[0] = initial_state in case initial_state is not None
u[0] = 0 otherwise
"""
i = dy.delay( u, initial_state )
y = dy.add( [ i, u ], [ -1, 1 ] )
return y
def sum(u : SignalUserTemplate, initial_state=0, no_delay=False):
"""Accumulative sum
Parameters
----------
u : SignalUserTemplate
the input signal
initial_state : float, SignalUserTemplate
the initial state
no_delay : bool
when true the output is not delayed
Returns
-------
SignalUserTemplate
the output signal of the filter
Details:
--------
The difference equation
y[k+1] = y[k] + u[k]
is evaluated. The return value is either
y[k] by default or when no_delay == False
or
y[k+1] in case no_delay == True .
"""
y_k = dy.signal()
y_kp1 = y_k + u
y_k << dy.delay(y_kp1, initial_state=initial_state)
if no_delay:
return y_kp1
else:
return y_k
def sum2(u : SignalUserTemplate, initial_state=0):
"""Accumulative sum
Parameters
----------
u : SignalUserTemplate
the input signal
initial_state : float, SignalUserTemplate
the initial state
Returns
-------
SignalUserTemplate
the output signal of the filter
Details:
--------
The difference equation
y[k+1] = y[k] + u[k]
is evaluated. The return values are
y[k], y[k+1]
"""
y_k = dy.signal()
y_kp1 = y_k + u
y_k << dy.delay(y_kp1, initial_state=initial_state)
return y_k, y_kp1
def euler_integrator( u : SignalUserTemplate, Ts, initial_state = 0.0):
"""Euler (forward) integrator
Parameters
----------
u : SignalUserTemplate
the input signal
Ts : float
the sampling time
initial_state : float, SignalUserTemplate
the initial state of the integrator
Returns
-------
SignalUserTemplate
the output signal of the filter
Details:
--------
y[k+1] = y[k] + Ts * u[k]
"""
yFb = dy.signal()
if not isinstance( Ts, SignalUserTemplate ):
i = dy.add( [ yFb, u ], [ 1, Ts ] )
else:
i = yFb + Ts * u
y = dy.delay( i, initial_state )
yFb << y
return y
def dtf_lowpass_1_order(u : SignalUserTemplate, z_infinity):
"""First-order discrete-time low pass filter
Parameters
----------
u : SignalUserTemplate
the input signal
Returns
-------
SignalUserTemplate
the output signal of the filter
Details:
--------
1 - z_infinity
H (z) = --------------
z - z_infinity
"""
zinf = dy.float64( z_infinity )
zinf_ = dy.float64( 1 ) - zinf
y_delayed = dy.signal()
y = zinf * y_delayed + zinf_ * u
y_delayed << dy.delay(y)
return y
def transfer_function_discrete(u : SignalUserTemplate, num_coeff : t.List[float], den_coeff : t.List[float] ):
"""Discrete time transfer function
Parameters
----------
u : SignalUserTemplate
the input signal
num_coeff : List[float]
a list of numerator coefficients of the transfer function
den_coeff : List[float]
a list of denominator coefficients of the transfer function
Returns
-------
SignalUserTemplate
the output signal of the filter
Details:
--------
This filter realizes a discrete-time transfer function by using 'direct form II'
c.f. https://en.wikipedia.org/wiki/Digital_filter .
b0 + b1 z^-1 + b2 z^-2 + ... + bN z^-N
H(z) = ----------------------------------------
1 + a1 z^-1 + a2 z^-2 + ... + aM z^-M
The coefficient vectors num_coeff and den_coeff describe the numerator and
denominator polynomials, respectively, and are defined as follows:
num_coeff = [b0, b1, .., bN]
den_coeff = [a1, a2, ... aM] .
"""
# get filter order
N = len(num_coeff)-1
# feedback start signal
z_pre = dy.signal()
# array to store state signals
z_ = []
# create delay chain
z_iterate = z_pre
for i in range(0,N):
z_iterate = dy.delay( z_iterate ) .extend_name('_z' + str(i) )
z_.append( z_iterate )
# build feedback path
#
# a1 = den_coeff[0]
# a2 = den_coeff[1]
# a3 = den_coeff[2]
# ...
sum_feedback = u
for i in range(0,N):
a_ip1 = dy.float64( den_coeff[i] ).extend_name('_a' + str(i+1) )
sum_feedback = sum_feedback - a_ip1 * z_[i]
sum_feedback.extend_name('_i')
# close the feedback loop
z_pre << sum_feedback
# build output path
#
# b0 = num_coeff[0]
# b1 = num_coeff[1]
# b2 = num_coeff[2]
# ...
for i in range(0,N+1):
b_i = dy.float64( num_coeff[i] ).extend_name('_b' + str(i) )
if i==0:
y = b_i * sum_feedback
else:
y = y + b_i * z_[i-1]
# y is the filter output
return y
#
# Control
#
def PID_controller(r, y, Ts, kp, ki = None, kd = None):
"""Discrete-time PID-controller
Parameters
----------
r : SignalUserTemplate
the reference signal
y : SignalUserTemplate
the measured plant output
Ts : float
the sampleing time
kp : float
the parameter kp (proportional)
ki : float
the parameter ki (integral)
kd : float
the parameter kd (differential)
Returns
-------
SignalUserTemplate
the control variable u
"""
Ts = dy.float64(Ts)
# control error
e = r - y
# P
u = kp * e
# D
if kd is not None:
u = u + dy.diff(e) * kd / Ts
# I
if ki is not None:
u = u + dy.sum(e) * ki * Ts
return u
Functions
def PID_controller(r, y, Ts, kp, ki=None, kd=None)
-
Discrete-time PID-controller
Parameters
r
:SignalUserTemplate
- the reference signal
y
:SignalUserTemplate
- the measured plant output
Ts
:float
- the sampleing time
kp
:float
- the parameter kp (proportional)
ki
:float
- the parameter ki (integral)
kd
:float
- the parameter kd (differential)
Returns
SignalUserTemplate
- the control variable u
Expand source code
def PID_controller(r, y, Ts, kp, ki = None, kd = None): """Discrete-time PID-controller Parameters ---------- r : SignalUserTemplate the reference signal y : SignalUserTemplate the measured plant output Ts : float the sampleing time kp : float the parameter kp (proportional) ki : float the parameter ki (integral) kd : float the parameter kd (differential) Returns ------- SignalUserTemplate the control variable u """ Ts = dy.float64(Ts) # control error e = r - y # P u = kp * e # D if kd is not None: u = u + dy.diff(e) * kd / Ts # I if ki is not None: u = u + dy.sum(e) * ki * Ts return u
def boolean(value)
-
Cast anything to DataTypeBoolean
Convert the input value to a signal.
Parameters
value
:SignalUserTemplate, int
- the signal or a constant value to convert to a signal
Returns
SignalUserTemplate
- the signal of type boolean
Expand source code
def boolean(value): """Cast anything to DataTypeBoolean Convert the input value to a signal. Parameters ---------- value : SignalUserTemplate, int the signal or a constant value to convert to a signal Returns ------- SignalUserTemplate the signal of type boolean """ if isinstance( value, SignalUserTemplate ): # already a singal return value else: # create a new constant return dy.const(value, dy.DataTypeBoolean(1) )
def counter()
-
Basic counter - the sampling index k
The integer output is increasing with each sampling instant by 1. Counting starts at zero.
Returns
SignalUserTemplate
- the integer output signal describing the sampling index k
Expand source code
def counter(): """Basic counter - the sampling index k The integer output is increasing with each sampling instant by 1. Counting starts at zero. Returns ------- SignalUserTemplate the integer output signal describing the sampling index k """ if not 'counter' in dy.get_system_context().components: # no counter has been defined in this system so far. Hence, create one. increase = dy.const(1, dy.DataTypeInt32(1) ) cnt = dy.signal() tmp = dy.delay(cnt + increase) cnt << tmp tmp.set_name('shared_counter') # store the output signal of the counter as it might be used again. dy.get_system_context().components['counter'] = __Counter(tmp) else: # use the output of an already created counter tmp = dy.get_system_context().components['counter'].output return tmp
def counter_triggered(upper_limit, stepwidth=None, initial_state=0, reset=None, reset_on_limit: bool = False, start_trigger=None, pause_trigger=None, auto_start: bool = True, no_delay: bool = False)
-
A generic counter
Features: - upper limit - triggerable start/pause - resetable - dynamic adjustable step-size
Parameters
upper_limit
:int
- the upper limit of the counter
initial_state
:int
- the state after reset
reset
:SignalUserTemplate
- reset the counter
reset_on_limit
:bool
- reset counter once the upper limit is reached
start_trigger
:SignalUserTemplate
- event to start counting
pause_trigger
:SignalUserTemplate
- event to pause counting
auto_start
:bool
- start counting automatically
no_delay
:bool
- when True the new value of the counter is returned without delay
Returns
SignalUserTemplate
- the boolean output signal
SignalUserTemplate
- event that fires when the upper limit of the counter is reached
Expand source code
def counter_triggered( upper_limit, stepwidth=None, initial_state = 0, reset=None, reset_on_limit:bool=False, start_trigger=None, pause_trigger=None, auto_start:bool=True, no_delay:bool=False ): """A generic counter Features: - upper limit - triggerable start/pause - resetable - dynamic adjustable step-size Parameters ---------- upper_limit : int the upper limit of the counter initial_state : int the state after reset reset : SignalUserTemplate reset the counter reset_on_limit : bool reset counter once the upper limit is reached start_trigger : SignalUserTemplate event to start counting pause_trigger : SignalUserTemplate event to pause counting auto_start : bool start counting automatically no_delay : bool when True the new value of the counter is returned without delay Returns ------- SignalUserTemplate the boolean output signal SignalUserTemplate event that fires when the upper limit of the counter is reached """ if stepwidth is None: stepwidth = dy.int32(1) counter = dy.signal() reached_upper_limit = counter >= dy.int32(upper_limit) if start_trigger is None: start_trigger = dy.boolean(0) # if pause_trigger is not None: activate_trigger = dy.logic_or(reached_upper_limit, pause_trigger) else: if not auto_start: activate_trigger = reached_upper_limit else: # when auto_start is active, continue counting after reset on reached_upper_limit activate_trigger = dy.boolean(0) # state for pause/counting paused = dy.flipflop(activate_trigger=activate_trigger, disable_trigger=start_trigger, initial_state = not auto_start, no_delay=True).set_name('paused') # prevent counter increase stepwidth = dy.conditional_overwrite(stepwidth, paused, 0).set_name('stepwidth') # increase the counter until the end is reached new_counter = counter + dy.conditional_overwrite(stepwidth, reached_upper_limit, 0) if reset is not None: # reset in case this is requested new_counter = dy.conditional_overwrite(new_counter, reset, initial_state) if reset_on_limit: new_counter = dy.conditional_overwrite(new_counter, reached_upper_limit, initial_state) # introduce a state variable for the counter counter << dy.delay( new_counter, initial_state=initial_state ) if not no_delay: return counter, reached_upper_limit else: return new_counter, reached_upper_limit
def delay(u, initial_state=None)
-
Unit delay
Delay the input u by one sampling instant:
y[k+1] = u[k], y[0] = initial_state
Parameters
u
:SignalUserTemplate
- the input signal to delay
initial_state
:SignalUserTemplate
- the initial state (signal or constant value)
Returns
SignalUserTemplate
- the one-step delayed input
Expand source code
def delay(u , initial_state = None): """Unit delay Delay the input u by one sampling instant: y[k+1] = u[k], y[0] = initial_state Parameters ---------- u : SignalUserTemplate the input signal to delay initial_state : SignalUserTemplate the initial state (signal or constant value) Returns ------- SignalUserTemplate the one-step delayed input """ if not isinstance( initial_state, SignalUserTemplate ): return dy.delay__( u, initial_state ) else: event_on_first_sample = initial_event() delayed_input = dy.delay__( u, None ) delayed_input = dy.conditional_overwrite( delayed_input, event_on_first_sample, initial_state ) return delayed_input
def diff(u: SignalUserTemplate, initial_state=None)
-
Discrete difference
Parameters
u
:SignalUserTemplate
- the input signal
initial_state
:float, SignalUserTemplate
- the initial state
Returns
SignalUserTemplate
- the output signal of the filter
Details:
y = u[k] - u[k-1]
initial state
u[0] = initial_state in case initial_state is not None u[0] = 0 otherwise
Expand source code
def diff(u : SignalUserTemplate, initial_state = None): """Discrete difference Parameters ---------- u : SignalUserTemplate the input signal initial_state : float, SignalUserTemplate the initial state Returns ------- SignalUserTemplate the output signal of the filter Details: -------- y = u[k] - u[k-1] initial state u[0] = initial_state in case initial_state is not None u[0] = 0 otherwise """ i = dy.delay( u, initial_state ) y = dy.add( [ i, u ], [ -1, 1 ] ) return y
def dtf_lowpass_1_order(u: SignalUserTemplate, z_infinity)
-
First-order discrete-time low pass filter
Parameters
u
:SignalUserTemplate
- the input signal
Returns
SignalUserTemplate
- the output signal of the filter
Details:
1 - z_infinity H (z) = -------------- z - z_infinity
Expand source code
def dtf_lowpass_1_order(u : SignalUserTemplate, z_infinity): """First-order discrete-time low pass filter Parameters ---------- u : SignalUserTemplate the input signal Returns ------- SignalUserTemplate the output signal of the filter Details: -------- 1 - z_infinity H (z) = -------------- z - z_infinity """ zinf = dy.float64( z_infinity ) zinf_ = dy.float64( 1 ) - zinf y_delayed = dy.signal() y = zinf * y_delayed + zinf_ * u y_delayed << dy.delay(y) return y
def euler_integrator(u: SignalUserTemplate, Ts, initial_state=0.0)
-
Euler (forward) integrator
Parameters
u
:SignalUserTemplate
- the input signal
Ts
:float
- the sampling time
initial_state
:float, SignalUserTemplate
- the initial state of the integrator
Returns
SignalUserTemplate
- the output signal of the filter
Details:
y[k+1] = y[k] + Ts * u[k]
Expand source code
def euler_integrator( u : SignalUserTemplate, Ts, initial_state = 0.0): """Euler (forward) integrator Parameters ---------- u : SignalUserTemplate the input signal Ts : float the sampling time initial_state : float, SignalUserTemplate the initial state of the integrator Returns ------- SignalUserTemplate the output signal of the filter Details: -------- y[k+1] = y[k] + Ts * u[k] """ yFb = dy.signal() if not isinstance( Ts, SignalUserTemplate ): i = dy.add( [ yFb, u ], [ 1, Ts ] ) else: i = yFb + Ts * u y = dy.delay( i, initial_state ) yFb << y return y
def float64(value)
-
Cast anything to DataTypeFloat64
Convert the input value to a signal.
Parameters
value
:SignalUserTemplate, float
- the signal or a constant value to convert to a signal
Returns
SignalUserTemplate
- the signal of type float64
Expand source code
def float64(value): """Cast anything to DataTypeFloat64 Convert the input value to a signal. Parameters ---------- value : SignalUserTemplate, float the signal or a constant value to convert to a signal Returns ------- SignalUserTemplate the signal of type float64 """ if isinstance( value, SignalUserTemplate ): # already a singal return value else: # create a new constant return dy.const(value, dy.DataTypeFloat64(1) )
def initial_event()
-
Emits an event on the first sampling instant after the reset of the system
Returns
SignalUserTemplate
- the signal of type boolean containing the event
Expand source code
def initial_event(): """Emits an event on the first sampling instant after the reset of the system Returns ------- SignalUserTemplate the signal of type boolean containing the event """ # TODO: introduce caching like done for counter() return dy.counter() == int32(0)
def int32(value)
-
Cast anything to DataTypeInt32
Convert the input value to a signal.
Parameters
value
:SignalUserTemplate, int
- the signal or a constant value to convert to a signal
Returns
SignalUserTemplate
- the signal of type int32
Expand source code
def int32(value): """Cast anything to DataTypeInt32 Convert the input value to a signal. Parameters ---------- value : SignalUserTemplate, int the signal or a constant value to convert to a signal Returns ------- SignalUserTemplate the signal of type int32 """ if isinstance( value, SignalUserTemplate ): # already a singal return value else: # create a new constant return dy.const(value, dy.DataTypeInt32(1) )
def play(sequence_array, stepwidth=None, initial_state=0, reset=None, reset_on_end: bool = False, start_trigger=None, pause_trigger=None, auto_start: bool = True, no_delay: bool = False)
-
Play a given sequence of samples
Parameters
sequence_array
:list[float]
- the sequence given as a list of values
reset
:SignalUserTemplate
- reset playback and start from the beginning
reset_on_end
:SignalUserTemplate
- reset playback once the end is reached (repetitive playback)
start_trigger
:SignalUserTemplate
- event to start playback
pause_trigger
:SignalUserTemplate
- event to pause playback
auto_start
:bool
- start playback automatically
Returns
SignalUserTemplate
- the value obtained from sequence_array
SignalUserTemplate
- the current position of playback (index of the currently issued sequence element)
Expand source code
def play( sequence_array, stepwidth=None, initial_state = 0, reset=None, reset_on_end:bool=False, start_trigger=None, pause_trigger=None, auto_start:bool=True, no_delay:bool=False ): """Play a given sequence of samples Parameters ---------- sequence_array : list[float] the sequence given as a list of values reset : SignalUserTemplate reset playback and start from the beginning reset_on_end : SignalUserTemplate reset playback once the end is reached (repetitive playback) start_trigger : SignalUserTemplate event to start playback pause_trigger : SignalUserTemplate event to pause playback auto_start : bool start playback automatically Returns ------- SignalUserTemplate the value obtained from sequence_array SignalUserTemplate the current position of playback (index of the currently issued sequence element) """ sequence_array_storage = dy.memory(datatype=dy.DataTypeFloat64(1), constant_array=sequence_array ) # if prevent_initial_playback: # initial_counter_state = np.size(sequence_array) # else: playback_index, _ = counter_triggered( upper_limit=np.size(sequence_array)-1, stepwidth=stepwidth, initial_state=initial_state, reset=reset, reset_on_limit=reset_on_end, start_trigger=start_trigger, pause_trigger=pause_trigger, auto_start=auto_start, no_delay=no_delay ) # sample the given data sample = dy.memory_read(sequence_array_storage, playback_index) return sample, playback_index
def rate_limit(u, Ts, lower_limit, upper_limit, initial_state=0)
-
Rate limiter
Parameters
Ts
:SignalUserTemplate
- sampling time (constant)
lower_limit
:SignalUserTemplate
- lower rate limit
upper_limit
:SignalUserTemplate
- upper rate limit
Returns
SignalUserTemplate
- the output signal
Expand source code
def rate_limit( u, Ts, lower_limit, upper_limit, initial_state = 0 ): """Rate limiter Parameters ---------- Ts : SignalUserTemplate sampling time (constant) lower_limit : SignalUserTemplate lower rate limit upper_limit : SignalUserTemplate upper rate limit Returns ------- SignalUserTemplate the output signal """ Ts_ = float64(Ts) y = dy.signal() omega = u - y omega_sat = saturate(omega, float64(lower_limit) * Ts_, float64(upper_limit) * Ts_) y << euler_integrator( omega_sat, 1, initial_state=initial_state) return y
def sample_and_hold(u, event, initial_state=None)
-
Sample & hold
Samples the input when event is true and hold this value for the proceeding time instants.
Parameters
u
:SignalUserTemplate
- the input to sample
event
:SignalUserTemplate
- the event on which sampling of the input is performed
initial_state
:SignalUserTemplate
- the initial output
Returns
SignalUserTemplate
- the sampled input
Expand source code
def sample_and_hold(u, event, initial_state = None): """Sample & hold Samples the input when event is true and hold this value for the proceeding time instants. Parameters ---------- u : SignalUserTemplate the input to sample event : SignalUserTemplate the event on which sampling of the input is performed initial_state : SignalUserTemplate the initial output Returns ------- SignalUserTemplate the sampled input """ # NOTE: this could be implemented in a more comp. efficient way directly in C in block_prototypes.py y = dy.signal() delayed_y = delay( y, initial_state ) y << dy.conditional_overwrite( delayed_y, event, u ) return y
def saturate(u, lower_limit=None, upper_limit=None)
-
Saturation
The output is the saturated input
Parameters
lower_limit
:SignalUserTemplate
- lower bound for the output
upper_limit
:SignalUserTemplate
- upper bound for the output
Returns
SignalUserTemplate
- the integer output signal
Details
{ lower_limit for u < lower_limit y = { u otherwise { upper_limit for u > upper_limit
Expand source code
def saturate(u, lower_limit = None, upper_limit = None): """Saturation The output is the saturated input Parameters ---------- lower_limit : SignalUserTemplate lower bound for the output upper_limit : SignalUserTemplate upper bound for the output Returns ------- SignalUserTemplate the integer output signal Details ------- { lower_limit for u < lower_limit y = { u otherwise { upper_limit for u > upper_limit """ y = u if lower_limit is not None: y = dy.conditional_overwrite( y, y < float64(lower_limit), lower_limit ) if upper_limit is not None: y = dy.conditional_overwrite( y, y > float64(upper_limit), upper_limit ) return y
def signal_impulse(k_event)
-
Pulse signal generator
Generates a unique pulse at the sampling index k_event.
Parameters
k_event
:SignalUserTemplate
- the sampling index at which the pulse appears
Returns
SignalUserTemplate
- the output signal
Expand source code
def signal_impulse(k_event): """Pulse signal generator Generates a unique pulse at the sampling index k_event. Parameters ---------- k_event : SignalUserTemplate the sampling index at which the pulse appears Returns ------- SignalUserTemplate the output signal """ if k_event < 0: raise BaseException('The sampling index for the event is invalid (k_event < 0)') k = dy.counter() pulse_signal = dy.int32(k_event) == k return pulse_signal
def signal_periodic_impulse(period, phase)
-
Signal generator for periodic pulses
Parameters
period
:SignalUserTemplate
- singal or constant describing the period in samples at which the pulses are generated
phase
:SignalUserTemplate
- singal or constant describing the phase in samples at which the pulses are generated
Returns
SignalUserTemplate
- the output signal
Expand source code
def signal_periodic_impulse(period, phase): """Signal generator for periodic pulses Parameters ---------- period : SignalUserTemplate singal or constant describing the period in samples at which the pulses are generated phase : SignalUserTemplate singal or constant describing the phase in samples at which the pulses are generated Returns ------- SignalUserTemplate the output signal """ k, trigger = counter_triggered( upper_limit=dy.int32(period) - dy.int32(1), reset_on_limit=True ) pulse_signal = dy.int32(phase) == k return pulse_signal
def signal_ramp(k_start)
-
Signal generator for a ramp signal
Parameters
k_start
:SignalUserTemplate
- the sampling index as returned by counter() at which the ramp starts increasing.
Returns
SignalUserTemplate
- the output signal
Details
y[k] = { 0 for k < k_start { k-k_start for k >= k_start
Expand source code
def signal_ramp(k_start): """Signal generator for a ramp signal Parameters ---------- k_start : SignalUserTemplate the sampling index as returned by counter() at which the ramp starts increasing. Returns ------- SignalUserTemplate the output signal Details ------- y[k] = { 0 for k < k_start { k-k_start for k >= k_start """ k = dy.counter() active = dy.int32(k_start) <= k linearRise = dy.convert( (k - dy.int32(k_start) ), dy.DataTypeFloat64(1) ) activation = dy.convert( active, dy.DataTypeFloat64(1) ) return activation * linearRise
def signal_sinus(N_period: int = 100, phi=None)
-
Sine wave generator
Parameters
N_period
:SignalUserTemplate
- period in sampling instants (type: constant integer)
phi
:SignalUserTemplate
- phase shift (signal)
Returns
SignalUserTemplate
- the output signal
Details
The output is computed as follows:
y = sin( k * (1 / N_period * 2 * pi) + phi )
k - is the sampling index
Expand source code
def signal_sinus(N_period : int = 100, phi = None): """Sine wave generator Parameters ---------- N_period : SignalUserTemplate period in sampling instants (type: constant integer) phi : SignalUserTemplate phase shift (signal) Returns ------- SignalUserTemplate the output signal Details ------- The output is computed as follows: y = sin( k * (1 / N_period * 2 * pi) + phi ) k - is the sampling index """ if N_period <= 0: raise BaseException('N_period <= 0') if phi is None: phi = dy.float64(0.0) i, _ = dy.counter_triggered( upper_limit=N_period-1, reset_on_limit=True ) y = dy.sin( i * dy.float64(1/N_period * 2*math.pi) + phi ) return y
def signal_square(period, phase)
-
Square wave signal generator
Parameters
period
:SignalUserTemplate
- singal or constant describing the period in samples at which the edges of the square are placed
phase
:SignalUserTemplate
- singal or constant describing the phase in samples at which the edges of the square are placed
Returns
SignalUserTemplate
- the output signal
Expand source code
def signal_square(period, phase): """Square wave signal generator Parameters ---------- period : SignalUserTemplate singal or constant describing the period in samples at which the edges of the square are placed phase : SignalUserTemplate singal or constant describing the phase in samples at which the edges of the square are placed Returns ------- SignalUserTemplate the output signal """ trigger = signal_periodic_impulse(period, phase) # k, trigger = counter_triggered( upper_limit=dy.int32(period) - dy.int32(1), reset_on_limit=True ) state, activate, deactivate = toggle(trigger, no_delay=True) return state, activate, deactivate
def signal_step(k_step)
-
Signal generator for a step signal
Parameters
k_step
:SignalUserTemplate
- the sampling index as returned by counter() at which the step appears.
Returns
SignalUserTemplate
- the output signal
Expand source code
def signal_step(k_step): """Signal generator for a step signal Parameters ---------- k_step : SignalUserTemplate the sampling index as returned by counter() at which the step appears. Returns ------- SignalUserTemplate the output signal """ k = dy.counter() y = dy.int32(k_step) <= k return y
def signal_step_wise_sequence(time_instance_indices, values, time_scale=None, counter=None, reset=None)
-
Signal generator for a step-wise changeing signal
Parameters
time_instance_indices
:List[int]
- an array of sampling instants at which the signal changes its values
values
:List[float]
- an array of values; must have one more element than time_instance_indices
time_scale
:SignalUserTemplate
- multiplies all elements of time_instance_indices by the given factor (optional)
counter
:SignalUserTemplate
- an alternative sample counter (optional), default: counter=dy.counter()
reset
:SignalUserTemplate
- boolean signal to reset the sequence (optional)
Returns
SignalUserTemplate
- the output signal
Example
time_instance_indices = [ 50, 100, 150, 250, 300, 350, 400, 450, 500 ] values = [ 0, -1.0, 0, 1.0, 0, -1.0, 0, 0.2, -0.2, 0 ] v = step_wise_sequence( time_instance_indices, values )
Expand source code
def signal_step_wise_sequence( time_instance_indices, values, time_scale=None, counter=None, reset=None ): """Signal generator for a step-wise changeing signal Parameters ---------- time_instance_indices : List[int] an array of sampling instants at which the signal changes its values values : List[float] an array of values; must have one more element than time_instance_indices time_scale : SignalUserTemplate multiplies all elements of time_instance_indices by the given factor (optional) counter : SignalUserTemplate an alternative sample counter (optional), default: counter=dy.counter() reset : SignalUserTemplate boolean signal to reset the sequence (optional) Returns ------- SignalUserTemplate the output signal Example ------- time_instance_indices = [ 50, 100, 150, 250, 300, 350, 400, 450, 500 ] values = [ 0, -1.0, 0, 1.0, 0, -1.0, 0, 0.2, -0.2, 0 ] v = step_wise_sequence( time_instance_indices, values ) """ if len(values) - 1 != len(time_instance_indices): raise BaseException( "len(values) - 1 != len(time_instance_indices)" ) if counter is None: counter = dy.counter() indices_mem = dy.memory(datatype=dy.DataTypeInt32(1), constant_array=time_instance_indices ) values_mem = dy.memory(datatype=dy.DataTypeFloat64(1), constant_array=values ) current_index = dy.signal() current_time_index_to_check = dy.memory_read( indices_mem, current_index ) # scale time if time_scale is not None: index_to_check = time_scale * current_time_index_to_check else: index_to_check = current_time_index_to_check # check wether to step to the next sample increase_index = dy.int32(0) increase_index = dy.conditional_overwrite(increase_index, counter >= index_to_check, dy.int32(1) ) cnt_, _ = dy.counter_triggered(upper_limit=len(time_instance_indices), stepwidth=increase_index, reset=reset ) current_index << cnt_ val = dy.memory_read(values_mem, current_index) return val
def sum(u: SignalUserTemplate, initial_state=0, no_delay=False)
-
Accumulative sum
Parameters
u
:SignalUserTemplate
- the input signal
initial_state
:float, SignalUserTemplate
- the initial state
no_delay
:bool
- when true the output is not delayed
Returns
SignalUserTemplate
- the output signal of the filter
Details:
The difference equation
y[k+1] = y[k] + u[k]
is evaluated. The return value is either
y[k] by default or when no_delay == False
or
y[k+1] in case no_delay == True .
Expand source code
def sum(u : SignalUserTemplate, initial_state=0, no_delay=False): """Accumulative sum Parameters ---------- u : SignalUserTemplate the input signal initial_state : float, SignalUserTemplate the initial state no_delay : bool when true the output is not delayed Returns ------- SignalUserTemplate the output signal of the filter Details: -------- The difference equation y[k+1] = y[k] + u[k] is evaluated. The return value is either y[k] by default or when no_delay == False or y[k+1] in case no_delay == True . """ y_k = dy.signal() y_kp1 = y_k + u y_k << dy.delay(y_kp1, initial_state=initial_state) if no_delay: return y_kp1 else: return y_k
def sum2(u: SignalUserTemplate, initial_state=0)
-
Accumulative sum
Parameters
u
:SignalUserTemplate
- the input signal
initial_state
:float, SignalUserTemplate
- the initial state
Returns
SignalUserTemplate
- the output signal of the filter
Details:
The difference equation
y[k+1] = y[k] + u[k]
is evaluated. The return values are
y[k], y[k+1]
Expand source code
def sum2(u : SignalUserTemplate, initial_state=0): """Accumulative sum Parameters ---------- u : SignalUserTemplate the input signal initial_state : float, SignalUserTemplate the initial state Returns ------- SignalUserTemplate the output signal of the filter Details: -------- The difference equation y[k+1] = y[k] + u[k] is evaluated. The return values are y[k], y[k+1] """ y_k = dy.signal() y_kp1 = y_k + u y_k << dy.delay(y_kp1, initial_state=initial_state) return y_k, y_kp1
def toggle(trigger, initial_state=False, no_delay=False)
-
Toggle a state based on an event
Parameters
period
:SignalUserTemplate
- the signal to trigger a state change
initial_state
:int
- the initial state
no_delay
:bool
- when true the toggle immediately reacts to a trigger (default: false)
Returns
SignalUserTemplate
- the boolean state signal
SignalUserTemplate
- the event for activation
SignalUserTemplate
- the event for deactivation
Expand source code
def toggle(trigger, initial_state=False, no_delay=False): """Toggle a state based on an event Parameters ---------- period : SignalUserTemplate the signal to trigger a state change initial_state : int the initial state no_delay : bool when true the toggle immediately reacts to a trigger (default: false) Returns ------- SignalUserTemplate the boolean state signal SignalUserTemplate the event for activation SignalUserTemplate the event for deactivation """ state = dy.signal() activate = dy.logic_and( dy.logic_not( state ), trigger ) deactivate = dy.logic_and( trigger , state) state_ = dy.flipflop( activate, deactivate, initial_state = 0, no_delay=no_delay ) if not no_delay: state << state_ else: state << dy.delay(state_) return state_, activate, deactivate
def transfer_function_discrete(u: SignalUserTemplate, num_coeff: List[float], den_coeff: List[float])
-
Discrete time transfer function
Parameters
u
:SignalUserTemplate
- the input signal
num_coeff
:List[float]
- a list of numerator coefficients of the transfer function
den_coeff
:List[float]
- a list of denominator coefficients of the transfer function
Returns
SignalUserTemplate
- the output signal of the filter
Details:
This filter realizes a discrete-time transfer function by using 'direct form II' c.f. https://en.wikipedia.org/wiki/Digital_filter .
b0 + b1 z^-1 + b2 z^-2 + ... + bN z^-N H(z) = ---------------------------------------- 1 + a1 z^-1 + a2 z^-2 + ... + aM z^-M
The coefficient vectors num_coeff and den_coeff describe the numerator and denominator polynomials, respectively, and are defined as follows:
num_coeff = [b0, b1, .., bN] den_coeff = [a1, a2, ... aM] .
Expand source code
def transfer_function_discrete(u : SignalUserTemplate, num_coeff : t.List[float], den_coeff : t.List[float] ): """Discrete time transfer function Parameters ---------- u : SignalUserTemplate the input signal num_coeff : List[float] a list of numerator coefficients of the transfer function den_coeff : List[float] a list of denominator coefficients of the transfer function Returns ------- SignalUserTemplate the output signal of the filter Details: -------- This filter realizes a discrete-time transfer function by using 'direct form II' c.f. https://en.wikipedia.org/wiki/Digital_filter . b0 + b1 z^-1 + b2 z^-2 + ... + bN z^-N H(z) = ---------------------------------------- 1 + a1 z^-1 + a2 z^-2 + ... + aM z^-M The coefficient vectors num_coeff and den_coeff describe the numerator and denominator polynomials, respectively, and are defined as follows: num_coeff = [b0, b1, .., bN] den_coeff = [a1, a2, ... aM] . """ # get filter order N = len(num_coeff)-1 # feedback start signal z_pre = dy.signal() # array to store state signals z_ = [] # create delay chain z_iterate = z_pre for i in range(0,N): z_iterate = dy.delay( z_iterate ) .extend_name('_z' + str(i) ) z_.append( z_iterate ) # build feedback path # # a1 = den_coeff[0] # a2 = den_coeff[1] # a3 = den_coeff[2] # ... sum_feedback = u for i in range(0,N): a_ip1 = dy.float64( den_coeff[i] ).extend_name('_a' + str(i+1) ) sum_feedback = sum_feedback - a_ip1 * z_[i] sum_feedback.extend_name('_i') # close the feedback loop z_pre << sum_feedback # build output path # # b0 = num_coeff[0] # b1 = num_coeff[1] # b2 = num_coeff[2] # ... for i in range(0,N+1): b_i = dy.float64( num_coeff[i] ).extend_name('_b' + str(i) ) if i==0: y = b_i * sum_feedback else: y = y + b_i * z_[i-1] # y is the filter output return y
def unwrap_angle(angle, normalize_around_zero=False)
-
Unwrap an angle
Unwrap and normalize the input angle to the range
[0, 2*pi[ in case normalize_around_zero == false or [-pi, pi] in case normalize_around_zero == true
Parameters
angle
:SignalUserTemplate
- the input signal (angle in radians)
Returns
SignalUserTemplate
- the output signal
Expand source code
def unwrap_angle(angle, normalize_around_zero = False): """Unwrap an angle Unwrap and normalize the input angle to the range [0, 2*pi[ in case normalize_around_zero == false or [-pi, pi] in case normalize_around_zero == true Parameters ---------- angle : SignalUserTemplate the input signal (angle in radians) Returns ------- SignalUserTemplate the output signal """ def normalize_around_zero(angle): """ Normalize an angle Normalize an angle to a range [-pi, pi] Important: the assumed range for the input is - 2*pi <= angle <= 2*p """ tmp = angle + dy.conditional_overwrite( dy.float64(0), angle <= float64(-math.pi), 2*math.pi ) normalized_angle = tmp + dy.conditional_overwrite( dy.float64(0), angle > float64(math.pi), -2*math.pi ) return normalized_angle # # angle_ = dy.fmod(angle, dy.float64(2*math.pi) ) unwrapped_angle = angle_ + dy.conditional_overwrite( dy.float64(0), angle_ < float64(0), 2*math.pi ) if normalize_around_zero: return normalize_around_zero(unwrapped_angle) else: return unwrapped_angle