Module openrtdynamics2.lang.block_prototypes_switched_subsystems
Expand source code
from .diagram_core.system import System
from .diagram_core import datatypes as dt
from .diagram_core.signal_network.signals import Signal
from .diagram_core import code_generation_helper as cgh
from . import block_interface as bi
from .block_prototypes_subsystems import *
class MultiSubsystemEmbedder(bi.BlockPrototype):
"""
Prototype for a block that includes multiple sub-systems whose outputs are joint in a switching manner
(this is class to be derived, e.g. by StatemachineSwitchSubsystems, SwitchSubsystems)
+----------------------------------------------------------------------------------------------------+
| normal inputs (determinded on compile_callback_all_subsystems_compiled) |
| +----------+ |
self._list_of_all_subsystem_inputs | | normal outputs |
+---------------------------------+-------> subsys 1 +--------------+ |
| | | | | |
| | | | control | |
| | | | outputs | switch |
| | | +-----------+ | +--------+ |
| | | | | +------>+ o | switched |
| | +----------+ | | | normal outputs | self.switched_normal_outputs
| | +--------->+ o o +------------------------->
| | | / | |
| | +----------+ normal | / | switched |
| | | | outputs +--------->+ o | control outputs | self.control_outputs
| +-------> subsys 2 +-----------+ | o +------+------------------>
| | | | / | | |
| | | control +-------->+ o | | |
| | | outputs | +---+----+ | |
| | +------------+ ^ | |
| | | | | |
| +----------+ | | |
| o | | |
| control +-----------+ o switch position | | |
| inputs | +-----------------------------------------------+ | |
+------------------>+ execution | | |
self._control_inputs | control | | |
| | +<----------------------------------------------------------+ |
| +-----------+ |
| |
+----------------------------------------------------------------------------------------------------+
- control_inputs - inputs used to control the switching strategy
- subsystem_prototypes - a list of the prototypes of all subsystems (of type GenericSubsystem)
- switch_reference_outputs - output signals of the reference subsystem from which the output datatypes are inherited
- number_of_control_outputs - the number of outputs of the subsystems used to control the execution
- helper function for code generation -
- self.generate_switch(), generate_reset()
"""
def __init__(self, sim : System, control_inputs : List [Signal], subsystem_prototypes : List [GenericSubsystem], switch_reference_outputs : List [Signal], number_of_control_outputs : int = 0 ):
assert number_of_control_outputs >= 0
# a list of the prototypes of all subsystems
self._subsystem_prototypes = subsystem_prototypes
# analyze the default subsystem (the first) to get the output datatypes to use
reference_subsystem_prototype = self._subsystem_prototypes[0] # [0] DIFF
# the number of outputs besides the subsystems outputs
self._number_of_control_outputs = number_of_control_outputs
self._total_number_of_subsystem_outputs = len(reference_subsystem_prototype.outputs)
self._number_of_switched_outputs = len(switch_reference_outputs) # DIFF: self._number_of_normal_outputs
if self._number_of_switched_outputs + number_of_control_outputs != self._total_number_of_subsystem_outputs:
raise BaseException("given number of total subsystem outputs does not match")
self._number_of_outputs_of_all_nested_systems = len(reference_subsystem_prototype.outputs)
# assertion
for subsystem_prototype in self._subsystem_prototypes:
if not len(subsystem_prototype.outputs) == self._total_number_of_subsystem_outputs:
raise BaseException("all subsystems must have the same number of outputs")
# now call the constructor for block prototypes and make input and output signals available
bi.BlockPrototype.__init__(self, sim=sim, inputSignals=None, N_outputs = self._total_number_of_subsystem_outputs )
# control inputs that are used to control how the subsystems are handled
self._control_inputs = control_inputs
# a list of all inputs used by all subsystems
self._list_of_all_subsystem_inputs = None
# a list of all inputs including self._list_of_all_subsystem_inputs and self._control_inputs
# will be filled in on compile_callback_all_subsystems_compiled()
self._list_of_all_inputs = None
# inherit output datatypes of this block from the embedded subsystem
setup_output_datatype_inheritance(
normal_outputs_of_embedding_block=self.switched_normal_outputs,
subsystem_prototype=reference_subsystem_prototype
)
# build the mapping between the signals of the embedding block and the (reference) embedded block
# please note that this consideres normal ouputs and control outputs combined.
self.outputs_map_from_embedding_block_to_subsystem = OutputMapEmbeddingBlockToSubsystem(
normal_outputs_of_embedding_block=self.outputs,
subsystem_prototype=reference_subsystem_prototype
)
@property
def control_outputs(self):
return self.outputs[ self._number_of_switched_outputs: ]
@property # rename to normal_outputs
def switched_normal_outputs(self):
return self.outputs[ 0:self._number_of_switched_outputs ]
def compile_callback_all_subsystems_compiled(self):
# Get all input signals required by the subsystems and avoid duplicates.
# Go through each subsystem in self._subsystem_prototypes and collect all input signals
# in a set.
set_of_all_inputs = set()
for subsystem_prototype in self._subsystem_prototypes:
set_of_all_inputs.update( subsystem_prototype.inputs )
self._list_of_all_subsystem_inputs = list( set_of_all_inputs )
# add additional inputs
set_of_all_inputs.update( self._control_inputs )
self._list_of_all_inputs = list(set_of_all_inputs)
def compile_callback_all_datatypes_defined(self):
pass
def config_request_define_feedforward_input_dependencies(self, outputSignal):
# NOTE: This is a simplified so far..
# Every output depends on every signal
return self._list_of_all_inputs
def config_request_define_state_update_input_dependencies(self, outputSignal):
# NOTE: This is a simplified so far..
# The update depends on every signal
return self._list_of_all_inputs
# for code_gen
def codegen_help_generate_switch( self, language, switch_control_signal_name, switch_ouput_signals=None, calculate_outputs = True, update_states = False, additional_outputs=None ):
"""
code generation helper to embed multiple subsystems and a switch for the outputs
"""
if calculate_outputs:
# combine all output names to one list: normal subsystem outputs and control outputs
assign_to_signals = []
assign_to_signals.extend( switch_ouput_signals )
if additional_outputs is not None:
assign_to_signals.extend( additional_outputs )
#
indices_of_outputs_to_compute = self.outputs_map_from_embedding_block_to_subsystem.map_to_output_index( assign_to_signals )
# code gen
lines = ''
action_list = []
condition_list = []
subsystem_counter = 0
for system_prototype in self._subsystem_prototypes:
if calculate_outputs:
# Ah well.. this is supposed to do: system_prototype.outputs[ indices_of_outputs_to_compute ]
tmp = []
for i in indices_of_outputs_to_compute:
tmp.append( system_prototype.outputs[i] )
# generate code to call subsystem output(s)
code_compute_output = embed_subsystem3(
language,
subsystem_prototype=system_prototype,
assign_to_signals=assign_to_signals,
ouput_signals_of_subsystem = tmp,
calculate_outputs = True, update_states = False
)
else:
code_compute_output = '' # no operation
if update_states:
code_compute_state_update = embed_subsystem3(
language,
subsystem_prototype=system_prototype,
calculate_outputs = False, update_states = True
)
else:
code_compute_state_update = '' # no operation
action_list.append( code_compute_output + code_compute_state_update )
# generate conditions when to execute the respective subsystem
condition_list.append( cgh.generate_compare_equality_to_constant( language, switch_control_signal_name , subsystem_counter ) )
subsystem_counter += 1
# combine conditions and their repective actions
lines += cgh.generate_if_else(language, condition_list, action_list)
return lines
def generate_code_defStates(self, language):
if language == 'c++':
# implement state definition for each subsystem
lines = ''
for system_prototype in self._subsystem_prototypes:
lines += system_prototype.generate_code_defStates(language)
return lines
def generate_reset(self, language, system_index):
"""
helper fn for code generation
"""
system_to_reset = self._subsystem_prototypes[ system_index ]
lines = system_to_reset.generate_code_reset(language)
return lines
def generate_code_reset(self, language):
if language == 'c++':
# reset all subsystems
lines = '// reset state of all subsystems in multisubsystem ' + self.name + '\n'
for system_prototype in self._subsystem_prototypes:
lines += system_prototype.generate_code_reset(language)
return lines
Classes
class MultiSubsystemEmbedder (sim: System, control_inputs: List[Signal], subsystem_prototypes: List[GenericSubsystem], switch_reference_outputs: List[Signal], number_of_control_outputs: int = 0)
-
Prototype for a block that includes multiple sub-systems whose outputs are joint in a switching manner (this is class to be derived, e.g. by StatemachineSwitchSubsystems, SwitchSubsystems)
+----------------------------------------------------------------------------------------------------+ | normal inputs (determinded on compile_callback_all_subsystems_compiled) | | +----------+ |
self._list_of_all_subsystem_inputs | | normal outputs | +---------------------------------+-------> subsys 1 +--------------+ | | | | | | | | | | | control | | | | | | outputs | switch | | | | +-----------+ | +--------+ | | | | | | +------>+ o | switched | | | +----------+ | | | normal outputs | self.switched_normal_outputs | | +--------->+ o o +-------------------------> | | | / | | | | +----------+ normal | / | switched | | | | | outputs +--------->+ o | control outputs | self.control_outputs | +-------> subsys 2 +-----------+ | o +------+------------------> | | | | / | | | | | | control +-------->+ o | | | | | | outputs | +—+----+ | | | | +------------+ ^ | | | | | | | | | +----------+ | | | | o | | | | control +-----------+ o switch position | | | | inputs | +-----------------------------------------------+ | | +------------------>+ execution | | | self._control_inputs | control | | | | | +<----------------------------------------------------------+ | | +-----------+ | | | +----------------------------------------------------------------------------------------------------+
- control_inputs - inputs used to control the switching strategy
- subsystem_prototypes - a list of the prototypes of all subsystems (of type GenericSubsystem)
- switch_reference_outputs - output signals of the reference subsystem from which the output datatypes are inherited
-
number_of_control_outputs - the number of outputs of the subsystems used to control the execution
-
helper function for code generation -
-
self.generate_switch(), generate_reset()
Expand source code
class MultiSubsystemEmbedder(bi.BlockPrototype): """ Prototype for a block that includes multiple sub-systems whose outputs are joint in a switching manner (this is class to be derived, e.g. by StatemachineSwitchSubsystems, SwitchSubsystems) +----------------------------------------------------------------------------------------------------+ | normal inputs (determinded on compile_callback_all_subsystems_compiled) | | +----------+ | self._list_of_all_subsystem_inputs | | normal outputs | +---------------------------------+-------> subsys 1 +--------------+ | | | | | | | | | | | control | | | | | | outputs | switch | | | | +-----------+ | +--------+ | | | | | | +------>+ o | switched | | | +----------+ | | | normal outputs | self.switched_normal_outputs | | +--------->+ o o +-------------------------> | | | / | | | | +----------+ normal | / | switched | | | | | outputs +--------->+ o | control outputs | self.control_outputs | +-------> subsys 2 +-----------+ | o +------+------------------> | | | | / | | | | | | control +-------->+ o | | | | | | outputs | +---+----+ | | | | +------------+ ^ | | | | | | | | | +----------+ | | | | o | | | | control +-----------+ o switch position | | | | inputs | +-----------------------------------------------+ | | +------------------>+ execution | | | self._control_inputs | control | | | | | +<----------------------------------------------------------+ | | +-----------+ | | | +----------------------------------------------------------------------------------------------------+ - control_inputs - inputs used to control the switching strategy - subsystem_prototypes - a list of the prototypes of all subsystems (of type GenericSubsystem) - switch_reference_outputs - output signals of the reference subsystem from which the output datatypes are inherited - number_of_control_outputs - the number of outputs of the subsystems used to control the execution - helper function for code generation - - self.generate_switch(), generate_reset() """ def __init__(self, sim : System, control_inputs : List [Signal], subsystem_prototypes : List [GenericSubsystem], switch_reference_outputs : List [Signal], number_of_control_outputs : int = 0 ): assert number_of_control_outputs >= 0 # a list of the prototypes of all subsystems self._subsystem_prototypes = subsystem_prototypes # analyze the default subsystem (the first) to get the output datatypes to use reference_subsystem_prototype = self._subsystem_prototypes[0] # [0] DIFF # the number of outputs besides the subsystems outputs self._number_of_control_outputs = number_of_control_outputs self._total_number_of_subsystem_outputs = len(reference_subsystem_prototype.outputs) self._number_of_switched_outputs = len(switch_reference_outputs) # DIFF: self._number_of_normal_outputs if self._number_of_switched_outputs + number_of_control_outputs != self._total_number_of_subsystem_outputs: raise BaseException("given number of total subsystem outputs does not match") self._number_of_outputs_of_all_nested_systems = len(reference_subsystem_prototype.outputs) # assertion for subsystem_prototype in self._subsystem_prototypes: if not len(subsystem_prototype.outputs) == self._total_number_of_subsystem_outputs: raise BaseException("all subsystems must have the same number of outputs") # now call the constructor for block prototypes and make input and output signals available bi.BlockPrototype.__init__(self, sim=sim, inputSignals=None, N_outputs = self._total_number_of_subsystem_outputs ) # control inputs that are used to control how the subsystems are handled self._control_inputs = control_inputs # a list of all inputs used by all subsystems self._list_of_all_subsystem_inputs = None # a list of all inputs including self._list_of_all_subsystem_inputs and self._control_inputs # will be filled in on compile_callback_all_subsystems_compiled() self._list_of_all_inputs = None # inherit output datatypes of this block from the embedded subsystem setup_output_datatype_inheritance( normal_outputs_of_embedding_block=self.switched_normal_outputs, subsystem_prototype=reference_subsystem_prototype ) # build the mapping between the signals of the embedding block and the (reference) embedded block # please note that this consideres normal ouputs and control outputs combined. self.outputs_map_from_embedding_block_to_subsystem = OutputMapEmbeddingBlockToSubsystem( normal_outputs_of_embedding_block=self.outputs, subsystem_prototype=reference_subsystem_prototype ) @property def control_outputs(self): return self.outputs[ self._number_of_switched_outputs: ] @property # rename to normal_outputs def switched_normal_outputs(self): return self.outputs[ 0:self._number_of_switched_outputs ] def compile_callback_all_subsystems_compiled(self): # Get all input signals required by the subsystems and avoid duplicates. # Go through each subsystem in self._subsystem_prototypes and collect all input signals # in a set. set_of_all_inputs = set() for subsystem_prototype in self._subsystem_prototypes: set_of_all_inputs.update( subsystem_prototype.inputs ) self._list_of_all_subsystem_inputs = list( set_of_all_inputs ) # add additional inputs set_of_all_inputs.update( self._control_inputs ) self._list_of_all_inputs = list(set_of_all_inputs) def compile_callback_all_datatypes_defined(self): pass def config_request_define_feedforward_input_dependencies(self, outputSignal): # NOTE: This is a simplified so far.. # Every output depends on every signal return self._list_of_all_inputs def config_request_define_state_update_input_dependencies(self, outputSignal): # NOTE: This is a simplified so far.. # The update depends on every signal return self._list_of_all_inputs # for code_gen def codegen_help_generate_switch( self, language, switch_control_signal_name, switch_ouput_signals=None, calculate_outputs = True, update_states = False, additional_outputs=None ): """ code generation helper to embed multiple subsystems and a switch for the outputs """ if calculate_outputs: # combine all output names to one list: normal subsystem outputs and control outputs assign_to_signals = [] assign_to_signals.extend( switch_ouput_signals ) if additional_outputs is not None: assign_to_signals.extend( additional_outputs ) # indices_of_outputs_to_compute = self.outputs_map_from_embedding_block_to_subsystem.map_to_output_index( assign_to_signals ) # code gen lines = '' action_list = [] condition_list = [] subsystem_counter = 0 for system_prototype in self._subsystem_prototypes: if calculate_outputs: # Ah well.. this is supposed to do: system_prototype.outputs[ indices_of_outputs_to_compute ] tmp = [] for i in indices_of_outputs_to_compute: tmp.append( system_prototype.outputs[i] ) # generate code to call subsystem output(s) code_compute_output = embed_subsystem3( language, subsystem_prototype=system_prototype, assign_to_signals=assign_to_signals, ouput_signals_of_subsystem = tmp, calculate_outputs = True, update_states = False ) else: code_compute_output = '' # no operation if update_states: code_compute_state_update = embed_subsystem3( language, subsystem_prototype=system_prototype, calculate_outputs = False, update_states = True ) else: code_compute_state_update = '' # no operation action_list.append( code_compute_output + code_compute_state_update ) # generate conditions when to execute the respective subsystem condition_list.append( cgh.generate_compare_equality_to_constant( language, switch_control_signal_name , subsystem_counter ) ) subsystem_counter += 1 # combine conditions and their repective actions lines += cgh.generate_if_else(language, condition_list, action_list) return lines def generate_code_defStates(self, language): if language == 'c++': # implement state definition for each subsystem lines = '' for system_prototype in self._subsystem_prototypes: lines += system_prototype.generate_code_defStates(language) return lines def generate_reset(self, language, system_index): """ helper fn for code generation """ system_to_reset = self._subsystem_prototypes[ system_index ] lines = system_to_reset.generate_code_reset(language) return lines def generate_code_reset(self, language): if language == 'c++': # reset all subsystems lines = '// reset state of all subsystems in multisubsystem ' + self.name + '\n' for system_prototype in self._subsystem_prototypes: lines += system_prototype.generate_code_reset(language) return lines
Ancestors
Subclasses
Instance variables
var control_outputs
-
Expand source code
@property def control_outputs(self): return self.outputs[ self._number_of_switched_outputs: ]
var switched_normal_outputs
-
Expand source code
@property # rename to normal_outputs def switched_normal_outputs(self): return self.outputs[ 0:self._number_of_switched_outputs ]
Methods
def codegen_help_generate_switch(self, language, switch_control_signal_name, switch_ouput_signals=None, calculate_outputs=True, update_states=False, additional_outputs=None)
-
code generation helper to embed multiple subsystems and a switch for the outputs
Expand source code
def codegen_help_generate_switch( self, language, switch_control_signal_name, switch_ouput_signals=None, calculate_outputs = True, update_states = False, additional_outputs=None ): """ code generation helper to embed multiple subsystems and a switch for the outputs """ if calculate_outputs: # combine all output names to one list: normal subsystem outputs and control outputs assign_to_signals = [] assign_to_signals.extend( switch_ouput_signals ) if additional_outputs is not None: assign_to_signals.extend( additional_outputs ) # indices_of_outputs_to_compute = self.outputs_map_from_embedding_block_to_subsystem.map_to_output_index( assign_to_signals ) # code gen lines = '' action_list = [] condition_list = [] subsystem_counter = 0 for system_prototype in self._subsystem_prototypes: if calculate_outputs: # Ah well.. this is supposed to do: system_prototype.outputs[ indices_of_outputs_to_compute ] tmp = [] for i in indices_of_outputs_to_compute: tmp.append( system_prototype.outputs[i] ) # generate code to call subsystem output(s) code_compute_output = embed_subsystem3( language, subsystem_prototype=system_prototype, assign_to_signals=assign_to_signals, ouput_signals_of_subsystem = tmp, calculate_outputs = True, update_states = False ) else: code_compute_output = '' # no operation if update_states: code_compute_state_update = embed_subsystem3( language, subsystem_prototype=system_prototype, calculate_outputs = False, update_states = True ) else: code_compute_state_update = '' # no operation action_list.append( code_compute_output + code_compute_state_update ) # generate conditions when to execute the respective subsystem condition_list.append( cgh.generate_compare_equality_to_constant( language, switch_control_signal_name , subsystem_counter ) ) subsystem_counter += 1 # combine conditions and their repective actions lines += cgh.generate_if_else(language, condition_list, action_list) return lines
def generate_code_reset(self, language)
-
Expand source code
def generate_code_reset(self, language): if language == 'c++': # reset all subsystems lines = '// reset state of all subsystems in multisubsystem ' + self.name + '\n' for system_prototype in self._subsystem_prototypes: lines += system_prototype.generate_code_reset(language) return lines
def generate_reset(self, language, system_index)
-
helper fn for code generation
Expand source code
def generate_reset(self, language, system_index): """ helper fn for code generation """ system_to_reset = self._subsystem_prototypes[ system_index ] lines = system_to_reset.generate_code_reset(language) return lines
Inherited members
BlockPrototype
:codegen_addToNamespace
compile_callback_all_datatypes_defined
compile_callback_all_subsystems_compiled
config_request_define_feedforward_input_dependencies
config_request_define_state_update_input_dependencies
generate_code_defStates
generate_code_destruct
generate_code_init
generate_code_setOutputReference
update_input_config