Module openrtdynamics2.lang.block_prototypes_subsystems

Expand source code
from .diagram_core.system import System
from .diagram_core import datatypes as dt
from .diagram_core.signal_network.signals import Signal
from .diagram_core import code_generation_helper as cgh
from . import block_interface as bi

from typing import Dict, List





#
# Subsystem prototypes
#

class GenericSubsystem(bi.BlockPrototype):
    """
        Include a sub-system by passing a manifest

        - sim - the simulation this block is embedded into

        parameters required only in case the subsystem is already defined (e.g. loaded from a library):

        - manifest           - the manifest of the subsystem to include (optional, might be handed over by init())
        - inputSignals       - the inputs to the subsystem 
        - N_outputs          - the number of outputs to prepare (must be omitted in case a manifest is given,
                               as the manifest then defines the number of outputs)
        - embedded_subsystem - the system to embed (optional in case a manifest to an already compiled subsystem is given, NOT IMPLEMENTED)

        Note: the number of outputs must be defined either by N_outputs or by a manifest

        Raises BaseException in case N_outputs and manifest are given at the same time.
    """
    def __init__(self, sim : System = None, manifest=None, inputSignals=None, N_outputs = None, embedded_subsystem=None ):

        self.manifest = manifest
        self.inputSignals = inputSignals
        self.sim = sim
        self.Noutputs = N_outputs
        self._embedded_subsystem = embedded_subsystem

        if manifest is not None:
            if N_outputs is not None:
                raise BaseException("N_outputs and a manifest specified at the same time")

            # the manifest defines the number of outputs
            self.Noutputs = self.manifest.number_of_default_ouputs

        # output signals that were created by sth. outside of this prototype
        # and that need to be connected to the actual outputs when init() is called.
        self.anonymous_output_signals = None

        # optional (in case this block is in charge of putting the code for the subsystem)
        self.compileResult = None

        # init super class
        bi.BlockPrototype.__init__(self, self.sim, N_outputs = self.Noutputs)

        # Note: the inputSignals are not defined when subsystems are pre-defined in the code
        # but are automatically placed and connected by the compiler during compilation

        # check if it is already possible to init this prototype
        # (in case all required information is available)
        if inputSignals is not None and manifest is not None:
            # NOTE(review): init() requires the arguments (manifest, compileResult, inputSignals)
            # and rejects a manifest that was already set in the constructor. Hence, this call
            # cannot succeed as written; the intended behaviour needs to be confirmed.
            self.init()

        # configure datatype inheritance for the output signals
        # (only possible in case the system to embed was handed over)
        if embedded_subsystem is not None:
            for i, output_signal_of_subsystem in enumerate( embedded_subsystem.primary_outputs ):

                output_signal_of_embedding_block = self.outputs[i]
                output_signal_of_embedding_block.inherit_datatype_from_signal( output_signal_of_subsystem )

    @property
    def embedded_subsystem(self):
        """
            Return the system that is embedded (in case it was provided, returns None otherwise)
        """

        return self._embedded_subsystem

    def set_anonymous_output_signal_to_connect(self, anonymous_output_signals):
        """
            store a list of anonymous signals to connect to the outputs of the subsystems
            after running the post_compile_callback
        """
        # List of raw signals 
        self.anonymous_output_signals = anonymous_output_signals

    def compile_callback_all_subsystems_compiled(self):
        """
            Continue the initialization using the compile results of the embedded system.

            The manifest, the compile result, and the input signals are read from
            the attribute compile_result of the embedded system and passed to init().
        """

        embedded_system = self._embedded_subsystem
        #
        # continue init as now all subsystems are compiled and the compile results and the manifest of
        # the system to compile is available.
        #
        self.init(embedded_system.compile_result.manifest, embedded_system.compile_result, embedded_system.compile_result.inputSignals)

    # post_compile_callback (called after the subsystem to embed was compiled)
    def init(self, manifest, compileResult, inputSignals):
        """
            This is a second phase initialization of this subsystem block 
            (to be called by compile_callback_all_subsystems_compiled())

            This function shall be called when the subsystem to embed is compiled
            after the instance of 'GenericSubsystem' is created. This way, it is possible
            to add blocks embedding sub-systems without having these subsystems to be
            already compiled.

            manifest       - the system manifest of the subsystem to embed
            compileResult  - the compile results of the subsystem to embed (might be None)
            inputSignals   - input signals to the subsystem to embed (links coming from an upper-level subsystem)

            Raises BaseException in case

            - the manifest or the input signals were already defined in the constructor,
            - the number of outputs given by the manifest does not match the number of
              outputs this block was created with,
            - the datatype of an input signal does not match the manifest, or
            - the number of anonymous output signals does not match the number of outputs.
        """        

        #
        #    set the manifest of the subsystem
        #
        if self.manifest is not None:
            raise BaseException("cannot call this function as the subsystem's manifest was already defined in the constructor.")

        self.manifest = manifest

        #
        #    Set the compilation result of the embedded system (if available)
        #
        self.compileResult = compileResult

        #
        #    connect the inputs (coming from the upper-level system)
        #

        if self.inputSignals is not None:
            raise BaseException("The subsystem's inputSignals were already specified in the constructor.")

        self.inputSignals = inputSignals


        def collectDependingSignals(signals, manifestFunctionInputs):
            # collect all depending input signals (that are needed to calculate the output) in a list
            # 'signals' is indexed by the signal names listed in the manifest
            # MOVE TO A FUNCTION. MAYBE MOVE TO MANIFEST.PY
            dependingInputs = []
            for i, dependingInput_name in enumerate( manifestFunctionInputs['names'] ):

                dependingInput_cpptype = manifestFunctionInputs['cpptypes'][i]

                # TODO: CHECK FOR FAILING LOOKUP
                signal = signals[ dependingInput_name ]

                # check datatype
                if not signal.getDatatype().cpp_datatype_string == dependingInput_cpptype:
                    raise BaseException('datatype does not match the one specified in the manifest. (' + (dependingInput_cpptype) + ' is required in the manifest)' )

                # append signal
                dependingInputs.append( signal ) 

            return dependingInputs


        # verify the number of outputs of the embedded system
        number_of_outputs_as_described_by_manifest = self.manifest.number_of_default_ouputs

        if not number_of_outputs_as_described_by_manifest == self.Noutputs:
            # the exception was formerly only instantiated but never raised
            raise BaseException("missmatch in the number of outputs")

        # get the output datatypes of the embedded system
        self.outputTypes = self.manifest.io_outputs['calculate_output']['types']  


        if self.compileResult is None:
            # collect all depending input signals (that are needed to calculate the output) in a list
            self.inputsToCalculateOutputs = collectDependingSignals( self.inputSignals, self.manifest.io_inputs['calculate_output'] )

            # collect all inputs required to perform the state update
            self.inputsToUpdateStates = collectDependingSignals( self.inputSignals, self.manifest.io_inputs['state_update'] )

        else:
            # use the available compile results to get the I/O signals
            # in this case, self.inputSignals shall be a list of signals. The order
            # shall match the signal order in self.compileResults.inputSignals

            self.inputsToCalculateOutputs = self.compileResult.simulationInputSignalsToCalculateOutputs
            self.inputsToUpdateStates = self.compileResult.simulationInputSignalsToUpdateStates


        # combine all inputs to a list
        self.allInputs = list()

        self.allInputs.extend( self.inputsToCalculateOutputs )
        self.allInputs.extend( self.inputsToUpdateStates )

        #
        # now initialize the prototype
        #

        # define the inputs
        self.update_input_config( self.allInputs )

        # connect the outputs signals
        if self.anonymous_output_signals is not None:

            print(" -- Nesting block: connecting anonymous signals -- ")

            Ns = len(self.outputSignals)

            if not Ns == len(  self.anonymous_output_signals ):
                raise BaseException(" missmatch in the number of output signals")

            for i in range(0,Ns):
                
                s_ananon = self.anonymous_output_signals[i]
                s_source = self.outputSignals[i]

                print("connecting the output " + s_ananon.toStr() + " of the embedding block")
                s_ananon.setequal( s_source )


        # for code generation
        self.instanceVarname = self.getUniqueVarnamePrefix() + '_subsystem_' + self.manifest.API_name


    def config_request_define_output_types(self, inputTypes):
        """
            Return the output datatypes as read from the manifest in init()
            (the given inputTypes are not considered).
        """

        # the datatypes are fixed in the manifest 
        return self.outputTypes        

    def config_request_define_feedforward_input_dependencies(self, outputSignal):
        """
            Return the list of input signals that are needed to calculate the outputs.
        """

        # NOTE: This is a simplified variant so far.. no dependence on the given 'outputSignal'
        #       (Every output depends on every signal in self.inputsToCalculateOutputs)

        # TODO: 6.10.19 implement this in a more granular way.
        # also use self.compileResults to get those information

        return self.inputsToCalculateOutputs

    def config_request_define_state_update_input_dependencies(self, outputSignal):
        """
            Return the list of input signals that are required to update the states.
        """

        return self.inputsToUpdateStates


    def codegen_addToNamespace(self, language):
        """
            Return an empty string: this block does not add code to the namespace.
        """

        # putting code for subsystems is performed using execution commands
        lines = ''

        return lines

    def generate_code_defStates(self, language):
        """
            Return the c++ code defining the instance variable of the subsystem
            (None is returned for any other language).
        """
        if language == 'c++':
            lines = '// instance of ' + self.manifest.API_name + '\n'
            lines += self.manifest.API_name + ' ' + self.instanceVarname + ';\n'

            return lines

    def generate_code_reset(self, language):
        """
            Return the c++ code calling the reset API function of the subsystem instance
            (None is returned for any other language).
        """
        if language == 'c++':
            return self.instanceVarname + '.' + self.manifest.getAPIFunctionName('reset') +  '();\n'


    def generate_code_init(self, language):
        pass


    def generate_code_destruct(self, language):
        pass


    # helper fn to build code
    def generate_code_call_OutputFunction(self, instanceVarname, manifest, language):
        """
            Return the code calling the 'calculate_output' API function; the arguments
            are the outputs of this block followed by the inputs needed to calculate them.
        """
        return instanceVarname + '.' + manifest.getAPIFunctionName('calculate_output') +  '(' + cgh.signal_list_to_names_string(self.outputs + self.inputsToCalculateOutputs) + ');\n'

    # helper fn to build code
    def generate_code_call_UpdateFunction(self, instanceVarname, manifest, language):
        """
            Return the code calling the 'state_update' API function; the arguments
            are the inputs required to update the states.
        """
        return instanceVarname + '.' + manifest.getAPIFunctionName('state_update') +  '(' + cgh.signal_list_to_names_string(self.inputsToUpdateStates) + ');\n'

    def generate_code_output_list(self, language, signals : List [ Signal ] ):
        """
            Return the c++ code that defines a variable for each output of this block
            and calls the output function of the subsystem.

            - signals - the output signals that are actually used; for each output that is
                        not in this list a note is added to the generated code

            An empty string is returned for any language other than c++.
        """

        # start with an empty result so that unsupported languages do not lead
        # to an access to an undefined variable
        lines = ''

        if language == 'c++':

            #
            # TODO: 2.5.2020: concept: how to compute only the necessary signals?
            #

            #
            # REWORK: introduce call to fn(Inputs & inputs, Outputs & outputs)
            # and remove the prev. style
            #

            for s in self.outputs: # for each output of the subsystem reserve a variable
                lines += cgh.define_variable_line( s ) 

                if s not in signals:
                    lines += '// NOTE: unused output signal' + s.name + '\n'

            lines += self.generate_code_call_OutputFunction(self.instanceVarname, self.manifest, language)

        return lines

    def generate_code_update(self, language):
        """
            Return the c++ code calling the state update function of the subsystem
            (None is returned for any other language).
        """
        if language == 'c++':

            # input to this call are the signals in self.inputsToUpdateStates
            return self.generate_code_call_UpdateFunction(self.instanceVarname, self.manifest, language)

















#
# helper functions for both SingleSubsystemEmbedder and XX
#

def setup_output_datatype_inheritance( normal_outputs_of_embedding_block, subsystem_prototype ):
    """
        Let the outputs of the embedding block inherit their datatypes from the subsystem.

        - normal_outputs_of_embedding_block - the output signals of the embedding block
        - subsystem_prototype               - the prototype whose outputs (matched by index)
                                              serve as the source of the datatypes
    """

    for index, embedding_block_output in enumerate( normal_outputs_of_embedding_block ):
        # the i-th output of the embedding block corresponds to the i-th output of the subsystem
        subsystem_output = subsystem_prototype.outputs[index]
        embedding_block_output.inherit_datatype_from_signal( subsystem_output )


class OutputMapEmbeddingBlockToSubsystem():
    """
        Lookup tables relating the outputs of an embedding block to the outputs
        of the subsystem it embeds (the outputs are matched by their index).
    """

    def __init__(self, normal_outputs_of_embedding_block, subsystem_prototype):

        tables = self.create_output_mapping_table(normal_outputs_of_embedding_block, subsystem_prototype )
        self._output_signal_mapping, self._output_signal_index_mapping = tables


    def create_output_mapping_table(self, normal_outputs_of_embedding_block, subsystem_prototype ):
        """
            Build and return two dictionaries, both using the output signals
            of the embedding block as keys:

            - the output signal of the subsystem with the same index
            - the index of the output
        """

        signal_table = {}
        index_table = {}

        for index, embedding_block_output in enumerate( normal_outputs_of_embedding_block ):
            signal_table[ embedding_block_output ] = subsystem_prototype.outputs[index]
            index_table[ embedding_block_output ] = index

        return signal_table, index_table


    def map(self, output_signals_of_embedding_block):
        """
            Translate output signals of the embedding block, e.g., the signals to calculate
            as handed over to the callback

                def generate_code_output_list(self, language, signals : List [ Signal ] ): 

            into the corresponding output signals of the embedded subsystem.
        """

        return [ self._output_signal_mapping[signal] for signal in output_signals_of_embedding_block ]


    def map_to_output_index(self, output_signals_of_embedding_block):
        """
            Translate output signals of the embedding block into their output indices:

            e.g. [sig0, sig1, sig2, sig4] --> [0, 1, 2, 4]
        """

        return [ self._output_signal_index_mapping[signal] for signal in output_signals_of_embedding_block ]
    








    # def helper_get_output_signal_mapping_to_subsystem(self, signals_to_calculate):
    #     """

    #         given the signals to calculate (variable signals) in the callbacks 

    #             def generate_code_output_list(self, language, signals : List [ Signal ] ): 

    #         resolve the mapping to the embedded subsystems output signals

    #     """

    #     return self.outputs_map_from_embedding_block_to_subsystem.map( signals_to_calculate )

    #     # mapped_subsystem_output_signals = []
    #     # for s in signals_to_calculate:
    #     #     mapped_subsystem_output_signals.append( self._output_signal_mapping[s] )

    #     # return mapped_subsystem_output_signals
    
















# helper fn for classes that are derived from SingleSubsystemEmbedder and XX
def embed_subsystem3(language, subsystem_prototype, assign_to_signals=None, ouput_signals_of_subsystem=None, calculate_outputs = True, update_states = False, reset_states=False ):
    """  
        generate code to call a subsystem

        - language            - the target language (handed over to the code generation routines of subsystem_prototype)
        - subsystem_prototype - the block prototype including the subsystem - type: : dy.GenericSubsystem
        - assign_to_signals   - list of signals to which the output signals of the subsystem are assigned to
        
        - ouput_signals_of_subsystem - the output signals of the embedded subsystem
                                       (paired by index with assign_to_signals)

        - calculate_outputs   - generate a call to the output computation API function of the subsystem
        - update_states       - generate a call to the state update API function of the subsystem
        - reset_states        - generate a call to the reset API function of the subsystem

        assign_to_signals and ouput_signals_of_subsystem are only used in case calculate_outputs is True.

        The generated code is returned as a string: a block in braces containing
        (in this order) the reset, the output calculation, and the state update.

        Raises BaseException in case assign_to_signals and ouput_signals_of_subsystem differ in length.
    """


    lines = '{ // subsystem ' + subsystem_prototype.embedded_subsystem.name + '\n'

    innerLines = ''

    #
    # system_prototype is of type GenericSubsystem. call the code generation routine of the subsystem
    #

    if reset_states:
        innerLines += subsystem_prototype.generate_code_reset(language)


    # generate code for calculating the outputs 
    if calculate_outputs:

        # extract the signals names
        assign_to_signals_names = cgh.signal_list_to_name_list(assign_to_signals)
        ouput_signal_names_of_subsystem = cgh.signal_list_to_name_list(ouput_signals_of_subsystem)

        # validate the pairing before any code is generated
        if len(ouput_signal_names_of_subsystem) != len(assign_to_signals_names):
            raise BaseException('len(ouput_signals_of_subsystem) != len(assign_to_signals)')

        innerLines += subsystem_prototype.generate_code_output_list(language, subsystem_prototype.outputs)

        #
        # REWORK: read out the outputs.* structure
        #

        for name_of_subsystem_output, name_to_assign_to in zip( ouput_signal_names_of_subsystem, assign_to_signals_names ):
            innerLines += cgh.asign( name_of_subsystem_output, name_to_assign_to )

    # generate code for updating the states
    if update_states:
        innerLines += subsystem_prototype.generate_code_update(language)


    lines += cgh.indent(innerLines)
    lines += '}\n'

    return lines








































# REMOVE
def embed_subsystem2(language, subsystem_prototype, ouput_signals_name=None, ouput_signal_names_of_subsystem=None, calculate_outputs = True, update_states = False, reset_states=False ):
    """  
        Build the code block that calls an embedded subsystem.

        - language            - handed over to the code generation routines of subsystem_prototype
        - subsystem_prototype - the block prototype including the subsystem - type: : dy.GenericSubsystem
        - ouput_signals_name  - list of variable names to which the output signals of the subsystem are assigned to
        - ouput_signal_names_of_subsystem - list of variable names paired by index with ouput_signals_name

        - calculate_outputs   - generate a call to the output computation API function of the subsystem
        - update_states       - generate a call to the state update API function of the subsystem
        - reset_states        - generate a call to the reset API function of the subsystem

        Returns the generated code as a string (a block enclosed in braces).
        Raises BaseException in case both lists of names differ in length.
    """

    # the subsystem_prototype is of type GenericSubsystem; its code generation routines are used
    header = '{ // subsystem ' + subsystem_prototype.embedded_subsystem.name + '\n'

    body = ''

    if reset_states:
        body += subsystem_prototype.generate_code_reset(language)

    if calculate_outputs:
        # code computing the outputs of the subsystem
        body += subsystem_prototype.generate_code_output_list(language, subsystem_prototype.outputs)

        if len(ouput_signal_names_of_subsystem) != len(ouput_signals_name):
            raise BaseException('len(ouput_signal_names_of_subsystem) != len(ouput_signals_name)')

        #
        # REWORK: read out the outputs.* structure
        #

        # NOTE: this might mix the output signals!
        for subsystem_name, target_name in zip( ouput_signal_names_of_subsystem, ouput_signals_name ):
            body += cgh.asign( subsystem_name, target_name )

    if update_states:
        # code updating the states of the subsystem
        body += subsystem_prototype.generate_code_update(language)

    return header + cgh.indent(body) + '}\n'

Functions

def embed_subsystem2(language, subsystem_prototype, ouput_signals_name=None, ouput_signal_names_of_subsystem=None, calculate_outputs=True, update_states=False, reset_states=False)

generate code to call a subsystem

  • subsystem_prototype - the block prototype including the subsystem - type: : dy.GenericSubsystem
  • ouput_signals_name - list of variable names to which the output signals of the subsystem are assigned to

  • ouput_signal_names_of_subsystem - list of variable names of the subsystem's output signals, paired by index with ouput_signals_name

  • calculate_outputs - generate a call to the output computation API function of the subsystem

  • update_states - generate a call to the state update API function of the subsystem
Expand source code
def embed_subsystem2(language, subsystem_prototype, ouput_signals_name=None, ouput_signal_names_of_subsystem=None, calculate_outputs = True, update_states = False, reset_states=False ):
    """  
        Build the code block that calls an embedded subsystem.

        - language            - handed over to the code generation routines of subsystem_prototype
        - subsystem_prototype - the block prototype including the subsystem - type: : dy.GenericSubsystem
        - ouput_signals_name  - list of variable names to which the output signals of the subsystem are assigned to
        - ouput_signal_names_of_subsystem - list of variable names paired by index with ouput_signals_name

        - calculate_outputs   - generate a call to the output computation API function of the subsystem
        - update_states       - generate a call to the state update API function of the subsystem
        - reset_states        - generate a call to the reset API function of the subsystem

        Returns the generated code as a string (a block enclosed in braces).
        Raises BaseException in case both lists of names differ in length.
    """

    # the subsystem_prototype is of type GenericSubsystem; its code generation routines are used
    header = '{ // subsystem ' + subsystem_prototype.embedded_subsystem.name + '\n'

    body = ''

    if reset_states:
        body += subsystem_prototype.generate_code_reset(language)

    if calculate_outputs:
        # code computing the outputs of the subsystem
        body += subsystem_prototype.generate_code_output_list(language, subsystem_prototype.outputs)

        if len(ouput_signal_names_of_subsystem) != len(ouput_signals_name):
            raise BaseException('len(ouput_signal_names_of_subsystem) != len(ouput_signals_name)')

        #
        # REWORK: read out the outputs.* structure
        #

        # NOTE: this might mix the output signals!
        for subsystem_name, target_name in zip( ouput_signal_names_of_subsystem, ouput_signals_name ):
            body += cgh.asign( subsystem_name, target_name )

    if update_states:
        # code updating the states of the subsystem
        body += subsystem_prototype.generate_code_update(language)

    return header + cgh.indent(body) + '}\n'
def embed_subsystem3(language, subsystem_prototype, assign_to_signals=None, ouput_signals_of_subsystem=None, calculate_outputs=True, update_states=False, reset_states=False)

generate code to call a subsystem

  • subsystem_prototype - the block prototype including the subsystem - type: : dy.GenericSubsystem
  • assign_to_signals - list of signals to which the output signals of the subsystem are assigned to

  • ouput_signals_of_subsystem - the output signals of the embedded subsystem (paired by index with assign_to_signals)

  • calculate_outputs - generate a call to the output computation API function of the subsystem

  • update_states - generate a call to the state update API function of the subsystem
Expand source code
def embed_subsystem3(language, subsystem_prototype, assign_to_signals=None, ouput_signals_of_subsystem=None, calculate_outputs = True, update_states = False, reset_states=False ):
    """  
        generate code to call a subsystem

        - language            - the target language (handed over to the code generation routines of subsystem_prototype)
        - subsystem_prototype - the block prototype including the subsystem - type: : dy.GenericSubsystem
        - assign_to_signals   - list of signals to which the output signals of the subsystem are assigned to
        
        - ouput_signals_of_subsystem - the output signals of the embedded subsystem
                                       (paired by index with assign_to_signals)

        - calculate_outputs   - generate a call to the output computation API function of the subsystem
        - update_states       - generate a call to the state update API function of the subsystem
        - reset_states        - generate a call to the reset API function of the subsystem

        assign_to_signals and ouput_signals_of_subsystem are only used in case calculate_outputs is True.

        The generated code is returned as a string: a block in braces containing
        (in this order) the reset, the output calculation, and the state update.

        Raises BaseException in case assign_to_signals and ouput_signals_of_subsystem differ in length.
    """


    lines = '{ // subsystem ' + subsystem_prototype.embedded_subsystem.name + '\n'

    innerLines = ''

    #
    # system_prototype is of type GenericSubsystem. call the code generation routine of the subsystem
    #

    if reset_states:
        innerLines += subsystem_prototype.generate_code_reset(language)


    # generate code for calculating the outputs 
    if calculate_outputs:

        # extract the signals names
        assign_to_signals_names = cgh.signal_list_to_name_list(assign_to_signals)
        ouput_signal_names_of_subsystem = cgh.signal_list_to_name_list(ouput_signals_of_subsystem)

        # validate the pairing before any code is generated
        if len(ouput_signal_names_of_subsystem) != len(assign_to_signals_names):
            raise BaseException('len(ouput_signals_of_subsystem) != len(assign_to_signals)')

        innerLines += subsystem_prototype.generate_code_output_list(language, subsystem_prototype.outputs)

        #
        # REWORK: read out the outputs.* structure
        #

        for name_of_subsystem_output, name_to_assign_to in zip( ouput_signal_names_of_subsystem, assign_to_signals_names ):
            innerLines += cgh.asign( name_of_subsystem_output, name_to_assign_to )

    # generate code for updating the states
    if update_states:
        innerLines += subsystem_prototype.generate_code_update(language)


    lines += cgh.indent(innerLines)
    lines += '}\n'

    return lines
def setup_output_datatype_inheritance(normal_outputs_of_embedding_block, subsystem_prototype)
Expand source code
def setup_output_datatype_inheritance( normal_outputs_of_embedding_block, subsystem_prototype ):
    """
        Let the outputs of the embedding block inherit their datatypes from the subsystem.

        - normal_outputs_of_embedding_block - the output signals of the embedding block
        - subsystem_prototype               - the prototype whose outputs (matched by index)
                                              serve as the source of the datatypes
    """

    for index, embedding_block_output in enumerate( normal_outputs_of_embedding_block ):
        # the i-th output of the embedding block corresponds to the i-th output of the subsystem
        subsystem_output = subsystem_prototype.outputs[index]
        embedding_block_output.inherit_datatype_from_signal( subsystem_output )

Classes

class GenericSubsystem (sim: System = None, manifest=None, inputSignals=None, N_outputs=None, embedded_subsystem=None)

Include a sub-system by passing a manifest

  • sim - the simulation this block is embedded into

parameters required only in case the subsystem is already defined (e.g. loaded from a library):

  • manifest - the manifest of the subsystem to include (optional, might be handed over by init())
  • inputSignals - the inputs to the subsystem
  • N_outputs - the number of outputs to prepare (must be omitted when a manifest is given, as the manifest then defines the number of outputs)
  • embedded_subsystem - the system to embed (optional in case a manifest to an already compiled subsystem is given, NOT IMPLEMENTED)

Note: the number of outputs must be defined either by N_outputs or by a manifest

Expand source code
class GenericSubsystem(bi.BlockPrototype):
    """
        Include a sub-system by passing a manifest

        - sim - the simulation this block is embedded into

        parameters required only in case the subsystem is already defined (e.g. loaded from a library):

        - manifest           - the manifest of the subsystem to include (optional, might be handed over by init())
        - inputSignals       - the inputs to the subsystem 
        - N_outputs          - the number of outputs to prepare (optional in case a manifest is given)
        - embedded_subsystem - the system to embed (optional in case a manifest to an already compiled subsystem is given, NOT IMPLEMENTED)

        Note: the number of outputs must be defined either by N_outputs or by a manifest

    """
    def __init__(self, sim : System = None, manifest=None, inputSignals=None, N_outputs = None, embedded_subsystem=None ):

        self.manifest = manifest
        self.inputSignals = inputSignals
        self.sim = sim
        self.Noutputs = N_outputs
        self._embedded_subsystem = embedded_subsystem

        if manifest is not None:
            if N_outputs is None:
                self.Noutputs = self.manifest.number_of_default_ouputs
            else:
                raise BaseException("N_outputs and a manifest specified at the same time")

        # output signals that were created by sth. outside of this prototype
        # and that need to be connected to the actual outputs when init() is called.
        self.anonymous_output_signals = None

        # optional (in case this block is in charge of putting the code for the subsystem)
        self.compileResult = None

        # init super class
        bi.BlockPrototype.__init__(self, self.sim, N_outputs = self.Noutputs)

        # Note: the inputSignals are not defined when subsystems are pre-defined in the code
        # but are automatically placed and connected by the compiler during compilation

        # check if it is already possible to init this prototype
        # (in case all required information is available)
        if inputSignals is not None and manifest is not None:
            # The manifest and the input signals are already stored; init() cannot be
            # used here as it rejects values that were handed over to the constructor.
            self._finish_init()

        # configure datatype inheritance for the outputs signals
        # (only possible in case a system to embed was handed over)
        if embedded_subsystem is not None:
            for i, output_signal_of_subsystem in enumerate( embedded_subsystem.primary_outputs ):

                output_signal_of_embedding_block = self.outputs[i]
                output_signal_of_embedding_block.inherit_datatype_from_signal( output_signal_of_subsystem )

    @property
    def embedded_subsystem(self):
        """
            Return the system that is embedded (in case it was provided, returns None otherwise)
        """

        return self._embedded_subsystem

    def set_anonymous_output_signal_to_connect(self, anonymous_output_signals):
        """
            store a list of anonymous signals to connect to the outputs of the subsystems
            after running the post_compile_callback
        """
        # List of raw signals 
        self.anonymous_output_signals = anonymous_output_signals

    def compile_callback_all_subsystems_compiled(self):
        """
            Continue the initialization using the compile results of the embedded system
        """

        compile_result = self._embedded_subsystem.compile_result

        #
        # continue init as now all subsystems are compiled and the compile results and the manifest of
        # the system to compile is available.
        #
        self.init(compile_result.manifest, compile_result, compile_result.inputSignals)

    # post_compile_callback (called after the subsystem to embed was compiled)
    def init(self, manifest, compileResult, inputSignals):
        """
            This is a second phase initialization of this subsystem block 
            (to be called by compile_callback_all_subsystems_compiled())

            This function shall be called when the subsystem to embed is compiled
            after the instance of 'GenericSubsystem' is created. This way, it is possible
            to add blocks embedding sub-systems without requiring these subsystems to be
            already compiled.

            manifest       - the system manifest of the subsystem to embed
            compileResult  - the compile results of the subsystem to embed
            inputSignals   - input signals to the subsystem to embed (links coming from an upper-level subsystem)

            Raises a BaseException in case the manifest or the input signals were already
            defined in the constructor or in case the number of outputs described by the
            manifest does not match the number of outputs of this block.
        """        

        #
        #    set the manifest of the subsystem
        #
        if self.manifest is not None:
            raise BaseException("cannot call this function as the subsystem's manifest was already defined in the constructor.")

        self.manifest = manifest

        #
        #    Set the compilation result of the embedded system (if available)
        #
        self.compileResult = compileResult

        #
        #    connect the inputs (coming from the upper-level system)
        #

        if self.inputSignals is not None:
            raise BaseException("The subsystem's inputSignals were already specified in the constructor.")

        self.inputSignals = inputSignals

        self._finish_init()

    @staticmethod
    def _collect_depending_signals(signals, manifestFunctionInputs):
        """
            Collect the signals listed in one input section of the manifest

            signals                - lookup (by signal name) of the available input signals
            manifestFunctionInputs - section of the manifest providing the lists 'names' and 'cpptypes'

            Raises a BaseException in case the datatype of a signal differs from the manifest.
        """
        dependingInputs = []
        cpptypes = manifestFunctionInputs['cpptypes']

        for i, dependingInput_name in enumerate( manifestFunctionInputs['names'] ):

            dependingInput_cpptype = cpptypes[i]

            # TODO: CHECK FOR FAILING LOOKUP
            signal = signals[ dependingInput_name ]

            # check datatype
            if not signal.getDatatype().cpp_datatype_string == dependingInput_cpptype:
                raise BaseException('datatype does not match the one specified in the manifest. (' + (dependingInput_cpptype) + ' is required in the manifest)' )

            # append signal
            dependingInputs.append( signal ) 

        return dependingInputs

    def _finish_init(self):
        """
            Complete the initialization based on the stored manifest, compile results and input signals
        """

        # verify the number of outputs of the embedded system
        number_of_outputs_as_described_by_manifest = self.manifest.number_of_default_ouputs

        if number_of_outputs_as_described_by_manifest != self.Noutputs:
            raise BaseException("missmatch in the number of outputs")

        # get the output datatypes of the embedded system
        self.outputTypes = self.manifest.io_outputs['calculate_output']['types']  

        if self.compileResult is None:
            # collect all depending input signals (that are needed to calculate the output) in a list
            self.inputsToCalculateOutputs = self._collect_depending_signals( self.inputSignals, self.manifest.io_inputs['calculate_output'] )

            # collect all inputs required to perform the state update
            self.inputsToUpdateStates = self._collect_depending_signals( self.inputSignals, self.manifest.io_inputs['state_update'] )

        else:
            # use the available compile results to get the I/O signals
            # in this case, self.inputSignals shall be a list of signals. The order
            # shall match the signal order in self.compileResult.inputSignals

            self.inputsToCalculateOutputs = self.compileResult.simulationInputSignalsToCalculateOutputs
            self.inputsToUpdateStates = self.compileResult.simulationInputSignalsToUpdateStates

        # combine all inputs to a list
        self.allInputs = list()

        self.allInputs.extend( self.inputsToCalculateOutputs )
        self.allInputs.extend( self.inputsToUpdateStates )

        #
        # now initialize the prototype
        #

        # define the inputs
        self.update_input_config( self.allInputs )

        # connect the outputs signals
        if self.anonymous_output_signals is not None:

            print(" -- Nesting block: connecting anonymous signals -- ")

            # NOTE(review): 'outputSignals' is not assigned in this class (the other methods
            # use 'self.outputs'); presumably it is provided by bi.BlockPrototype - confirm
            Ns = len(self.outputSignals)

            if not Ns == len(  self.anonymous_output_signals ):
                raise BaseException(" missmatch in the number of output signals")

            for i in range(0,Ns):
                
                s_ananon = self.anonymous_output_signals[i]
                s_source = self.outputSignals[i]

                print("connecting the output " + s_ananon.toStr() + " of the embedding block")
                s_ananon.setequal( s_source )

        # for code generation
        self.instanceVarname = self.getUniqueVarnamePrefix() + '_subsystem_' + self.manifest.API_name


    def config_request_define_output_types(self, inputTypes):

        # the datatypes are fixed in the manifest 
        return self.outputTypes        

    def config_request_define_feedforward_input_dependencies(self, outputSignal):

        # NOTE: This is a simplified variant so far.. no dependence on the given 'outputSignal'
        #       (Every output depends on every signal in self.inputsToCalculateOutputs)

        # TODO: 6.10.19 implement this in a more granular way.
        # also use self.compileResult to get this information

        return self.inputsToCalculateOutputs

    def config_request_define_state_update_input_dependencies(self, outputSignal):
 
        # return a list of input signals that are required to update the states
        return self.inputsToUpdateStates



    def codegen_addToNamespace(self, language):
        lines = ''

        # putting code for subsystems is performed using execution commands

        return lines

    def generate_code_defStates(self, language):
        if language == 'c++':
            lines = '// instance of ' + self.manifest.API_name + '\n'
            lines += self.manifest.API_name + ' ' + self.instanceVarname + ';\n'

            return lines

    def generate_code_reset(self, language):
        if language == 'c++':
            return self.instanceVarname + '.' + self.manifest.getAPIFunctionName('reset') +  '();\n'


    def generate_code_init(self, language):
        pass


    def generate_code_destruct(self, language):
        pass


    # helper fn to build code
    def generate_code_call_OutputFunction(self, instanceVarname, manifest, language):
        return instanceVarname + '.' + manifest.getAPIFunctionName('calculate_output') +  '(' + cgh.signal_list_to_names_string(self.outputs + self.inputsToCalculateOutputs) + ');\n'

    # helper fn to build code
    def generate_code_call_UpdateFunction(self, instanceVarname, manifest, language):
        return instanceVarname + '.' + manifest.getAPIFunctionName('state_update') +  '(' + cgh.signal_list_to_names_string(self.inputsToUpdateStates) + ');\n'

    def generate_code_output_list(self, language, signals : List [ Signal ] ):
        """
            Generate the code that calculates the outputs of the subsystem

            signals - the output signals that are requested to be calculated; outputs
                      of this block that are not in this list are marked by a comment
                      in the generated code

            Returns the generated code (an empty string in case the language is not 'c++').
        """

        # defined for every language so that the return statement never hits an unbound name
        lines = ''

        if language == 'c++':

            #
            # TODO: 2.5.2020: concept: how to compute only the necessary signals?
            #

            #
            # REWORK: introduce call to fn(Inputs & inputs, Outputs & outputs)
            # and remove the prev. style
            #

            for s in self.outputs: # for each output of the subsystem reserve a variable
                lines += cgh.define_variable_line( s ) 

                if s not in signals:
                    lines += '// NOTE: unused output signal' + s.name + '\n'

            lines += self.generate_code_call_OutputFunction(self.instanceVarname, self.manifest, language)

        return lines

    def generate_code_update(self, language):
        if language == 'c++':

            # input to this call are the signals in self.inputsToUpdateStates
            return self.generate_code_call_UpdateFunction(self.instanceVarname, self.manifest, language)

Ancestors

Instance variables

var embedded_subsystem

Return the system that is embedded (in case it was provided, returns None otherwise)

Expand source code
@property
def embedded_subsystem(self):
    """
        The embedded system as stored in the attribute _embedded_subsystem
    """

    embedded_system = self._embedded_subsystem
    return embedded_system

Methods

def config_request_define_output_types(self, inputTypes)
Expand source code
def config_request_define_output_types(self, inputTypes):
    """
        Return the stored output datatypes (self.outputTypes); inputTypes is not evaluated
    """

    output_types = self.outputTypes
    return output_types
def generate_code_call_OutputFunction(self, instanceVarname, manifest, language)
Expand source code
def generate_code_call_OutputFunction(self, instanceVarname, manifest, language):
    """
        Helper to build code: the call of the 'calculate_output' API function of the subsystem

        The outputs of this block followed by the inputs required to calculate the
        outputs are handed over as arguments. The parameter language is not evaluated.
    """
    call_fragments = (
        instanceVarname,
        '.',
        manifest.getAPIFunctionName('calculate_output'),
        '(',
        cgh.signal_list_to_names_string(self.outputs + self.inputsToCalculateOutputs),
        ');\n',
    )

    return ''.join(call_fragments)
def generate_code_call_UpdateFunction(self, instanceVarname, manifest, language)
Expand source code
def generate_code_call_UpdateFunction(self, instanceVarname, manifest, language):
    """
        Helper to build code: the call of the 'state_update' API function of the subsystem

        The inputs required to update the states are handed over as arguments.
        The parameter language is not evaluated.
    """
    call_fragments = (
        instanceVarname,
        '.',
        manifest.getAPIFunctionName('state_update'),
        '(',
        cgh.signal_list_to_names_string(self.inputsToUpdateStates),
        ');\n',
    )

    return ''.join(call_fragments)
def generate_code_output_list(self, language, signals: List[Signal])
Expand source code
def generate_code_output_list(self, language, signals : List [ Signal ] ):
    """
        Generate the code that calculates the outputs of the subsystem

        signals - the output signals that are requested to be calculated; outputs
                  of this block that are not in this list are marked by a comment
                  in the generated code

        Returns the generated code (an empty string in case the language is not 'c++').
    """

    # defined for every language so that the return statement never hits an unbound name
    lines = ''

    if language == 'c++':

        #
        # TODO: 2.5.2020: concept: how to compute only the necessary signals?
        #

        #
        # REWORK: introduce call to fn(Inputs & inputs, Outputs & outputs)
        # and remove the prev. style
        #

        for s in self.outputs: # for each output of the subsystem reserve a variable
            lines += cgh.define_variable_line( s ) 

            if s not in signals:
                lines += '// NOTE: unused output signal' + s.name + '\n'

        lines += self.generate_code_call_OutputFunction(self.instanceVarname, self.manifest, language)

    return lines
def generate_code_reset(self, language)
Expand source code
def generate_code_reset(self, language):
    """
        Generate the call of the 'reset' API function of the subsystem instance

        Returns None in case the language is not 'c++'.
    """
    if language != 'c++':
        return None

    return ''.join( (self.instanceVarname, '.', self.manifest.getAPIFunctionName('reset'), '();\n') )
def generate_code_update(self, language)
Expand source code
def generate_code_update(self, language):
    """
        Generate the code that updates the states of the subsystem

        Returns None in case the language is not 'c++'.
    """
    if language != 'c++':
        return None

    # the signals in self.inputsToUpdateStates are the input to this call
    return self.generate_code_call_UpdateFunction(self.instanceVarname, self.manifest, language)
def init(self, manifest, compileResult, inputSignals)

This is a second phase initialization of this subsystem block (to be called by compile_callback_all_subsystems_compiled())

This function shall be called when the subsystem to embed is compiled after the instance of 'GenericSubsystem' is created. This way, it is possible to add blocks embedding sub-systems without requiring these subsystems to be already compiled.

Optionally, the system this block belongs to can be set.

  • manifest - the system manifest of the subsystem to embed
  • compileResult - the compile results of the subsystem to embed
  • inputSignals - input signals to the subsystem to embed (links coming from an upper-level subsystem)

Expand source code
def init(self, manifest, compileResult, inputSignals):
    """
        This is a second phase initialization of this subsystem block 
        (to be called by compile_callback_all_subsystems_compiled())

        This function shall be called when the subsystem to embed is compiled
        after the instance of 'GenericSubsystem' is created. This way, it is possible
        to add blocks embedding sub-systems without requiring these subsystems to be
        already compiled.

        manifest       - the system manifest of the subsystem to embed
        compileResult  - the compile results of the subsystem to embed
        inputSignals   - input signals to the subsystem to embed (links coming from an upper-level subsystem)

        Raises a BaseException in case the manifest or the input signals were already
        defined in the constructor or in case the number of outputs described by the
        manifest does not match the number of outputs of this block.
    """        

    #
    #    set the manifest of the subsystem
    #
    if self.manifest is not None:
        raise BaseException("cannot call this function as the subsystem's manifest was already defined in the constructor.")

    self.manifest = manifest

    #
    #    Set the compilation result of the embedded system (if available)
    #
    self.compileResult = compileResult

    #
    #    connect the inputs (coming from the upper-level system)
    #

    if self.inputSignals is not None:
        raise BaseException("The subsystem's inputSignals were already specified in the constructor.")

    self.inputSignals = inputSignals



    def collectDependingSignals(signals, manifestFunctionInputs):
        # collect all depending input signals (that are needed to calculate the output) in a list
        # MOVE TO A FUNCTION. MAYBE MOVE TO MANIFEST.PY
        dependingInputs = []
        cpptypes = manifestFunctionInputs['cpptypes']

        for i, dependingInput_name in enumerate( manifestFunctionInputs['names'] ):

            dependingInput_cpptype = cpptypes[i]

            # TODO: CHECK FOR FAILING LOOKUP
            signal = signals[ dependingInput_name ]

            # check datatype
            if not signal.getDatatype().cpp_datatype_string == dependingInput_cpptype:
                raise BaseException('datatype does not match the one specified in the manifest. (' + (dependingInput_cpptype) + ' is required in the manifest)' )

            # append signal
            dependingInputs.append( signal ) 

        return dependingInputs



    # verify the number of outputs of the embedded system
    number_of_outputs_as_described_by_manifest = self.manifest.number_of_default_ouputs

    if number_of_outputs_as_described_by_manifest != self.Noutputs:
        # the exception must be raised; creating it alone does not report the mismatch
        raise BaseException("missmatch in the number of outputs")

    # get the output datatypes of the embedded system
    self.outputTypes = self.manifest.io_outputs['calculate_output']['types']  


    if self.compileResult is None:
        # collect all depending input signals (that are needed to calculate the output) in a list
        self.inputsToCalculateOutputs = collectDependingSignals( self.inputSignals, self.manifest.io_inputs['calculate_output'] )

        # collect all inputs required to perform the state update
        self.inputsToUpdateStates = collectDependingSignals( self.inputSignals, self.manifest.io_inputs['state_update'] )

    else:
        # use the available compile results to get the I/O signals
        # in this case, self.inputSignals shall be a list of signals. The order
        # shall match the signal order in self.compileResult.inputSignals

        self.inputsToCalculateOutputs = self.compileResult.simulationInputSignalsToCalculateOutputs
        self.inputsToUpdateStates = self.compileResult.simulationInputSignalsToUpdateStates

        

    # combine all inputs to a list
    self.allInputs = list()

    self.allInputs.extend( self.inputsToCalculateOutputs )
    self.allInputs.extend( self.inputsToUpdateStates )

    #
    # now initialize the prototype
    #

    # define the inputs
    self.update_input_config( self.allInputs )

    # connect the outputs signals
    if self.anonymous_output_signals is not None:

        print(" -- Nesting block: connecting anonymous signals -- ")

        Ns = len(self.outputSignals)

        if not Ns == len(  self.anonymous_output_signals ):
            raise BaseException(" missmatch in the number of output signals")

        for i in range(0,Ns):
            
            s_ananon = self.anonymous_output_signals[i]
            s_source = self.outputSignals[i]

            print("connecting the output " + s_ananon.toStr() + " of the embedding block")
            s_ananon.setequal( s_source )



    # for code generation
    self.instanceVarname = self.getUniqueVarnamePrefix() + '_subsystem_' + self.manifest.API_name
def set_anonymous_output_signal_to_connect(self, anonymous_output_signals)

store a list of anonymous signals to connect to the outputs of the subsystems after running the post_compile_callback

Expand source code
def set_anonymous_output_signal_to_connect(self, anonymous_output_signals):
    """
        Remember the anonymous signals (a list of raw signals) that shall be
        connected to the outputs of the subsystem after the post_compile_callback ran
    """
    signals_to_connect = anonymous_output_signals
    self.anonymous_output_signals = signals_to_connect

Inherited members

class OutputMapEmbeddingBlockToSubsystem (normal_outputs_of_embedding_block, subsystem_prototype)
Expand source code
class OutputMapEmbeddingBlockToSubsystem():
    """
        Lookup tables relating the outputs of an embedding block to the outputs
        (and the output indices) of the embedded subsystem
    """

    def __init__(self, normal_outputs_of_embedding_block, subsystem_prototype):

        tables = self.create_output_mapping_table(normal_outputs_of_embedding_block, subsystem_prototype )
        self._output_signal_mapping, self._output_signal_index_mapping = tables


    def create_output_mapping_table(self, normal_outputs_of_embedding_block, subsystem_prototype ):
        """
            Build both lookup tables; the outputs are paired by their position

            Returns the tuple (output_signal_mapping, output_signal_index_mapping).
        """

        # map each output of the embedding block to the output of the subsystem at the same index
        signal_to_subsystem_output = {
            embedding_output : subsystem_prototype.outputs[index]
            for index, embedding_output in enumerate(normal_outputs_of_embedding_block)
        }

        # map each output of the embedding block to its output index
        signal_to_index = {
            embedding_output : index
            for index, embedding_output in enumerate(normal_outputs_of_embedding_block)
        }

        return signal_to_subsystem_output, signal_to_index


    def map(self, output_signals_of_embedding_block):
        """

            given the signals to calculate (variable signals) in the callbacks 

                def generate_code_output_list(self, language, signals : List [ Signal ] ): 

            resolve the mapping to the output signals of the embedded subsystem

        """

        return [ self._output_signal_mapping[signal] for signal in output_signals_of_embedding_block ]



    def map_to_output_index(self, output_signals_of_embedding_block):
        """
            translate the given output signals into the indices of the block outputs:

            e.g. [sig0, sig1, sig2, sig4] --> [0, 1, 2, 4]
        """

        return [ self._output_signal_index_mapping[signal] for signal in output_signals_of_embedding_block ]

Methods

def create_output_mapping_table(self, normal_outputs_of_embedding_block, subsystem_prototype)
Expand source code
def create_output_mapping_table(self, normal_outputs_of_embedding_block, subsystem_prototype ):
    """
        Build both lookup tables; the outputs are paired by their position

        Returns the tuple (output_signal_mapping, output_signal_index_mapping).
    """

    # map each output of the embedding block to the output of the subsystem at the same index
    signal_to_subsystem_output = {
        embedding_output : subsystem_prototype.outputs[index]
        for index, embedding_output in enumerate(normal_outputs_of_embedding_block)
    }

    # map each output of the embedding block to its output index
    signal_to_index = {
        embedding_output : index
        for index, embedding_output in enumerate(normal_outputs_of_embedding_block)
    }

    return signal_to_subsystem_output, signal_to_index
def map(self, output_signals_of_embedding_block)

given the signals to calculate (variable signals) in the callbacks

def generate_code_output_list(self, language, signals : List [ Signal ] ):

resolve the mapping to the embedded subsystems output signals

Expand source code
def map(self, output_signals_of_embedding_block):
    """

        given the signals to calculate (variable signals) in the callbacks 

            def generate_code_output_list(self, language, signals : List [ Signal ] ): 

        resolve the mapping to the output signals of the embedded subsystem

    """

    return [ self._output_signal_mapping[signal] for signal in output_signals_of_embedding_block ]
def map_to_output_index(self, output_signals_of_embedding_block)

return a mapping to indices of the block outputs:

e.g. [sig0, sig1, sig2, sig4] –> [0, 1, 2, 4]

Expand source code
def map_to_output_index(self, output_signals_of_embedding_block):
    """
        translate the given output signals into the indices of the block outputs:

        e.g. [sig0, sig1, sig2, sig4] --> [0, 1, 2, 4]
    """

    return [ self._output_signal_index_mapping[signal] for signal in output_signals_of_embedding_block ]