Module openrtdynamics2.lang.diagram_core.graph_traversion

Expand source code
from .signal_network.signals import *
from .signal_network.Block import *

from typing import Dict, List
from colorama import init,  Fore, Back, Style
init(autoreset=True)



#
# NOTE: this is not used 
#
class graph_traversion2:

    def __init__(self):

        # the list of reachable blocks
        self.reachableBlocks = []




    # Start forward traversion starting from the given startBlock
    def forwardTraverse(self, startBlock : Block):
        self.reachableBlocks = []

        # fill in self.reachableBlocks
        self.forwardTraverse__(startBlock, depthCounter = 0)

        # reset graph traversion markers
        for block in self.reachableBlocks:
            block.graphTraversionMarkerReset()

        return self.reachableBlocks

    # Start forward traversion starting from the given startBlock
    def forwardTraverse__(self, startBlock : Block, depthCounter : int):
        
        tabs = ''
        for i in range(0, depthCounter):
            tabs += '   '

        # print(tabs + "....... depth " + str( depthCounter )  )

        #
        if startBlock.graphTraversionMarkerMarkIsVisited():
            print(tabs + "*** visited *** "  + startBlock.name + " (" + str( startBlock.id ) + ") ****")  ## TODO investigtare: why is this never reached?
            return

        # store this block as it is reachable
        self.reachableBlocks.append( startBlock )

        # make the node as visited
        startBlock.graphTraversionMarkerMarkVisited()

        print(tabs + "-- " + startBlock.name + " (" + str( startBlock.id ) + ") --" )



        # find out the links to other blocks
        for signal in startBlock.getOutputSignals():
            # for each output signal

            print(tabs + "-> S " + signal.name )

            if len( signal.getDestinationBlocks() ) == 0:
                print(tabs + '-- none --')

            for destinationBlock in signal.getDestinationBlocks():
                # destinationBlock is a link to a connected block

                print( tabs + "*", destinationBlock.name, "(", destinationBlock.id, ")"  )

                # recursion
                self.forwardTraverse__( destinationBlock, depthCounter = depthCounter + 1 )


    # Start backward traversion starting from the given startBlock
    #
    # Note this is not fully tested 
    # DELETE SOON, if it is not needed
    #
    def backwardTraverseExec(self, startBlock : Block):
        self.reachableBlocks = []

        # fill in self.reachableBlocks
        self.backwardTraverseExec__(startBlock, depthCounter = 0)

        # reset graph traversion markers
        for block in self.reachableBlocks:
            block.graphTraversionMarkerReset()

        return self.reachableBlocks


    # Start backward traversion starting from the given startBlock
    def backwardTraverseExec__(self, startBlock : Block, depthCounter : int):
        
        tabs = ''
        for i in range(0, depthCounter):
            tabs += '   '

        #print(tabs + "....... depth " + str( depthCounter )  )

        #
        if startBlock.graphTraversionMarkerMarkIsVisited():
            print(tabs + "*** visited *** "  + startBlock.name + " (" + str( startBlock.id ) + ") ****")  ## TODO investigtare: why is this never reached?
            return

        # check of the block 'startBlock'
        #if config_request_define_feedforward_input_dependencies( signal )

        # store this block as it is reachable
        self.reachableBlocks.append( startBlock )

        # make the node as visited
        startBlock.graphTraversionMarkerMarkVisited()

        print(tabs + "--- " + startBlock.name + " (" + str( startBlock.id ) + ") --" )



        # find out the links to other blocks
        for signal in startBlock.getInputSignals():
            # for each output signal


            print(tabs + "-> S " + signal.name )

            if signal.getSourceBlock() is None:
                print(tabs + '-- ERROR: no input signal defined for this block! --')
                
            else:

                print( tabs + "*", signal.getSourceBlock().name, "(", signal.getSourceBlock().id, ")"  )

                self.forwardTraverse__( signal.getSourceBlock(), depthCounter = depthCounter + 1 )








class ExecutionLine():
    """
        This is a data structure
        
        It contains a list 'signalOrder' of signals to be computed in the given order.
        The computation of these signals depends on a list of signals given by
        'dependencySignals'.
    """

    def __init__(
        self,
        signalOrder : List[ Signal ],
        dependencySignals : List[ Signal ],
        dependencySignalsSimulationInputs : List[ Signal ],
        blocksToUpdateStates : List[ Block ],
        dependencySignalsThroughStates : List[ Signal ]
    ):
        self.signalOrder                       = signalOrder
        self.dependencySignals                 = dependencySignals  # TODO: check if this is still needed.
        self.dependencySignalsSimulationInputs = dependencySignalsSimulationInputs
        self.blocksToUpdateStates              = blocksToUpdateStates
        self.dependencySignalsThroughStates    = dependencySignalsThroughStates

    def printExecutionLine(self):
        print("------ print of execution line -----")

        print(Fore.RED + "dependent sources of any kind:")

        for s in self.dependencySignals:
            print("  - " + s.name )

        print(Fore.RED + "dependent sources (simulation inputs):")
                
        for s in self.dependencySignalsSimulationInputs:
            print("  - " + s.name )

        print(Fore.RED + "dependent sources (through state-dependend blocks):")
        
        for s in self.dependencySignalsThroughStates:
            print("  - " + s.name )

        print(Fore.GREEN + "execution order:")

        for s in self.signalOrder:
            print("  - " + s.name )

        print(Fore.GREEN + "blocks whose states must be updated:")

        for block in self.blocksToUpdateStates:
            print("  - " + block.name )


    def getSignalsToExecute(self):
        l = []

        l.extend( self.signalOrder )

        return l


    def appendExecutionLine(self, executionLineToAppend):

        # merge dependencySignals: only add the elements of executionLineToAppend.dependencySignals
        # to self.dependencySignals that are not part of self.dependencySignals or self.signalOrder

        # TODO: to optimize: use sets to merge

        # for s in executionLineToAppend.dependencySignals:
        #     if not s in self.dependencySignals and not s in self.signalOrder:
        #         self.dependencySignals.append(s)


        for s in executionLineToAppend.dependencySignalsSimulationInputs:
            if not s in self.dependencySignalsSimulationInputs and not s in self.signalOrder:
                self.dependencySignalsSimulationInputs.append(s)

        
        for s in executionLineToAppend.dependencySignalsThroughStates:
            if not s in self.dependencySignalsThroughStates and not s in self.signalOrder:
                self.dependencySignalsThroughStates.append(s)

        


        original_list_tmp = self.signalOrder.copy()

        for s in executionLineToAppend.signalOrder:
            # TODO: (for optimization purposes) 
            # check if there common blocks in the list. (only in case a block has more than one
            # output signals and one of these signals is in the list executionLineToAppend.signalOrder
            # and another one in self.signalOrder  )

            # just append the 
            if not s in original_list_tmp:
                self.signalOrder.append( s )

            else:
                print("appendExecutionLine: skipped to add " + s.name)




        original_list_tmp = self.blocksToUpdateStates.copy()

        for b in executionLineToAppend.blocksToUpdateStates:
            # TODO: (for optimization purposes) 
            # check if there comcon blocks in the list. 

            # just append the 
            if not b in original_list_tmp:
                self.blocksToUpdateStates.append( b )






class BuildExecutionPath:
    """
        Find out the order in which signals have to be computed such that a given signal
        'signalToCalculte'can be calculated. This means finding out all dependencies of
        'signalToCalculte'. For each call to 'getExecutionLine' only the signals that
        were not already marked as a dependency in previous calls are returned.
        Each call to 'getExecutionLine' gives an instance 'ExecutionLine'
    """
    def __init__(self, show_print:int=0):

        self._show_print = show_print

        # list of signals the computation depends on (the tips of the execution tree)
        self.dependencySignals = []

        self.dependencySignalsThroughStates = []

        # the list of signals to compute in correct order 
        self.execution_order = []

        # the list of simulation input signals required for the computation
        self.dependencySignalsSimulationInputs = []

        # For each signgal self.execution_order there might be a blocks that
        # has an internal memory. It is required to build a list of those blocks
        # that need a state update after their output(s) are calculated.
        self.blocksToUpdateStates = []

        # list of marked signals (important to reset their visited flags)
        self.markedSignals = []

        # number of calls to getExecutionLine()
        self.level = 0

    def __del__(self):
        # reset the grap markers stored in the signals
        self.resetMarkers()


    def getExecutionLine(self, signalToCalculte : Signal):

        """
            get the order of computation steps and their order that have
            to be performed to compute 'signalToCalculte'
            
            For each call to this function, a list is generated that does not contain
            signals that are already part of a previous list (that are already computed)
            
            This function can be called multiple times and returns only the necessaray 
            computations. Computations already planned in previous calls of this function
            are not listed again. (until resetMarkers() is called)

            -- results --

            self.execution_order contains the list of signals to comute in the correct order including
            the target signals. Not included in this list are signals that cross the border to the simulation
            specified by signalToCalculte.system (coming from an outer system). Further, not included are
            signals that have been computet in a previous call to getExecutionLine().

            self.dependencySignals contains all signals that are required to comput signalToCalculate
            and either cross the border of a simulation, 
        """

        # TODO: dependency signals should stay as theiy are but reachableSignals should not contain signals
        #       that already have been calculated. Further, reachableSignals shall also contain dependency if they 
        #       were not already calculated

        if self._show_print > 0:
            print("getExecutionLine on level " + str(self.level) )

        # reset the lists TODO: use sets instead to avoid duplication?
        self.execution_order = []
        self.dependencySignals = []
        self.dependencySignalsThroughStates = []
        self.dependencySignalsSimulationInputs = []
        self.blocksToUpdateStates = []

        # search within this system
        self.system = signalToCalculte.sim

        # compute by traversing the tree
        self.backwardTraverseSignalsExec__(startSignal=signalToCalculte, depthCounter = 0)

        # the iteration level
        self.level = self.level + 1

        return ExecutionLine( self.execution_order, self.dependencySignals, self.dependencySignalsSimulationInputs, self.blocksToUpdateStates, self.dependencySignalsThroughStates )

    def printExecutionLine(self):
        pass

    def resetMarkers(self):
        # reset graph traversion markers
        for signal in self.markedSignals:
            signal.graphTraversionMarkerReset()

        # reset status variables
        self.markedSignals = []
        self.level = 0

    def place_marker_for_current_level(self, signal):
        # mark the node/signal as being visited (meaning computed)
        signal.graphTraversionMarkerMarkVisited(self.level)
        self.markedSignals.append(signal)

    def isSignalAlreadyComputable(self, signal : Signal):
        return signal.graphTraversionMarkerMarkIsVisited()

    # Start backward traversion starting from the given startSignal
    def backwardTraverseSignalsExec__(self, startSignal : Signal, depthCounter : int, system_context = None):

        #
        # check if the datatype of startSignal is defined
        #

        if startSignal.datatype is None:
            raise BaseException('Unknown datatype for signal ' + startSignal.name + ': no datatype has been specified or could be determined automatically.')


        #
        #
        #

        
        tabs = ''
        for i in range(0, depthCounter):
            tabs += '.  '


        if not (isinstance(startSignal, SimulationInputSignal) or isinstance(startSignal, BlockOutputSignal)):
            
            # this case must be an error..                  
            raise BaseException('not implemented or internal error: unexpected type of signal ' + startSignal.name)

        # check if the signal is a system input signal
        is_crossing_simulation_border = startSignal.is_crossing_system_boundary(self.system) #  self.system != startSignal.sim

        # TODO: IMPLEMENT: except when startSignal is a simulation input (in this case it is not comuted)
        #  and not isinstance(startSignal, SimulationInputSignal)
        if startSignal.graphTraversionMarkerMarkIsVisitedOnLevelLowerThan(self.level):
            # - a previously computed signal has been reached

            if self._show_print > 1:
                print(Style.DIM + tabs + "has already been calculated in a previous traversion") 

            self.dependencySignals.append( startSignal )

            # in case startSignal is a simulation input, still add it to the list of simulation input dependiencies
            # though it has already been computed
            if is_crossing_simulation_border:

                if self._show_print > 1:
                    print(Style.DIM + tabs + "as it is also a simulation input, adding it to the list of depended inputs")

                # also note down that this is a (actually used) simulation input
                self.dependencySignalsSimulationInputs.append( startSignal )

            return

        if startSignal.graphTraversionMarkerMarkIsVisited():

            if self._show_print > 1:
                print(Style.DIM + tabs + "has already been calculated in this traversion") 

            return

        if is_crossing_simulation_border:
            # signal is an input to the simulation
            # add to the list of dependent inputs

            if self._show_print > 1:
                print(Fore.YELLOW + tabs + "  --> crosses system bounds")

            # startSignal is at the top of the tree, so add it to the dependiencies
            self.dependencySignals.append( startSignal )

            # also note down that this is a (actually used) simulation input
            self.dependencySignalsSimulationInputs.append( startSignal )

            if self._show_print > 1:
                print(Style.DIM + tabs + "added input dependency " + startSignal.toStr())

            # mark the node/signal as being visited (meaning computed)
            self.place_marker_for_current_level(startSignal)

            return


        # get the blocks prototype function to calculate startSignal
        block = startSignal.getSourceBlock()
        blocksPrototype = block.getBlockPrototype()

        #
        # check if the block that yields startSignal uses internal-states to compute startSignal
        #

        inputsToUpdateStatesTmp = blocksPrototype.config_request_define_state_update_input_dependencies( startSignal )
        if inputsToUpdateStatesTmp is not None:

            if self._show_print > 1:
                print(tabs + "--- signals needed *indirectly* to compute " + startSignal.name + " (through state update) --" )

            # 
            self.blocksToUpdateStates.append( block )

            # please note: blocksPrototype.config_request_define_state_update_input_dependencies might return some undetermined signals that are resolved here
            resolveUndeterminedSignals( inputsToUpdateStatesTmp )

            # add the signals that are required to perform the state update
            self.dependencySignalsThroughStates.extend( inputsToUpdateStatesTmp )

            if self._show_print > 1:
                for signal in inputsToUpdateStatesTmp:
                    print(Fore.MAGENTA + tabs + "-> S " + signal.name )


        #
        # find out the links to other signals but only these ones that are 
        # needed to calculate 'startSignal'
        #

        if self._show_print > 1:
            print(tabs + "--- signals needed for " + startSignal.name + " --" )

        dependingSignals = blocksPrototype.config_request_define_feedforward_input_dependencies(startSignal)

        # please note: blocksPrototype.config_request_define_feedforward_input_dependencies might return some undetermined signals that are resolved here
        resolveUndeterminedSignals( dependingSignals )

        if len(dependingSignals) == 0:
            # no dependencies to calculate startSignal (e.g. in case of const blocks or blocks without direct feedthrough)
            if self._show_print > 1:
                print(Style.DIM + tabs + "  (no signals needed) "  )

            # block startSignal.getSourceBlock() --> startSignal is a starting point

            # startSignal is at the top of the tree, so add it to the dependencies
            self.dependencySignals.append( startSignal )

            #
            if self._show_print > 1:
                print(Style.DIM + tabs + "added " + startSignal.toStr())
    
            self.execution_order.append( startSignal )

            # mark the node/signal as being visited (meaning computed)
            self.place_marker_for_current_level(startSignal)

            return



        #
        # ITERATE: go through all signals needed to calculate startSignal
        #          only in case there are any, we come to this point
        #

        for signal in dependingSignals:

            if self._show_print > 1:
                print(Fore.MAGENTA + tabs + "-> S " + signal.name )

            self.backwardTraverseSignalsExec__( signal, depthCounter = depthCounter + 1 )

        #
        # FINALIZE: now also startSignal can be computed
        #

        #
        # store startSignal as reachable (put it on the exeution list)
        # NOTE: if startSignal is the tip of the tree (no dependingSignals) it is excluded
        #       from this list. However, it is still in the list of dependencySignals.
        #

        if self._show_print > 1:
            print(Style.DIM + tabs + "added " + startSignal.toStr())

        self.execution_order.append( startSignal )

        # mark the node/signal as being visited (meaning computed)
        self.place_marker_for_current_level(startSignal)

Classes

class BuildExecutionPath (show_print: int = 0)

Find out the order in which signals have to be computed such that a given signal 'signalToCalculte'can be calculated. This means finding out all dependencies of 'signalToCalculte'. For each call to 'getExecutionLine' only the signals that were not already marked as a dependency in previous calls are returned. Each call to 'getExecutionLine' gives an instance 'ExecutionLine'

Expand source code
class BuildExecutionPath:
    """
        Find out the order in which signals have to be computed such that a given signal
        'signalToCalculte'can be calculated. This means finding out all dependencies of
        'signalToCalculte'. For each call to 'getExecutionLine' only the signals that
        were not already marked as a dependency in previous calls are returned.
        Each call to 'getExecutionLine' gives an instance 'ExecutionLine'
    """
    def __init__(self, show_print:int=0):

        self._show_print = show_print

        # list of signals the computation depends on (the tips of the execution tree)
        self.dependencySignals = []

        self.dependencySignalsThroughStates = []

        # the list of signals to compute in correct order 
        self.execution_order = []

        # the list of simulation input signals required for the computation
        self.dependencySignalsSimulationInputs = []

        # For each signgal self.execution_order there might be a blocks that
        # has an internal memory. It is required to build a list of those blocks
        # that need a state update after their output(s) are calculated.
        self.blocksToUpdateStates = []

        # list of marked signals (important to reset their visited flags)
        self.markedSignals = []

        # number of calls to getExecutionLine()
        self.level = 0

    def __del__(self):
        # reset the grap markers stored in the signals
        self.resetMarkers()


    def getExecutionLine(self, signalToCalculte : Signal):

        """
            get the order of computation steps and their order that have
            to be performed to compute 'signalToCalculte'
            
            For each call to this function, a list is generated that does not contain
            signals that are already part of a previous list (that are already computed)
            
            This function can be called multiple times and returns only the necessaray 
            computations. Computations already planned in previous calls of this function
            are not listed again. (until resetMarkers() is called)

            -- results --

            self.execution_order contains the list of signals to comute in the correct order including
            the target signals. Not included in this list are signals that cross the border to the simulation
            specified by signalToCalculte.system (coming from an outer system). Further, not included are
            signals that have been computet in a previous call to getExecutionLine().

            self.dependencySignals contains all signals that are required to comput signalToCalculate
            and either cross the border of a simulation, 
        """

        # TODO: dependency signals should stay as theiy are but reachableSignals should not contain signals
        #       that already have been calculated. Further, reachableSignals shall also contain dependency if they 
        #       were not already calculated

        if self._show_print > 0:
            print("getExecutionLine on level " + str(self.level) )

        # reset the lists TODO: use sets instead to avoid duplication?
        self.execution_order = []
        self.dependencySignals = []
        self.dependencySignalsThroughStates = []
        self.dependencySignalsSimulationInputs = []
        self.blocksToUpdateStates = []

        # search within this system
        self.system = signalToCalculte.sim

        # compute by traversing the tree
        self.backwardTraverseSignalsExec__(startSignal=signalToCalculte, depthCounter = 0)

        # the iteration level
        self.level = self.level + 1

        return ExecutionLine( self.execution_order, self.dependencySignals, self.dependencySignalsSimulationInputs, self.blocksToUpdateStates, self.dependencySignalsThroughStates )

    def printExecutionLine(self):
        pass

    def resetMarkers(self):
        # reset graph traversion markers
        for signal in self.markedSignals:
            signal.graphTraversionMarkerReset()

        # reset status variables
        self.markedSignals = []
        self.level = 0

    def place_marker_for_current_level(self, signal):
        # mark the node/signal as being visited (meaning computed)
        signal.graphTraversionMarkerMarkVisited(self.level)
        self.markedSignals.append(signal)

    def isSignalAlreadyComputable(self, signal : Signal):
        return signal.graphTraversionMarkerMarkIsVisited()

    # Start backward traversion starting from the given startSignal
    def backwardTraverseSignalsExec__(self, startSignal : Signal, depthCounter : int, system_context = None):

        #
        # check if the datatype of startSignal is defined
        #

        if startSignal.datatype is None:
            raise BaseException('Unknown datatype for signal ' + startSignal.name + ': no datatype has been specified or could be determined automatically.')


        #
        #
        #

        
        tabs = ''
        for i in range(0, depthCounter):
            tabs += '.  '


        if not (isinstance(startSignal, SimulationInputSignal) or isinstance(startSignal, BlockOutputSignal)):
            
            # this case must be an error..                  
            raise BaseException('not implemented or internal error: unexpected type of signal ' + startSignal.name)

        # check if the signal is a system input signal
        is_crossing_simulation_border = startSignal.is_crossing_system_boundary(self.system) #  self.system != startSignal.sim

        # TODO: IMPLEMENT: except when startSignal is a simulation input (in this case it is not comuted)
        #  and not isinstance(startSignal, SimulationInputSignal)
        if startSignal.graphTraversionMarkerMarkIsVisitedOnLevelLowerThan(self.level):
            # - a previously computed signal has been reached

            if self._show_print > 1:
                print(Style.DIM + tabs + "has already been calculated in a previous traversion") 

            self.dependencySignals.append( startSignal )

            # in case startSignal is a simulation input, still add it to the list of simulation input dependiencies
            # though it has already been computed
            if is_crossing_simulation_border:

                if self._show_print > 1:
                    print(Style.DIM + tabs + "as it is also a simulation input, adding it to the list of depended inputs")

                # also note down that this is a (actually used) simulation input
                self.dependencySignalsSimulationInputs.append( startSignal )

            return

        if startSignal.graphTraversionMarkerMarkIsVisited():

            if self._show_print > 1:
                print(Style.DIM + tabs + "has already been calculated in this traversion") 

            return

        if is_crossing_simulation_border:
            # signal is an input to the simulation
            # add to the list of dependent inputs

            if self._show_print > 1:
                print(Fore.YELLOW + tabs + "  --> crosses system bounds")

            # startSignal is at the top of the tree, so add it to the dependiencies
            self.dependencySignals.append( startSignal )

            # also note down that this is a (actually used) simulation input
            self.dependencySignalsSimulationInputs.append( startSignal )

            if self._show_print > 1:
                print(Style.DIM + tabs + "added input dependency " + startSignal.toStr())

            # mark the node/signal as being visited (meaning computed)
            self.place_marker_for_current_level(startSignal)

            return


        # get the blocks prototype function to calculate startSignal
        block = startSignal.getSourceBlock()
        blocksPrototype = block.getBlockPrototype()

        #
        # check if the block that yields startSignal uses internal-states to compute startSignal
        #

        inputsToUpdateStatesTmp = blocksPrototype.config_request_define_state_update_input_dependencies( startSignal )
        if inputsToUpdateStatesTmp is not None:

            if self._show_print > 1:
                print(tabs + "--- signals needed *indirectly* to compute " + startSignal.name + " (through state update) --" )

            # 
            self.blocksToUpdateStates.append( block )

            # please note: blocksPrototype.config_request_define_state_update_input_dependencies might return some undetermined signals that are resolved here
            resolveUndeterminedSignals( inputsToUpdateStatesTmp )

            # add the signals that are required to perform the state update
            self.dependencySignalsThroughStates.extend( inputsToUpdateStatesTmp )

            if self._show_print > 1:
                for signal in inputsToUpdateStatesTmp:
                    print(Fore.MAGENTA + tabs + "-> S " + signal.name )


        #
        # find out the links to other signals but only these ones that are 
        # needed to calculate 'startSignal'
        #

        if self._show_print > 1:
            print(tabs + "--- signals needed for " + startSignal.name + " --" )

        dependingSignals = blocksPrototype.config_request_define_feedforward_input_dependencies(startSignal)

        # please note: blocksPrototype.config_request_define_feedforward_input_dependencies might return some undetermined signals that are resolved here
        resolveUndeterminedSignals( dependingSignals )

        if len(dependingSignals) == 0:
            # no dependencies to calculate startSignal (e.g. in case of const blocks or blocks without direct feedthrough)
            if self._show_print > 1:
                print(Style.DIM + tabs + "  (no signals needed) "  )

            # block startSignal.getSourceBlock() --> startSignal is a starting point

            # startSignal is at the top of the tree, so add it to the dependencies
            self.dependencySignals.append( startSignal )

            #
            if self._show_print > 1:
                print(Style.DIM + tabs + "added " + startSignal.toStr())
    
            self.execution_order.append( startSignal )

            # mark the node/signal as being visited (meaning computed)
            self.place_marker_for_current_level(startSignal)

            return



        #
        # ITERATE: go through all signals needed to calculate startSignal
        #          only in case there are any, we come to this point
        #

        for signal in dependingSignals:

            if self._show_print > 1:
                print(Fore.MAGENTA + tabs + "-> S " + signal.name )

            self.backwardTraverseSignalsExec__( signal, depthCounter = depthCounter + 1 )

        #
        # FINALIZE: now also startSignal can be computed
        #

        #
        # store startSignal as reachable (put it on the exeution list)
        # NOTE: if startSignal is the tip of the tree (no dependingSignals) it is excluded
        #       from this list. However, it is still in the list of dependencySignals.
        #

        if self._show_print > 1:
            print(Style.DIM + tabs + "added " + startSignal.toStr())

        self.execution_order.append( startSignal )

        # mark the node/signal as being visited (meaning computed)
        self.place_marker_for_current_level(startSignal)

Methods

def backwardTraverseSignalsExec__(self, startSignal: Signal, depthCounter: int, system_context=None)
Expand source code
def backwardTraverseSignalsExec__(self, startSignal : Signal, depthCounter : int, system_context = None):

    #
    # check if the datatype of startSignal is defined
    #

    if startSignal.datatype is None:
        raise BaseException('Unknown datatype for signal ' + startSignal.name + ': no datatype has been specified or could be determined automatically.')


    #
    #
    #

    
    tabs = ''
    for i in range(0, depthCounter):
        tabs += '.  '


    if not (isinstance(startSignal, SimulationInputSignal) or isinstance(startSignal, BlockOutputSignal)):
        
        # this case must be an error..                  
        raise BaseException('not implemented or internal error: unexpected type of signal ' + startSignal.name)

    # check if the signal is a system input signal
    is_crossing_simulation_border = startSignal.is_crossing_system_boundary(self.system) #  self.system != startSignal.sim

    # TODO: IMPLEMENT: except when startSignal is a simulation input (in this case it is not comuted)
    #  and not isinstance(startSignal, SimulationInputSignal)
    if startSignal.graphTraversionMarkerMarkIsVisitedOnLevelLowerThan(self.level):
        # - a previously computed signal has been reached

        if self._show_print > 1:
            print(Style.DIM + tabs + "has already been calculated in a previous traversion") 

        self.dependencySignals.append( startSignal )

        # in case startSignal is a simulation input, still add it to the list of simulation input dependiencies
        # though it has already been computed
        if is_crossing_simulation_border:

            if self._show_print > 1:
                print(Style.DIM + tabs + "as it is also a simulation input, adding it to the list of depended inputs")

            # also note down that this is a (actually used) simulation input
            self.dependencySignalsSimulationInputs.append( startSignal )

        return

    if startSignal.graphTraversionMarkerMarkIsVisited():

        if self._show_print > 1:
            print(Style.DIM + tabs + "has already been calculated in this traversion") 

        return

    if is_crossing_simulation_border:
        # signal is an input to the simulation
        # add to the list of dependent inputs

        if self._show_print > 1:
            print(Fore.YELLOW + tabs + "  --> crosses system bounds")

        # startSignal is at the top of the tree, so add it to the dependiencies
        self.dependencySignals.append( startSignal )

        # also note down that this is a (actually used) simulation input
        self.dependencySignalsSimulationInputs.append( startSignal )

        if self._show_print > 1:
            print(Style.DIM + tabs + "added input dependency " + startSignal.toStr())

        # mark the node/signal as being visited (meaning computed)
        self.place_marker_for_current_level(startSignal)

        return


    # get the blocks prototype function to calculate startSignal
    block = startSignal.getSourceBlock()
    blocksPrototype = block.getBlockPrototype()

    #
    # check if the block that yields startSignal uses internal-states to compute startSignal
    #

    inputsToUpdateStatesTmp = blocksPrototype.config_request_define_state_update_input_dependencies( startSignal )
    if inputsToUpdateStatesTmp is not None:

        if self._show_print > 1:
            print(tabs + "--- signals needed *indirectly* to compute " + startSignal.name + " (through state update) --" )

        # 
        self.blocksToUpdateStates.append( block )

        # please note: blocksPrototype.config_request_define_state_update_input_dependencies might return some undetermined signals that are resolved here
        resolveUndeterminedSignals( inputsToUpdateStatesTmp )

        # add the signals that are required to perform the state update
        self.dependencySignalsThroughStates.extend( inputsToUpdateStatesTmp )

        if self._show_print > 1:
            for signal in inputsToUpdateStatesTmp:
                print(Fore.MAGENTA + tabs + "-> S " + signal.name )


    #
    # find out the links to other signals but only these ones that are 
    # needed to calculate 'startSignal'
    #

    if self._show_print > 1:
        print(tabs + "--- signals needed for " + startSignal.name + " --" )

    dependingSignals = blocksPrototype.config_request_define_feedforward_input_dependencies(startSignal)

    # please note: blocksPrototype.config_request_define_feedforward_input_dependencies might return some undetermined signals that are resolved here
    resolveUndeterminedSignals( dependingSignals )

    if len(dependingSignals) == 0:
        # no dependencies to calculate startSignal (e.g. in case of const blocks or blocks without direct feedthrough)
        if self._show_print > 1:
            print(Style.DIM + tabs + "  (no signals needed) "  )

        # block startSignal.getSourceBlock() --> startSignal is a starting point

        # startSignal is at the top of the tree, so add it to the dependencies
        self.dependencySignals.append( startSignal )

        #
        if self._show_print > 1:
            print(Style.DIM + tabs + "added " + startSignal.toStr())

        self.execution_order.append( startSignal )

        # mark the node/signal as being visited (meaning computed)
        self.place_marker_for_current_level(startSignal)

        return



    #
    # ITERATE: go through all signals needed to calculate startSignal
    #          only in case there are any, we come to this point
    #

    for signal in dependingSignals:

        if self._show_print > 1:
            print(Fore.MAGENTA + tabs + "-> S " + signal.name )

        self.backwardTraverseSignalsExec__( signal, depthCounter = depthCounter + 1 )

    #
    # FINALIZE: now also startSignal can be computed
    #

    #
    # store startSignal as reachable (put it on the exeution list)
    # NOTE: if startSignal is the tip of the tree (no dependingSignals) it is excluded
    #       from this list. However, it is still in the list of dependencySignals.
    #

    if self._show_print > 1:
        print(Style.DIM + tabs + "added " + startSignal.toStr())

    self.execution_order.append( startSignal )

    # mark the node/signal as being visited (meaning computed)
    self.place_marker_for_current_level(startSignal)
def getExecutionLine(self, signalToCalculte: Signal)

get the order of computation steps and their order that have to be performed to compute 'signalToCalculte'

For each call to this function, a list is generated that does not contain signals that are already part of a previous list (that are already computed)

This function can be called multiple times and returns only the necessaray computations. Computations already planned in previous calls of this function are not listed again. (until resetMarkers() is called)

– results –

self.execution_order contains the list of signals to comute in the correct order including the target signals. Not included in this list are signals that cross the border to the simulation specified by signalToCalculte.system (coming from an outer system). Further, not included are signals that have been computet in a previous call to getExecutionLine().

self.dependencySignals contains all signals that are required to comput signalToCalculate and either cross the border of a simulation,

Expand source code
def getExecutionLine(self, signalToCalculte : Signal):

    """
        get the order of computation steps and their order that have
        to be performed to compute 'signalToCalculte'
        
        For each call to this function, a list is generated that does not contain
        signals that are already part of a previous list (that are already computed)
        
        This function can be called multiple times and returns only the necessaray 
        computations. Computations already planned in previous calls of this function
        are not listed again. (until resetMarkers() is called)

        -- results --

        self.execution_order contains the list of signals to comute in the correct order including
        the target signals. Not included in this list are signals that cross the border to the simulation
        specified by signalToCalculte.system (coming from an outer system). Further, not included are
        signals that have been computet in a previous call to getExecutionLine().

        self.dependencySignals contains all signals that are required to comput signalToCalculate
        and either cross the border of a simulation, 
    """

    # TODO: dependency signals should stay as theiy are but reachableSignals should not contain signals
    #       that already have been calculated. Further, reachableSignals shall also contain dependency if they 
    #       were not already calculated

    if self._show_print > 0:
        print("getExecutionLine on level " + str(self.level) )

    # reset the lists TODO: use sets instead to avoid duplication?
    self.execution_order = []
    self.dependencySignals = []
    self.dependencySignalsThroughStates = []
    self.dependencySignalsSimulationInputs = []
    self.blocksToUpdateStates = []

    # search within this system
    self.system = signalToCalculte.sim

    # compute by traversing the tree
    self.backwardTraverseSignalsExec__(startSignal=signalToCalculte, depthCounter = 0)

    # the iteration level
    self.level = self.level + 1

    return ExecutionLine( self.execution_order, self.dependencySignals, self.dependencySignalsSimulationInputs, self.blocksToUpdateStates, self.dependencySignalsThroughStates )
def isSignalAlreadyComputable(self, signal: Signal)
Expand source code
def isSignalAlreadyComputable(self, signal : Signal):
    return signal.graphTraversionMarkerMarkIsVisited()
def place_marker_for_current_level(self, signal)
Expand source code
def place_marker_for_current_level(self, signal):
    # mark the node/signal as being visited (meaning computed)
    signal.graphTraversionMarkerMarkVisited(self.level)
    self.markedSignals.append(signal)
def printExecutionLine(self)
Expand source code
def printExecutionLine(self):
    pass
def resetMarkers(self)
Expand source code
def resetMarkers(self):
    # reset graph traversion markers
    for signal in self.markedSignals:
        signal.graphTraversionMarkerReset()

    # reset status variables
    self.markedSignals = []
    self.level = 0
class ExecutionLine (signalOrder: List[Signal], dependencySignals: List[Signal], dependencySignalsSimulationInputs: List[Signal], blocksToUpdateStates: List[Block], dependencySignalsThroughStates: List[Signal])

This is a data structure

It contains a list 'signalOrder' of signals to be computed in the given order. The computation of these signals depends on a list of signals given by 'dependencySignals'.

Expand source code
class ExecutionLine():
    """
        This is a data structure
        
        It contains a list 'signalOrder' of signals to be computed in the given order.
        The computation of these signals depends on a list of signals given by
        'dependencySignals'.
    """

    def __init__(
        self,
        signalOrder : List[ Signal ],
        dependencySignals : List[ Signal ],
        dependencySignalsSimulationInputs : List[ Signal ],
        blocksToUpdateStates : List[ Block ],
        dependencySignalsThroughStates : List[ Signal ]
    ):
        self.signalOrder                       = signalOrder
        self.dependencySignals                 = dependencySignals  # TODO: check if this is still needed.
        self.dependencySignalsSimulationInputs = dependencySignalsSimulationInputs
        self.blocksToUpdateStates              = blocksToUpdateStates
        self.dependencySignalsThroughStates    = dependencySignalsThroughStates

    def printExecutionLine(self):
        print("------ print of execution line -----")

        print(Fore.RED + "dependent sources of any kind:")

        for s in self.dependencySignals:
            print("  - " + s.name )

        print(Fore.RED + "dependent sources (simulation inputs):")
                
        for s in self.dependencySignalsSimulationInputs:
            print("  - " + s.name )

        print(Fore.RED + "dependent sources (through state-dependend blocks):")
        
        for s in self.dependencySignalsThroughStates:
            print("  - " + s.name )

        print(Fore.GREEN + "execution order:")

        for s in self.signalOrder:
            print("  - " + s.name )

        print(Fore.GREEN + "blocks whose states must be updated:")

        for block in self.blocksToUpdateStates:
            print("  - " + block.name )


    def getSignalsToExecute(self):
        l = []

        l.extend( self.signalOrder )

        return l


    def appendExecutionLine(self, executionLineToAppend):

        # merge dependencySignals: only add the elements of executionLineToAppend.dependencySignals
        # to self.dependencySignals that are not part of self.dependencySignals or self.signalOrder

        # TODO: to optimize: use sets to merge

        # for s in executionLineToAppend.dependencySignals:
        #     if not s in self.dependencySignals and not s in self.signalOrder:
        #         self.dependencySignals.append(s)


        for s in executionLineToAppend.dependencySignalsSimulationInputs:
            if not s in self.dependencySignalsSimulationInputs and not s in self.signalOrder:
                self.dependencySignalsSimulationInputs.append(s)

        
        for s in executionLineToAppend.dependencySignalsThroughStates:
            if not s in self.dependencySignalsThroughStates and not s in self.signalOrder:
                self.dependencySignalsThroughStates.append(s)

        


        original_list_tmp = self.signalOrder.copy()

        for s in executionLineToAppend.signalOrder:
            # TODO: (for optimization purposes) 
            # check if there common blocks in the list. (only in case a block has more than one
            # output signals and one of these signals is in the list executionLineToAppend.signalOrder
            # and another one in self.signalOrder  )

            # just append the 
            if not s in original_list_tmp:
                self.signalOrder.append( s )

            else:
                print("appendExecutionLine: skipped to add " + s.name)




        original_list_tmp = self.blocksToUpdateStates.copy()

        for b in executionLineToAppend.blocksToUpdateStates:
            # TODO: (for optimization purposes) 
            # check if there comcon blocks in the list. 

            # just append the 
            if not b in original_list_tmp:
                self.blocksToUpdateStates.append( b )

Methods

def appendExecutionLine(self, executionLineToAppend)
Expand source code
def appendExecutionLine(self, executionLineToAppend):

    # merge dependencySignals: only add the elements of executionLineToAppend.dependencySignals
    # to self.dependencySignals that are not part of self.dependencySignals or self.signalOrder

    # TODO: to optimize: use sets to merge

    # for s in executionLineToAppend.dependencySignals:
    #     if not s in self.dependencySignals and not s in self.signalOrder:
    #         self.dependencySignals.append(s)


    for s in executionLineToAppend.dependencySignalsSimulationInputs:
        if not s in self.dependencySignalsSimulationInputs and not s in self.signalOrder:
            self.dependencySignalsSimulationInputs.append(s)

    
    for s in executionLineToAppend.dependencySignalsThroughStates:
        if not s in self.dependencySignalsThroughStates and not s in self.signalOrder:
            self.dependencySignalsThroughStates.append(s)

    


    original_list_tmp = self.signalOrder.copy()

    for s in executionLineToAppend.signalOrder:
        # TODO: (for optimization purposes) 
        # check if there common blocks in the list. (only in case a block has more than one
        # output signals and one of these signals is in the list executionLineToAppend.signalOrder
        # and another one in self.signalOrder  )

        # just append the 
        if not s in original_list_tmp:
            self.signalOrder.append( s )

        else:
            print("appendExecutionLine: skipped to add " + s.name)




    original_list_tmp = self.blocksToUpdateStates.copy()

    for b in executionLineToAppend.blocksToUpdateStates:
        # TODO: (for optimization purposes) 
        # check if there comcon blocks in the list. 

        # just append the 
        if not b in original_list_tmp:
            self.blocksToUpdateStates.append( b )
def getSignalsToExecute(self)
Expand source code
def getSignalsToExecute(self):
    l = []

    l.extend( self.signalOrder )

    return l
def printExecutionLine(self)
Expand source code
def printExecutionLine(self):
    print("------ print of execution line -----")

    print(Fore.RED + "dependent sources of any kind:")

    for s in self.dependencySignals:
        print("  - " + s.name )

    print(Fore.RED + "dependent sources (simulation inputs):")
            
    for s in self.dependencySignalsSimulationInputs:
        print("  - " + s.name )

    print(Fore.RED + "dependent sources (through state-dependend blocks):")
    
    for s in self.dependencySignalsThroughStates:
        print("  - " + s.name )

    print(Fore.GREEN + "execution order:")

    for s in self.signalOrder:
        print("  - " + s.name )

    print(Fore.GREEN + "blocks whose states must be updated:")

    for block in self.blocksToUpdateStates:
        print("  - " + block.name )
class graph_traversion2
Expand source code
class graph_traversion2:

    def __init__(self):

        # the list of reachable blocks
        self.reachableBlocks = []




    # Start forward traversion starting from the given startBlock
    def forwardTraverse(self, startBlock : Block):
        self.reachableBlocks = []

        # fill in self.reachableBlocks
        self.forwardTraverse__(startBlock, depthCounter = 0)

        # reset graph traversion markers
        for block in self.reachableBlocks:
            block.graphTraversionMarkerReset()

        return self.reachableBlocks

    # Start forward traversion starting from the given startBlock
    def forwardTraverse__(self, startBlock : Block, depthCounter : int):
        
        tabs = ''
        for i in range(0, depthCounter):
            tabs += '   '

        # print(tabs + "....... depth " + str( depthCounter )  )

        #
        if startBlock.graphTraversionMarkerMarkIsVisited():
            print(tabs + "*** visited *** "  + startBlock.name + " (" + str( startBlock.id ) + ") ****")  ## TODO investigtare: why is this never reached?
            return

        # store this block as it is reachable
        self.reachableBlocks.append( startBlock )

        # make the node as visited
        startBlock.graphTraversionMarkerMarkVisited()

        print(tabs + "-- " + startBlock.name + " (" + str( startBlock.id ) + ") --" )



        # find out the links to other blocks
        for signal in startBlock.getOutputSignals():
            # for each output signal

            print(tabs + "-> S " + signal.name )

            if len( signal.getDestinationBlocks() ) == 0:
                print(tabs + '-- none --')

            for destinationBlock in signal.getDestinationBlocks():
                # destinationBlock is a link to a connected block

                print( tabs + "*", destinationBlock.name, "(", destinationBlock.id, ")"  )

                # recursion
                self.forwardTraverse__( destinationBlock, depthCounter = depthCounter + 1 )


    # Start backward traversion starting from the given startBlock
    #
    # Note this is not fully tested 
    # DELETE SOON, if it is not needed
    #
    def backwardTraverseExec(self, startBlock : Block):
        self.reachableBlocks = []

        # fill in self.reachableBlocks
        self.backwardTraverseExec__(startBlock, depthCounter = 0)

        # reset graph traversion markers
        for block in self.reachableBlocks:
            block.graphTraversionMarkerReset()

        return self.reachableBlocks


    # Start backward traversion starting from the given startBlock
    def backwardTraverseExec__(self, startBlock : Block, depthCounter : int):
        
        tabs = ''
        for i in range(0, depthCounter):
            tabs += '   '

        #print(tabs + "....... depth " + str( depthCounter )  )

        #
        if startBlock.graphTraversionMarkerMarkIsVisited():
            print(tabs + "*** visited *** "  + startBlock.name + " (" + str( startBlock.id ) + ") ****")  ## TODO investigtare: why is this never reached?
            return

        # check of the block 'startBlock'
        #if config_request_define_feedforward_input_dependencies( signal )

        # store this block as it is reachable
        self.reachableBlocks.append( startBlock )

        # make the node as visited
        startBlock.graphTraversionMarkerMarkVisited()

        print(tabs + "--- " + startBlock.name + " (" + str( startBlock.id ) + ") --" )



        # find out the links to other blocks
        for signal in startBlock.getInputSignals():
            # for each output signal


            print(tabs + "-> S " + signal.name )

            if signal.getSourceBlock() is None:
                print(tabs + '-- ERROR: no input signal defined for this block! --')
                
            else:

                print( tabs + "*", signal.getSourceBlock().name, "(", signal.getSourceBlock().id, ")"  )

                self.forwardTraverse__( signal.getSourceBlock(), depthCounter = depthCounter + 1 )

Methods

def backwardTraverseExec(self, startBlock: Block)
Expand source code
def backwardTraverseExec(self, startBlock : Block):
    self.reachableBlocks = []

    # fill in self.reachableBlocks
    self.backwardTraverseExec__(startBlock, depthCounter = 0)

    # reset graph traversion markers
    for block in self.reachableBlocks:
        block.graphTraversionMarkerReset()

    return self.reachableBlocks
def backwardTraverseExec__(self, startBlock: Block, depthCounter: int)
Expand source code
def backwardTraverseExec__(self, startBlock : Block, depthCounter : int):
    
    tabs = ''
    for i in range(0, depthCounter):
        tabs += '   '

    #print(tabs + "....... depth " + str( depthCounter )  )

    #
    if startBlock.graphTraversionMarkerMarkIsVisited():
        print(tabs + "*** visited *** "  + startBlock.name + " (" + str( startBlock.id ) + ") ****")  ## TODO investigtare: why is this never reached?
        return

    # check of the block 'startBlock'
    #if config_request_define_feedforward_input_dependencies( signal )

    # store this block as it is reachable
    self.reachableBlocks.append( startBlock )

    # make the node as visited
    startBlock.graphTraversionMarkerMarkVisited()

    print(tabs + "--- " + startBlock.name + " (" + str( startBlock.id ) + ") --" )



    # find out the links to other blocks
    for signal in startBlock.getInputSignals():
        # for each output signal


        print(tabs + "-> S " + signal.name )

        if signal.getSourceBlock() is None:
            print(tabs + '-- ERROR: no input signal defined for this block! --')
            
        else:

            print( tabs + "*", signal.getSourceBlock().name, "(", signal.getSourceBlock().id, ")"  )

            self.forwardTraverse__( signal.getSourceBlock(), depthCounter = depthCounter + 1 )
def forwardTraverse(self, startBlock: Block)
Expand source code
def forwardTraverse(self, startBlock : Block):
    self.reachableBlocks = []

    # fill in self.reachableBlocks
    self.forwardTraverse__(startBlock, depthCounter = 0)

    # reset graph traversion markers
    for block in self.reachableBlocks:
        block.graphTraversionMarkerReset()

    return self.reachableBlocks
def forwardTraverse__(self, startBlock: Block, depthCounter: int)
Expand source code
def forwardTraverse__(self, startBlock : Block, depthCounter : int):
    
    tabs = ''
    for i in range(0, depthCounter):
        tabs += '   '

    # print(tabs + "....... depth " + str( depthCounter )  )

    #
    if startBlock.graphTraversionMarkerMarkIsVisited():
        print(tabs + "*** visited *** "  + startBlock.name + " (" + str( startBlock.id ) + ") ****")  ## TODO investigtare: why is this never reached?
        return

    # store this block as it is reachable
    self.reachableBlocks.append( startBlock )

    # make the node as visited
    startBlock.graphTraversionMarkerMarkVisited()

    print(tabs + "-- " + startBlock.name + " (" + str( startBlock.id ) + ") --" )



    # find out the links to other blocks
    for signal in startBlock.getOutputSignals():
        # for each output signal

        print(tabs + "-> S " + signal.name )

        if len( signal.getDestinationBlocks() ) == 0:
            print(tabs + '-- none --')

        for destinationBlock in signal.getDestinationBlocks():
            # destinationBlock is a link to a connected block

            print( tabs + "*", destinationBlock.name, "(", destinationBlock.id, ")"  )

            # recursion
            self.forwardTraverse__( destinationBlock, depthCounter = depthCounter + 1 )