Module openrtdynamics2.lang.subsystems

Expand source code
from typing import Dict, List

from . import lang as dy
from . import block_prototypes as bp
from . import signal_interface as si
from .system_context import generate_subsystem_name, enter_subsystem
from .diagram_core.signal_network.signals import Signal, UndeterminedSignal, BlockOutputSignal, SimulationInputSignal

"""
    User interface for subsystems 
"""



# class sub:
#     def __init__(self, subsystem_name = None ):

#         if subsystem_name is not None:
#             self._subsystem_name = subsystem_name
#         else:
#             self._subsystem_name = generate_subsystem_name()

#         self._outputs_of_embeded_subsystem = []
#         self._outputs_of_embedding_block = []

#     def add_output(self, output_signal : dy.SignalUserTemplate):

#         self._outputs_of_embeded_subsystem.append(output_signal)


#         # use SubsystemOutputLink to generate a new signal to be used outside of the subsystem
#         # This creates a link output_signal_of_embedding_system --> output_signal
#         output_signal_of_embedding_system = dy.SubsystemOutputLinkUser( dy.get_system_context().upper_level_system, output_signal )

#         # inherit datatype from output_signal
#         output_signal_of_embedding_system.inherit_datatype( output_signal )

#         self._outputs_of_embedding_block.append( output_signal_of_embedding_system.unwrap )

#         return output_signal_of_embedding_system

#         return None

#     def set_outputs(self, signals):
#         """
#             connect the list of outputs
#         """
#         return_signals = []

#         for s in signals:
#             return_signals.append( self.add_output( s ) )

#         return return_signals




#     def __enter__(self):

#         enter_subsystem(self._subsystem_name )

#         return self


#     def __exit__(self, type, value, traceback):

#         # set the outputs of the system
#         dy.get_system_context().set_primary_outputs( si.unwrap_list( self._outputs_of_embeded_subsystem ) )

#         # Please note: in case it is really necessary to specify a system != None here, use the upper-level system
#         # not the embedded one.

#         # store an embeeder prototype (as generated by bp.GenericSubsystem) into the date structure of the subsystem
#         embeddedingBlockPrototype = bp.GenericSubsystem( sim=dy.get_system_context().upper_level_system, 
#                                                         inputSignals=None, manifest=None, 
#                                                         additionalInputs=None )

#         dy.get_system_context().embeddedingBlockPrototype = embeddedingBlockPrototype

#         #
#         # Link output_signal_of_embedding_system to the outputs created by bp.GenericSubsystem
#         #

#         embeddedingBlockPrototype.set_anonymous_output_signal_to_connect(   self._outputs_of_embedding_block  )


#         # TODO: add a system wrapper/embedded (e.g. this if-block) to leave_system
#         dy.leave_system()









class sub_if:
    """
        Context manager that embeds a subsystem into a bp.TruggeredSubsystem block
        which is controlled by a condition signal.

        condition_signal           - the signal used as control input of the embedding block
        subsystem_name             - optional name of the subsystem; a name is generated if None
        prevent_output_computation - forwarded unchanged to bp.TruggeredSubsystem

        - methods to be called by the user (inside the 'with' block) -

        set_outputs(signals) - define the output signals of the embedded subsystem

        - properties -

        self.outputs - the output signals to be used outside of the subsystem
                       (available after the 'with' block has been left)

        NOTE: in case the if condition is false, the outputs are held. Possibly uninitialized.
    """

    # the annotation is quoted so that it is not evaluated when the class is defined
    def __init__(self, condition_signal : "dy.SignalUserTemplate", subsystem_name = None, prevent_output_computation = False ):

        # only generate a name in case none was given
        self._subsystem_name = subsystem_name if subsystem_name is not None else generate_subsystem_name()

        self._condition_signal = condition_signal
        self._prevent_output_computation = prevent_output_computation

        # the output signals inside the embedded subsystem (as given to set_outputs)
        self._outputs_of_embeded_subsystem = []

        # outputs (links to the subsystem outputs) to be used by the user
        self._output_links = None


    def set_outputs(self, signals):
        """
            define the output signals of the embedded subsystem

            signals - an iterable of signals; a private list copy is stored
        """
        self._outputs_of_embeded_subsystem = list(signals)

    def __enter__(self):
        self._system = enter_subsystem(self._subsystem_name )

        return self


    def __exit__(self, type, value, traceback):

        embedded_subsystem = dy.get_system_context()

        # set the outputs of the system
        embedded_subsystem.set_primary_outputs( si.unwrap_list( self._outputs_of_embeded_subsystem ) )

        # create generic subsystem block prototype
        self._subsystem_block_prototype = bp.GenericSubsystem( sim=embedded_subsystem.upper_level_system, 
                                                    manifest=None, inputSignals=None, 
                                                    embedded_subsystem=embedded_subsystem,
                                                    N_outputs=len(self._outputs_of_embeded_subsystem) )

        # leave the context of the subsystem
        dy.leave_system()

        #
        # now in the system in which the embedder block (including the logic) shall be placed.
        #

        # create the embedder prototype
        embeddedingBlockPrototype = bp.TruggeredSubsystem( sim=dy.get_system_context(), 
                control_input=si.unwrap( self._condition_signal ), 
                subsystem_prototype=self._subsystem_block_prototype,
                prevent_output_computation = self._prevent_output_computation)

        # connect the normal outputs via links
        self._output_links = si.wrap_signal_list( embeddedingBlockPrototype.outputs )

    @property
    def outputs(self):
        """
            the output signals to be used outside of the subsystem

            raises BaseException in case the subsystem has not been closed yet
        """

        if self._output_links is None:
            # the links are created by __exit__; the exception must actually be raised
            raise BaseException("Please close the subsystem before querying its outputs")

        return self._output_links
    






class sub_loop:
    """
        Context manager that embeds a subsystem into a bp.LoopUntilSubsystem block.

        max_iterations - forwarded to bp.LoopUntilSubsystem as max_iterations
        subsystem_name - optional name of the subsystem; a name is generated if None

        - methods to be called by the user (inside the 'with' block) -

        set_outputs(signals)         - define the output signals of the embedded subsystem
        loop_until(condition_signal) - define the control signal passed on as until_signal
        loop_yield(condition_signal) - define the control signal passed on as yield_signal

        - properties -

        self.outputs - the output signals to be used outside of the subsystem
                       (available after the 'with' block has been left)
    """


    def __init__(self, max_iterations : int, subsystem_name = None ):

        # only generate a name in case none was given
        self._subsystem_name = subsystem_name if subsystem_name is not None else generate_subsystem_name()

        self._max_iterations = max_iterations

        # control outputs of the embedded subsystem
        self._until_signal = None
        self._yield_signal = None

        # the (unwrapped) output signals inside the embedded subsystem
        self._outputs_of_embeded_subsystem = []

        # outputs (links to the subsystem outputs) to be used by the user
        self._output_links = None


    def set_outputs(self, signals):
        """
            define the output signals of the embedded subsystem (stored unwrapped)
        """
        self._outputs_of_embeded_subsystem = si.unwrap_list( signals.copy() )

    def loop_until(self, condition_signal):
        """
            define the control signal that is passed to the loop block as until_signal
        """
        self._until_signal = condition_signal.unwrap

    def loop_yield(self, condition_signal):
        """
            define the control signal that is passed to the loop block as yield_signal
        """
        self._yield_signal = condition_signal.unwrap

    def __enter__(self):

        self._system = enter_subsystem(self._subsystem_name )

        return self


    def __exit__(self, type, value, traceback):

        embedded_subsystem = dy.get_system_context()

        # collect all outputs: the normal outputs first, then the defined control
        # signals in the order until, yield
        control_signals = [ s for s in ( self._until_signal, self._yield_signal ) if s is not None ]
        all_output_signals = [ *self._outputs_of_embeded_subsystem, *control_signals ]

        # set the outputs of the system
        embedded_subsystem.set_primary_outputs(  all_output_signals  )

        # create generic subsystem block prototype
        self._subsystem_block_prototype = bp.GenericSubsystem( sim=embedded_subsystem.upper_level_system, 
                                                    manifest=None, inputSignals=None, 
                                                    embedded_subsystem=embedded_subsystem,
                                                    N_outputs=len(all_output_signals) )

        # leave the context of the subsystem
        dy.leave_system()

        #
        # now in the system in which the embedder block (including the logic) shall be placed.
        #

        # create the embedder prototype
        embeddedingBlockPrototype = bp.LoopUntilSubsystem( sim=dy.get_system_context(), 
                max_iterations=self._max_iterations, 
                subsystem_prototype=self._subsystem_block_prototype,
                until_signal=self._until_signal,
                yield_signal=self._yield_signal)

        # connect the normal outputs via links
        self._output_links = si.wrap_signal_list( embeddedingBlockPrototype.outputs )

    @property
    def outputs(self):
        """
            the output signals to be used outside of the subsystem

            raises BaseException in case the subsystem has not been closed yet
        """

        if self._output_links is None:
            # the links are created by __exit__; the exception must actually be raised
            raise BaseException("Please close the subsystem before querying its outputs")

        return self._output_links
    









#
# new stuff
#





class SwitchPrototype:
    """
        a switch for subsystems that are implemented by SwitchedSubsystemPrototype (class to be derived)

        switch_subsystem_name     - the name of the switch
        number_of_control_outputs - the number of system outputs in addition to the embedded systems outputs
                                    i.e. control outputs of a switch/state machine/...

        - member variables -

        self._switch_output_links    - to be set by the derived class during on_exit()
        self.outputs                 - a list of output signals as defined by self._switch_output_links

        - methods to be defined -

        new_subsystem(subsystem_name)  - create a new subsystem that is part of the switch
        on_exit(subsystem_prototypes)  - callback once all subsystems were defined
                                         during this callback self._switch_output_links must be defined

    """

    # NOTE: in case of an exception, nothing happens just __exit__ is called silently which then aborts

    def __init__(self, switch_subsystem_name, number_of_control_outputs=0):

        self._switch_subsystem_name = switch_subsystem_name
        self._total_number_of_subsystem_outputs = None
        self._switch_output_links = None
        self._switch_system = None
        self._number_of_control_outputs = number_of_control_outputs
        self._number_of_switched_outputs = None

        # List [ bp.GenericSubsystem ]
        self._subsystem_prototypes = None

        # List [ switch_single_sub ]
        self._subsystem_list = None


    def new_subsystem(self, subsystem_name = None):
        """
            create a new subsystem of the switch (to be implemented by the derived class)
        """
        raise BaseException("to be implemented")

    def __enter__(self):

        self._subsystem_list = []
        self._subsystem_prototypes = []

        return self

    def on_exit(self, subsystem_prototypes):
        """
            called when all subsystems have been added to the switch

            subsystem_prototypes - the list of subsystem block prototypes of type bp.GenericSubsystem
        """
        raise BaseException("to be implemented")

    def __exit__(self, type, value, traceback):
        # collect all prototypes that embed the subsystems
        self._subsystem_prototypes.extend( system.subsystem_prototype for system in self._subsystem_list )

        # analyze the default subsystem (the first) to get the output datatypes to use
        default_subsystem = self._subsystem_list[0]

        self._total_number_of_subsystem_outputs = len( default_subsystem.outputs )
        self._number_of_switched_outputs = self._total_number_of_subsystem_outputs - self._number_of_control_outputs

        # the leading (normal) outputs serve as reference points for datatype inheritance;
        # the control outputs are located at the end of the list
        self._reference_outputs = default_subsystem.outputs[0:self._number_of_switched_outputs]

        self.on_exit( self._subsystem_prototypes )

    @property
    def outputs(self):
        """
            the output signals of the switch as defined during on_exit()

            raises BaseException in case the switch has not been closed yet
        """

        if self._switch_output_links is None:
            # the exception must actually be raised, not only constructed
            raise BaseException("Please close the switch subsystem before querying its outputs")

        return self._switch_output_links
    


class SwitchedSubsystemPrototype:
    """
        A single subsystem as part of a switch (implemented by SwitchPrototype) in between multiple subsystems

        - methods to be called by the user -

        set_switched_outputs(signals)  - connect a list of signals to the output of the switch
    """

    # NOTE: in case of an exception, nothing happens just __exit__ is called silently which then aborts

    def __init__(self, subsystem_name = None ):

        # a generated name is always used; a user-given name is appended to it
        generated_name = generate_subsystem_name()
        self._subsystem_name = generated_name if subsystem_name is None else generated_name + '_' + subsystem_name

        self._outputs_of_embeded_subsystem = None

        self._system = None
        self._anonymous_output_signals = None
        self._embeddedingBlockPrototype = None

    @property
    def system(self):
        return self._system

    @property
    def name(self):
        return self._subsystem_name

    @property
    def outputs(self):
        return self._outputs_of_embeded_subsystem

    def set_switched_outputs(self, signals):
        """
            connect a list of outputs to the switch that switches between multiple subsystems of this kind

            use self.set_switched_outputs_prototype in the derived classes to set this
        """

        # the exception must actually be raised, not only constructed
        raise BaseException("to be implemented")

    def set_switched_outputs_prototype(self, signals):
        """
            connect a list of outputs to the switch that switches between multiple subsystems of this kind

            raises BaseException in case the outputs have already been set
        """

        if self._outputs_of_embeded_subsystem is not None:
            raise BaseException("tried to overwrite previously set output")

        self._outputs_of_embeded_subsystem = signals.copy()

    def __enter__(self):
        self._system = enter_subsystem(self._subsystem_name )

        return self


    def __exit__(self, type, value, traceback):
        embedded_subsystem = dy.get_system_context()

        number_of_subsystem_outputs = len(self._outputs_of_embeded_subsystem)

        # set the outputs of the system
        embedded_subsystem.set_primary_outputs( si.unwrap_list( self._outputs_of_embeded_subsystem ) )

        # create generic subsystem block prototype
        self._embeddedingBlockPrototype = bp.GenericSubsystem( sim=embedded_subsystem.upper_level_system, 
                                                    manifest=None, inputSignals=None, 
                                                    embedded_subsystem=embedded_subsystem,
                                                    N_outputs=number_of_subsystem_outputs )

        # leave the context of the subsystem
        dy.leave_system()

    @property
    def subsystem_prototype(self):
        return self._embeddedingBlockPrototype


##
##
## Derivatives of SwitchedSubsystemPrototype
##
##




#
# Switch among subsystems i.e. similar to select/case
#

class SwitchedSubsystem(SwitchedSubsystemPrototype):
    """
        One of the subsystems among which a switch (see SwitchPrototype) selects.

        - methods to be called by the user -

        set_switched_outputs(signals)  - connect a list of signals to the output of the switch
    """
    def __init__(self, subsystem_name = None ):
        # nothing to add here: the base class stores the name and the state
        super().__init__(subsystem_name)

    def set_switched_outputs(self, signals):
        """
            define the list of output signals of this subsystem that are forwarded by the switch
        """
        self.set_switched_outputs_prototype(signals)

        # if self._outputs_of_embeded_subsystem is None:
        #     self._outputs_of_embeded_subsystem = signals.copy()
        # else:
        #     raise BaseException("tried to overwrite previously set outputs")



class sub_switch(SwitchPrototype):
    """
        A switch among subsystems (similar to select/case) that is embedded by bp.SwitchSubsystems

        switch_subsystem_name - the name of the switch
        select_signal         - the signal used as control input of the embedding block
    """
    def __init__(self, switch_subsystem_name, select_signal : dy.SignalUserTemplate ):

        # a plain switch has no control outputs
        super().__init__(switch_subsystem_name, number_of_control_outputs=0)
        self._select_signal = select_signal

    def new_subsystem(self, subsystem_name = None):
        """
            create a new SwitchedSubsystem, register it in the switch and return it
        """
        subsystem = SwitchedSubsystem(subsystem_name=subsystem_name)
        self._subsystem_list.append(subsystem)

        return subsystem

    def on_exit(self, subsystem_prototypes):
        """
            build the embedding block once all subsystems were defined and link its outputs
        """
        # create the embedder prototype
        current_system = dy.get_system_context()
        select_input = self._select_signal.unwrap
        reference_outputs = si.unwrap_list( self._reference_outputs )

        embedder = bp.SwitchSubsystems( sim=current_system,
                control_input=select_input,
                subsystem_prototypes=subsystem_prototypes,
                reference_outputs=reference_outputs )

        # connect the normal outputs via links (there are no control outputs)
        self._switch_output_links = si.wrap_signal_list( embedder.switched_normal_outputs )

        # connect the additional (control) outputs
        # -- None --



#
# State machines
#

class state_sub(SwitchedSubsystemPrototype):
    """
        One subsystem of a state machine (the state machine is implemented by sub_statemachine)

        - methods to be called by the user -

        set_switched_outputs(signals, state_signal)  - connect a list of signals to the output of the state machine
    """

    def __init__(self, subsystem_name = None ):
        super().__init__(subsystem_name)

        # both are defined by set_switched_outputs()
        self._state_signal = None
        self._output_signals = None

    def set_switched_outputs(self, signals, state_signal):
        """
            define the outputs of a subsystem that is embedded into the state machine

            - signals      - normal system output that are forwarded using a switch
            - state_signal - control signal indicating the next state the state machine enters
        """
        self._output_signals = signals
        self._state_signal = state_signal

        # the control signal is placed behind the normal outputs
        all_outputs = signals + [state_signal]
        self.set_switched_outputs_prototype( all_outputs )

    @property
    def state_control_output(self):
        """
            the state signal as given to set_switched_outputs()
        """
        return self._state_signal

    @property
    def subsystem_outputs(self):
        """
            the normal output signals as given to set_switched_outputs()
        """
        return self._output_signals



class sub_statemachine(SwitchPrototype):
    """
        A state machine subsystem that is embedded by bp.StatemachineSwitchSubsystems

        - properties -

        self.state - status signal of the state machine (available after 'with sub_statemachine' has finished)
    """
    def __init__(self, switch_subsystem_name):
        # one control output is added to inform about the current state
        super().__init__(switch_subsystem_name, 1)

        # state output signal undefined until defined by on_exit()
        self._state_output = None

    @property
    def state(self):
        """
            get the signal describing the current state
        """
        return self._state_output

    def new_subsystem(self, subsystem_name = None):
        """
            create a new state_sub, register it in the state machine and return it
        """
        subsystem = state_sub(subsystem_name=subsystem_name)
        self._subsystem_list.append(subsystem)

        return subsystem

    def on_exit(self, subsystem_prototypes):
        """
            build the embedding block once all subsystems were defined and link its outputs
        """
        # create the embedder prototype
        current_system = dy.get_system_context()
        reference_outputs = si.unwrap_list( self._reference_outputs )

        embedder = bp.StatemachineSwitchSubsystems( sim=current_system,
                subsystem_prototypes=subsystem_prototypes,
                reference_outputs=reference_outputs )

        # connect the normal outputs via links
        self._switch_output_links = si.wrap_signal_list( embedder.switched_normal_outputs )

        # connect the additional (control) output
        self._state_output = si.wrap_signal( embedder.state_output )

Classes

class SwitchPrototype (switch_subsystem_name, number_of_control_outputs=0)

a switch for subsystems that are implemented by SwitchedSubsystemPrototype (class to be derived)

switch_subsystem_name - the name of the switch; number_of_control_outputs - the number of system outputs in addition to the embedded systems' outputs, i.e. control outputs of a switch/state machine/…

  • member variables -

self._switch_output_links - overwritten by the derived class when on_exit() is called; self.outputs - a list of output signals as defined by self._switch_output_links

  • methods to be defined -

on_exit(subsystem_prototypes) - callback once all subsystems were defined during this callback self._switch_output_links must be defined

Expand source code
class SwitchPrototype:
    """
        a switch for subsystems that are implemented by SwitchedSubsystemPrototype (class to be derived)

        switch_subsystem_name     - the name of the switch
        number_of_control_outputs - the number of system outputs in addition to the embedded systems outputs
                                    i.e. control outputs of a switch/state machine/...

        - member variables -

        self._switch_output_links    - to be set by the derived class during on_exit()
        self.outputs                 - a list of output signals as defined by self._switch_output_links

        - methods to be defined -

        new_subsystem(subsystem_name)  - create a new subsystem that is part of the switch
        on_exit(subsystem_prototypes)  - callback once all subsystems were defined
                                         during this callback self._switch_output_links must be defined

    """

    # NOTE: in case of an exception, nothing happens just __exit__ is called silently which then aborts

    def __init__(self, switch_subsystem_name, number_of_control_outputs=0):

        self._switch_subsystem_name = switch_subsystem_name
        self._total_number_of_subsystem_outputs = None
        self._switch_output_links = None
        self._switch_system = None
        self._number_of_control_outputs = number_of_control_outputs
        self._number_of_switched_outputs = None

        # List [ bp.GenericSubsystem ]
        self._subsystem_prototypes = None

        # List [ switch_single_sub ]
        self._subsystem_list = None


    def new_subsystem(self, subsystem_name = None):
        """
            create a new subsystem of the switch (to be implemented by the derived class)
        """
        raise BaseException("to be implemented")

    def __enter__(self):

        self._subsystem_list = []
        self._subsystem_prototypes = []

        return self

    def on_exit(self, subsystem_prototypes):
        """
            called when all subsystems have been added to the switch

            subsystem_prototypes - the list of subsystem block prototypes of type bp.GenericSubsystem
        """
        raise BaseException("to be implemented")

    def __exit__(self, type, value, traceback):
        # collect all prototypes that embed the subsystems
        self._subsystem_prototypes.extend( system.subsystem_prototype for system in self._subsystem_list )

        # analyze the default subsystem (the first) to get the output datatypes to use
        default_subsystem = self._subsystem_list[0]

        self._total_number_of_subsystem_outputs = len( default_subsystem.outputs )
        self._number_of_switched_outputs = self._total_number_of_subsystem_outputs - self._number_of_control_outputs

        # the leading (normal) outputs serve as reference points for datatype inheritance;
        # the control outputs are located at the end of the list
        self._reference_outputs = default_subsystem.outputs[0:self._number_of_switched_outputs]

        self.on_exit( self._subsystem_prototypes )

    @property
    def outputs(self):
        """
            the output signals of the switch as defined during on_exit()

            raises BaseException in case the switch has not been closed yet
        """

        if self._switch_output_links is None:
            # the exception must actually be raised, not only constructed
            raise BaseException("Please close the switch subsystem before querying its outputs")

        return self._switch_output_links

Subclasses

Instance variables

var outputs
Expand source code
@property
def outputs(self):
    """
        the output signals of the switch as stored in self._switch_output_links

        raises BaseException in case the switch has not been closed yet
    """

    if self._switch_output_links is None:
        # the exception must actually be raised, not only constructed
        raise BaseException("Please close the switch subsystem before querying its outputs")

    return self._switch_output_links

Methods

def new_subsystem(self, subsystem_name=None)
Expand source code
def new_subsystem(self, subsystem_name = None):
    """
        placeholder that always raises; a derived class is expected to provide the implementation
    """
    not_implemented = BaseException("to be implemented")
    raise not_implemented
def on_exit(self, subsystem_prototypes)

called when all subsystems have been added to the switch

subsystem_prototypes - the list of subsystem block prototypes of type bp.GenericSubsystem

Expand source code
def on_exit(self, subsystem_prototypes):
    """
        callback that is invoked once all subsystems have been added to the switch

        subsystem_prototypes - the list of subsystem block prototypes of type bp.GenericSubsystem

        This placeholder always raises; a derived class is expected to provide the implementation.
    """
    not_implemented = BaseException("to be implemented")
    raise not_implemented
class SwitchedSubsystem (subsystem_name=None)

A single subsystem as part of a switch (implemented by SwitchPrototype) in between multiple subsystems

  • methods to be called by the user -

set_switched_outputs(signals) - connect a list of signals to the output of the switch

Expand source code
class SwitchedSubsystem(SwitchedSubsystemPrototype):
    """
        One of the subsystems among which a switch (see SwitchPrototype) selects.

        - methods to be called by the user -

        set_switched_outputs(signals)  - connect a list of signals to the output of the switch
    """
    def __init__(self, subsystem_name = None ):
        # nothing to add here: the base class stores the name and the state
        super().__init__(subsystem_name)

    def set_switched_outputs(self, signals):
        """
            define the list of output signals of this subsystem that are forwarded by the switch
        """
        self.set_switched_outputs_prototype(signals)

Ancestors

Methods

def set_switched_outputs(self, signals)

connect a list of outputs to the switch that switches between multiple subsystems of this kind

Expand source code
def set_switched_outputs(self, signals):
    """
        define the list of output signals that are forwarded by the switch

        The signals are passed on unchanged to self.set_switched_outputs_prototype.
    """
    store_outputs = self.set_switched_outputs_prototype
    store_outputs(signals)

Inherited members

class SwitchedSubsystemPrototype (subsystem_name=None)

A single subsystem as part of a switch (implemented by SwitchPrototype) in between multiple subsystems

  • methods to be called by the user -

set_switched_outputs(signals) - connect a list of signals to the output of the switch

Expand source code
class SwitchedSubsystemPrototype:
    """
        A single subsystem as part of a switch (implemented by SwitchPrototype) in between multiple subsystems

        - methods to be called by the user -

        set_switched_outputs(signals)  - connect a list of signals to the output of the switch
    """

    # NOTE: in case of an exception, nothing happens just __exit__ is called silently which then aborts

    def __init__(self, subsystem_name = None ):

        # a generated name is always used; a user-given name is appended to it
        generated_name = generate_subsystem_name()
        self._subsystem_name = generated_name if subsystem_name is None else generated_name + '_' + subsystem_name

        self._outputs_of_embeded_subsystem = None

        self._system = None
        self._anonymous_output_signals = None
        self._embeddedingBlockPrototype = None

    @property
    def system(self):
        return self._system

    @property
    def name(self):
        return self._subsystem_name

    @property
    def outputs(self):
        return self._outputs_of_embeded_subsystem

    def set_switched_outputs(self, signals):
        """
            connect a list of outputs to the switch that switches between multiple subsystems of this kind

            use self.set_switched_outputs_prototype in the derived classes to set this
        """

        # the exception must actually be raised, not only constructed
        raise BaseException("to be implemented")

    def set_switched_outputs_prototype(self, signals):
        """
            connect a list of outputs to the switch that switches between multiple subsystems of this kind

            raises BaseException in case the outputs have already been set
        """

        if self._outputs_of_embeded_subsystem is not None:
            raise BaseException("tried to overwrite previously set output")

        self._outputs_of_embeded_subsystem = signals.copy()

    def __enter__(self):
        self._system = enter_subsystem(self._subsystem_name )

        return self


    def __exit__(self, type, value, traceback):
        embedded_subsystem = dy.get_system_context()

        number_of_subsystem_outputs = len(self._outputs_of_embeded_subsystem)

        # set the outputs of the system
        embedded_subsystem.set_primary_outputs( si.unwrap_list( self._outputs_of_embeded_subsystem ) )

        # create generic subsystem block prototype
        self._embeddedingBlockPrototype = bp.GenericSubsystem( sim=embedded_subsystem.upper_level_system, 
                                                    manifest=None, inputSignals=None, 
                                                    embedded_subsystem=embedded_subsystem,
                                                    N_outputs=number_of_subsystem_outputs )

        # leave the context of the subsystem
        dy.leave_system()

    @property
    def subsystem_prototype(self):
        return self._embeddedingBlockPrototype

Subclasses

Instance variables

var name
Expand source code
@property
def name(self):
    """
        the name of the subsystem as stored in self._subsystem_name
    """
    subsystem_name = self._subsystem_name
    return subsystem_name
var outputs
Expand source code
@property
def outputs(self):
    """
        the output signals as stored in self._outputs_of_embeded_subsystem
    """
    stored_outputs = self._outputs_of_embeded_subsystem
    return stored_outputs
var subsystem_prototype
Expand source code
@property
def subsystem_prototype(self):
    """
        the block prototype as stored in self._embeddedingBlockPrototype
    """
    prototype = self._embeddedingBlockPrototype
    return prototype
var system
Expand source code
@property
def system(self):
    """
        the system as stored in self._system
    """
    stored_system = self._system
    return stored_system

Methods

def set_switched_outputs(self, signals)

connect a list of outputs to the switch that switches between multiple subsystems of this kind

use self.set_switched_outputs_prototype in the derived classes to set this

Expand source code
def set_switched_outputs(self, signals):
    """
        connect a list of outputs to the switch that switches between multiple subsystems of this kind

        use self.set_switched_outputs_prototype in the derived classes to set this

        raises BaseException as this method is to be implemented by the derived class
    """

    # the exception must actually be raised, not only constructed
    raise BaseException("to be implemented")
def set_switched_outputs_prototype(self, signals)

connect a list of outputs to the switch that switches between multiple subsystems of this kind

Expand source code
def set_switched_outputs_prototype(self, signals):
    """
        store a copy of the list of output signals that are forwarded by the switch

        raises BaseException in case the outputs have already been set
    """

    # guard: the outputs may be defined only once
    if self._outputs_of_embeded_subsystem is not None:
        raise BaseException("tried to overwrite previously set output")

    self._outputs_of_embeded_subsystem = signals.copy()
class state_sub (subsystem_name=None)

A single subsystem as part of a state machine (implemented by sub_statemachine)

  • methods to be called by the user -

set_switched_outputs(signals, state_signal) - connect a list of signals to the output of the state machine

Expand source code
class state_sub(SwitchedSubsystemPrototype):
    """
        One subsystem of a state machine (the state machine is implemented by sub_statemachine)

        - methods to be called by the user -

        set_switched_outputs(signals, state_signal)  - connect a list of signals to the output of the state machine
    """

    def __init__(self, subsystem_name = None ):
        super().__init__(subsystem_name)

        # both are defined by set_switched_outputs()
        self._state_signal = None
        self._output_signals = None

    def set_switched_outputs(self, signals, state_signal):
        """
            define the outputs of a subsystem that is embedded into the state machine

            - signals      - normal system output that are forwarded using a switch
            - state_signal - control signal indicating the next state the state machine enters
        """
        self._output_signals = signals
        self._state_signal = state_signal

        # the control signal is placed behind the normal outputs
        all_outputs = signals + [state_signal]
        self.set_switched_outputs_prototype( all_outputs )

    @property
    def state_control_output(self):
        """
            the state signal as given to set_switched_outputs()
        """
        return self._state_signal

    @property
    def subsystem_outputs(self):
        """
            the normal output signals as given to set_switched_outputs()
        """
        return self._output_signals

Ancestors

Instance variables

var state_control_output
Expand source code
@property
def state_control_output(self):
    """
        the state signal as stored in self._state_signal
    """
    state_signal = self._state_signal
    return state_signal
var subsystem_outputs
Expand source code
@property
def subsystem_outputs(self):
    """
        the output signals stored in self._output_signals
    """
    output_signals = self._output_signals
    return output_signals

Methods

def set_switched_outputs(self, signals, state_signal)

set the output signals of a subsystem embedded into the state machine

  • signals - normal system output that are forwarded using a switch
  • state_signal - control signal indicating the next state the state machine enters
Expand source code
def set_switched_outputs(self, signals, state_signal):
    """
        define the outputs of a subsystem that is embedded into the state machine

        - signals      - normal system output that are forwarded using a switch
        - state_signal - control signal indicating the next state the state machine enters
    """
    self._output_signals, self._state_signal = signals, state_signal

    # the state signal is appended as the last element of the switched outputs
    outputs_including_state = signals + [state_signal]
    self.set_switched_outputs_prototype( outputs_including_state )

Inherited members

class sub_if (condition_signal: SignalUserTemplate, subsystem_name=None, prevent_output_computation=False)

NOTE: in case the if condition is false, the outputs are held. They are possibly uninitialized.

Expand source code
class sub_if:
    """
        Context manager for a subsystem that is embedded by a bp.TruggeredSubsystem
        block which is controlled by a condition signal.

        NOTE: in case the if condition is false, the outputs are held. They are possibly uninitialized.

        - condition_signal           - signal passed (unwrapped) as control_input to the embedding block
        - subsystem_name             - optional name; generated by generate_subsystem_name() if None
        - prevent_output_computation - forwarded unchanged to bp.TruggeredSubsystem

        - methods to be called by the user (inside the 'with' block) -

        set_outputs(signals) - define the output signals of the embedded subsystem

        - properties -

        self.outputs - output signals to be used outside (available after the 'with' block has finished)
    """


    def __init__(self, condition_signal : dy.SignalUserTemplate, subsystem_name = None, prevent_output_computation = False ):

        if subsystem_name is not None:
            self._subsystem_name = subsystem_name
        else:
            self._subsystem_name = generate_subsystem_name()

        self._condition_signal = condition_signal
        self._prevent_output_computation = prevent_output_computation

        # output signals of the embedded subsystem as defined by set_outputs()
        self._outputs_of_embeded_subsystem = []

        # outputs (links to the subsystem outputs) to be used by the user
        self._output_links = None


    def set_outputs(self, signals):
        """
            define the output signals of the embedded subsystem (a copy of the given collection is stored)
        """
        self._outputs_of_embeded_subsystem = signals.copy()

    def __enter__(self):
        self._system = enter_subsystem(self._subsystem_name )

        return self


    def __exit__(self, type, value, traceback):

        embedded_subsystem = dy.get_system_context()

        # set the outputs of the system
        embedded_subsystem.set_primary_outputs( si.unwrap_list( self._outputs_of_embeded_subsystem ) )

        # create generic subsystem block prototype
        self._subsystem_block_prototype = bp.GenericSubsystem( sim=embedded_subsystem.upper_level_system, 
                                                    manifest=None, inputSignals=None, 
                                                    embedded_subsystem=embedded_subsystem,
                                                    N_outputs=len(self._outputs_of_embeded_subsystem) )

        # leave the context of the subsystem
        dy.leave_system()

        #
        # now in the system in which the embedder block (including the logic) shall be placed.
        #

        # create the embedder prototype
        embeddedingBlockPrototype = bp.TruggeredSubsystem( sim=dy.get_system_context(), 
                control_input=si.unwrap( self._condition_signal ), 
                subsystem_prototype=self._subsystem_block_prototype,
                prevent_output_computation = self._prevent_output_computation)


        # connect the normal outputs via links
        self._output_links = si.wrap_signal_list( embeddedingBlockPrototype.outputs )

    @property
    def outputs(self):
        """
            the output signals (links to the subsystem outputs) to be used by the user

            Raises BaseException if queried before the subsystem was closed,
            i.e., before __exit__ has created the output links.
        """

        # None means __exit__ has not run yet
        if self._output_links is None:
            raise BaseException("Please close the subsystem before querying its outputs")

        return self._output_links

Instance variables

var outputs
Expand source code
@property
@property
def outputs(self):
    """
        the output signals (links to the subsystem outputs) to be used by the user

        Raises BaseException if queried while self._output_links is still None,
        i.e., before the subsystem was closed.
    """

    # None means the output links have not been created yet
    if self._output_links is None:
        raise BaseException("Please close the subsystem before querying its outputs")

    return self._output_links

Methods

def set_outputs(self, signals)
Expand source code
def set_outputs(self, signals):
    """
        define the output signals of the embedded subsystem (a copy of the given collection is stored)
    """
    copied_signals = signals.copy()
    self._outputs_of_embeded_subsystem = copied_signals
class sub_loop (max_iterations: int, subsystem_name=None)
Expand source code
class sub_loop:
    """
        Context manager for a subsystem that is embedded by a bp.LoopUntilSubsystem block.

        - max_iterations - forwarded to bp.LoopUntilSubsystem as max_iterations
        - subsystem_name - optional name; generated by generate_subsystem_name() if None

        - methods to be called by the user (inside the 'with' block) -

        set_outputs(signals)         - define the output signals of the embedded subsystem
        loop_until(condition_signal) - define the 'until' control signal
        loop_yield(condition_signal) - define the 'yield' control signal

        - properties -

        self.outputs - output signals to be used outside (available after the 'with' block has finished)
    """


    def __init__(self, max_iterations : int, subsystem_name = None ):

        if subsystem_name is not None:
            self._subsystem_name = subsystem_name
        else:
            self._subsystem_name = generate_subsystem_name()

        self._max_iterations = max_iterations

        # control outputs of the embedded subsystem
        self._until_signal = None
        self._yield_signal = None

        # (unwrapped) output signals of the embedded subsystem as defined by set_outputs()
        self._outputs_of_embeded_subsystem = []

        # outputs (links to the subsystem outputs) to be used by the user
        self._output_links = None


    def set_outputs(self, signals):
        """
            define the output signals of the embedded subsystem (stored unwrapped)
        """
        self._outputs_of_embeded_subsystem = si.unwrap_list( signals.copy() )

    def loop_until(self, condition_signal):
        """
            define the 'until' control signal (stored unwrapped)
        """
        self._until_signal = condition_signal.unwrap

    def loop_yield(self, condition_signal):
        """
            define the 'yield' control signal (stored unwrapped)
        """
        self._yield_signal = condition_signal.unwrap

    def __enter__(self):

        self._system = enter_subsystem(self._subsystem_name )

        return self


    def __exit__(self, type, value, traceback):

        embedded_subsystem = dy.get_system_context()

        # collect all outputs: the normal outputs first, then the optional
        # control outputs in the order until, yield
        all_output_signals = list(self._outputs_of_embeded_subsystem)
        if self._until_signal is not None:
            all_output_signals.append(self._until_signal)
        if self._yield_signal is not None:
            all_output_signals.append(self._yield_signal)

        # set the outputs of the system
        embedded_subsystem.set_primary_outputs(  all_output_signals  )

        # create generic subsystem block prototype
        self._subsystem_block_prototype = bp.GenericSubsystem( sim=embedded_subsystem.upper_level_system, 
                                                    manifest=None, inputSignals=None, 
                                                    embedded_subsystem=embedded_subsystem,
                                                    N_outputs=len(all_output_signals) )

        # leave the context of the subsystem
        dy.leave_system()

        #
        # now in the system in which the embedder block (including the logic) shall be placed.
        #

        # create the embedder prototype
        embeddedingBlockPrototype = bp.LoopUntilSubsystem( sim=dy.get_system_context(), 
                max_iterations=self._max_iterations, 
                subsystem_prototype=self._subsystem_block_prototype,
                until_signal=self._until_signal,
                yield_signal=self._yield_signal)

        # connect the normal outputs via links
        self._output_links = si.wrap_signal_list( embeddedingBlockPrototype.outputs )

    @property
    def outputs(self):
        """
            the output signals (links to the subsystem outputs) to be used by the user

            Raises BaseException if queried before the subsystem was closed,
            i.e., before __exit__ has created the output links.
        """

        # None means __exit__ has not run yet
        if self._output_links is None:
            raise BaseException("Please close the subsystem before querying its outputs")

        return self._output_links

Instance variables

var outputs
Expand source code
@property
@property
def outputs(self):
    """
        the output signals (links to the subsystem outputs) to be used by the user

        Raises BaseException if queried while self._output_links is still None,
        i.e., before the subsystem was closed.
    """

    # None means the output links have not been created yet
    if self._output_links is None:
        raise BaseException("Please close the subsystem before querying its outputs")

    return self._output_links

Methods

def loop_until(self, condition_signal)
Expand source code
def loop_until(self, condition_signal):
    """
        define the 'until' control signal (stored unwrapped in self._until_signal)
    """
    unwrapped_condition = condition_signal.unwrap
    self._until_signal = unwrapped_condition
def loop_yield(self, condition_signal)
Expand source code
def loop_yield(self, condition_signal):
    """
        define the 'yield' control signal (stored unwrapped in self._yield_signal)
    """
    unwrapped_condition = condition_signal.unwrap
    self._yield_signal = unwrapped_condition
def set_outputs(self, signals)
Expand source code
def set_outputs(self, signals):
    """
        define the output signals of the embedded subsystem (a copy of the given collection is unwrapped and stored)
    """
    copied_signals = signals.copy()
    self._outputs_of_embeded_subsystem = si.unwrap_list( copied_signals )
class sub_statemachine (switch_subsystem_name)

A state machine subsystem

  • properties -

self.state - status signal of the state machine (available after 'with sub_statemachine' has finished)

Expand source code
class sub_statemachine(SwitchPrototype):
    """
        A state machine subsystem

        - properties -

        self.state - status signal of the state machine (available after 'with sub_statemachine' has finished)

        - methods to be called by the user -

        new_subsystem(subsystem_name) - create a state_sub instance and add it to the state machine
    """
    def __init__(self, switch_subsystem_name):
        # one control output is added to inform about the current state
        SwitchPrototype.__init__(self, switch_subsystem_name, 1 )

        # stays None until on_exit() has been called
        self._state_output = None

    @property
    def state(self):
        """
            get the signal describing the current state
        """
        return self._state_output

    def new_subsystem(self, subsystem_name = None):
        """
            create a new state_sub, append it to the list of subsystems, and return it
        """
        new_state = state_sub(subsystem_name=subsystem_name)
        self._subsystem_list.append(new_state)

        return new_state

    def on_exit(self, subsystem_prototypes):
        """
            create the embedding block (bp.StatemachineSwitchSubsystems) and
            link its outputs to self._switch_output_links and self._state_output
        """
        system = dy.get_system_context()
        reference_outputs = si.unwrap_list( self._reference_outputs )

        # the embedder prototype
        embedder = bp.StatemachineSwitchSubsystems( sim=system,
                subsystem_prototypes=subsystem_prototypes,
                reference_outputs=reference_outputs )

        # normal outputs are connected via links
        self._switch_output_links = si.wrap_signal_list( embedder.switched_normal_outputs )

        # the additional (control) output
        self._state_output = si.wrap_signal( embedder.state_output )

Ancestors

Instance variables

var state

get the signal describing the current state

Expand source code
@property
def state(self):
    """
        the signal describing the current state (stored in self._state_output)
    """
    state_output = self._state_output
    return state_output

Methods

def new_subsystem(self, subsystem_name=None)
Expand source code
def new_subsystem(self, subsystem_name = None):
    """
        create a new state_sub, append it to self._subsystem_list, and return it
    """
    new_state = state_sub(subsystem_name=subsystem_name)
    self._subsystem_list.append(new_state)
    return new_state

Inherited members

class sub_switch (switch_subsystem_name, select_signal: SignalUserTemplate)

a switch for subsystems that are implemented by SwitchedSubsystemPrototype (class to be derived)

switch_subsystem_name - the name of the switch; number_of_control_outputs - the number of system outputs in addition to the embedded systems' outputs, i.e. control outputs of a switch/state machine/…

  • member variables -

self._switch_output_links - overwrite by derived class when calling on_exit() self.outputs - a list of output signals as defined by self._switch_output_links

  • methods to be defined -

on_exit(subsystem_prototypes) - callback once all subsystems were defined during this callback self._switch_output_links must be defined

Expand source code
class sub_switch(SwitchPrototype):
    """
        A switch between subsystems (instances of SwitchedSubsystem) controlled by a select signal

        - switch_subsystem_name - the name of the switch
        - select_signal         - signal passed (unwrapped) as control_input to bp.SwitchSubsystems

        - methods to be called by the user -

        new_subsystem(subsystem_name) - create a SwitchedSubsystem instance and add it to the switch
    """
    def __init__(self, switch_subsystem_name, select_signal : dy.SignalUserTemplate ):

        # stored before the base class is initialized; read again in on_exit()
        self._select_signal = select_signal
        SwitchPrototype.__init__(self, switch_subsystem_name, number_of_control_outputs=0)

    def new_subsystem(self, subsystem_name = None):
        """
            create a new SwitchedSubsystem, append it to the list of subsystems, and return it
        """
        new_system = SwitchedSubsystem(subsystem_name=subsystem_name)
        self._subsystem_list.append(new_system)

        return new_system


    def on_exit(self, subsystem_prototypes):
        """
            create the embedding block (bp.SwitchSubsystems) and link its
            outputs to self._switch_output_links
        """
        system = dy.get_system_context()
        control_input = self._select_signal.unwrap
        reference_outputs = si.unwrap_list( self._reference_outputs )

        # the embedder prototype
        embedder = bp.SwitchSubsystems( sim=system,
                control_input=control_input,
                subsystem_prototypes=subsystem_prototypes,
                reference_outputs=reference_outputs )

        # normal outputs are connected via links
        self._switch_output_links = si.wrap_signal_list( embedder.switched_normal_outputs )

Ancestors

Methods

def new_subsystem(self, subsystem_name=None)
Expand source code
def new_subsystem(self, subsystem_name = None):
    """
        create a new SwitchedSubsystem, append it to self._subsystem_list, and return it
    """
    new_system = SwitchedSubsystem(subsystem_name=subsystem_name)
    self._subsystem_list.append(new_system)
    return new_system

Inherited members