Module openrtdynamics2.py_execute

Expand source code
from .py_execute import *

# Per-name documentation overrides for the pdoc documentation generator.
__pdoc__ = {}
# Setting an entry to False is presumably what hides the 'py_execute' submodule
# from the generated docs (pdoc convention -- TODO confirm for the pdoc version in use).
__pdoc__['py_execute'] = False
#__all__ = ['show_required_inputs']

# Names exported by this package.
__all__ = ['show_required_inputs', 'run_batch_simulation', 'SystemInstance', 'CompiledCode']

Functions

def run_batch_simulation(system_instance: openrtdynamics2.py_execute.py_execute.SystemInstance, input_data, N, output_keys=None, reset_system=True)

Run a simulation

Parameters:

- system_instance : SystemInstance - the instance of the system to simulate
- input_data - dictionary containing the input data stored in arrays, one entry per input signal
- N - the number of steps to simulate
- output_keys - a list of output signals of which the traces are stored
- reset_system - reset the system before starting the simulation (if false, a previous simulation can be continued)

Example usage:

testsim = SystemInstance(algorithm_sourcecode, manifest)

1) Get all output signals

N=3000
input_data = {'velocity' : 2*np.ones(N), 'time_scale' : 7 }
sim_results_full = run_batch_simulation(testsim, input_data, N )

2) Get only a subset of the output signals

sim_results = run_batch_simulation(testsim, input_data, N, output_keys=['x', 'y', 'steering'] )

Expand source code
def run_batch_simulation(system_instance : SystemInstance, input_data, N, output_keys=None, reset_system=True ):
    """
        Run a simulation of N steps and record the traces of the output signals

        system_instance : SystemInstance - the instance of the system to simulate
        input_data                       - dictionary containing the input data; each value is either
                                           a sequence/array providing at least N samples (sample i is
                                           applied in step i) or a single value (np.size(value) == 1)
                                           that is applied once as a constant. Entries whose key is
                                           not an attribute of system_instance.inputs are ignored.
        N                                - the number of steps to simulate
        output_keys                      - a list of output signals of which the traces are stored
                                           (default: all output names listed in the manifest)
        reset_system                     - reset the system before starting the simulation (if false, a
                                           previous simulation can be continued)

        Returns a dictionary that maps each output key to a numpy array of length N.

        Raises IndexError in case an input sequence provides fewer than N samples. This is
        checked before anything is written to the system, i.e., the inputs are untouched and
        the states are neither reset nor advanced when the error is raised.


        Example usage:
        --------------

        testsim = SystemInstance(algorithm_sourcecode, manifest)

        # 1) Get all output signals

        N=3000
        input_data = {'velocity' : 2*np.ones(N), 'time_scale' : 7 }
        sim_results_full = run_batch_simulation(testsim, input_data, N )

        # 2) Get only a subset of the output signals

        sim_results = run_batch_simulation(testsim, input_data, N,  output_keys=['x', 'y', 'steering'] )

    """

    # the structure holding the input signals of the system
    inputs = system_instance.inputs

    # by default, record every output signal listed in the manifest
    if output_keys is None:
        output_keys = system_instance.manifest['io']['outputs']['calculate_output']['names']

    # separate single values (treated as constants) from per-step traces;
    # np.size(...) == 1 also covers plain Python and numpy scalars
    constant_inputs = {}
    input_traces = {}
    for name, val in input_data.items():

        if not hasattr(inputs, name):
            # not an input of this system
            continue

        if np.size( val ) == 1:
            constant_inputs[name] = val
        else:
            input_traces[name] = val

    # fail early: a trace that is too short would otherwise abort the simulation
    # half-way through, after the states have already been reset and advanced.
    # IndexError is the exception type the per-step indexing would raise.
    for name, trace in input_traces.items():
        try:
            n_samples = len(trace)
        except TypeError:
            # length unknown; rely on the per-step indexing below
            continue

        if n_samples < N:
            raise IndexError(
                "input signal '{}' provides only {} samples, but N = {} steps were requested".format(
                    name, n_samples, N
                )
            )

    # allocate memory for output signals
    storage = { k : np.zeros(N) for k in output_keys }

    # constants are written once, before the simulation starts
    for name, val in constant_inputs.items():
        setattr(inputs, name, val)

    # reset system
    if reset_system:
        system_instance.reset_states()

    # start simulation
    for i in range(N):

        # apply sample i of each trace
        for name, trace in input_traces.items():
            setattr(inputs, name, trace[i])

        outputs = system_instance.calculate_outputs()
        system_instance.update_states()

        # record the outputs of this step
        for k in output_keys:
            storage[k][i] = getattr(outputs, k)

    return storage
def show_required_inputs(testsim)

Print a table containing all input signals as required by the simulator functions of the implemented system.

testsim : SystemInstance - the instance of the system

Expand source code
def show_required_inputs(testsim):
    """
        Print a table containing all input signals as required by the simulator functions
        of the implemented system.

        testsim : SystemInstance - the instance of the system; only its manifest is read

        Each row shows one input signal and marks with an 'X' for which of the functions
        (calculate outputs, state update, reset) the manifest lists it as an input.
        The rows are printed in the order in which the signals first appear in these
        three lists, so the output is the same on every run.
    """

    from prettytable import PrettyTable

    input_manifest = testsim.manifest['io']['inputs']

    s_o_1 = input_manifest['calculate_output']['names']
    s_u = input_manifest['state_update']['names']
    s_r = input_manifest['reset']['names']

    # remove duplicates but keep the order of first occurrence; iterating over a
    # plain set of strings would yield a different row order from run to run
    already_listed = set()
    all_signals = []
    for k in s_o_1 + s_u + s_r:
        if k not in already_listed:
            already_listed.add(k)
            all_signals.append(k)

    # sets for the membership tests below
    needed_for_outputs, needed_for_update, needed_for_reset = set(s_o_1), set(s_u), set(s_r)

    table_rows = [
        [
            k,
            'X' if k in needed_for_outputs else '',
            'X' if k in needed_for_update else '',
            'X' if k in needed_for_reset else '',
        ]
        for k in all_signals
    ]

    x = PrettyTable()
    x.field_names = ["input signal,  needed for -->", "calc. outputs", "update", "reset"]

    x.add_rows(table_rows)

    print(x)

Classes

class CompiledCode (code_gen_results)

Compile a system so that it can be instantiated and, hence, executed.

code_gen_results - the returned results of dy.generate_code

Note: internally, the Cling C++ interpreter is used, which is interfaced from Python via cppyy (https://pypi.org/project/cppyy/).

Expand source code
class CompiledCode:

    def __init__(self, code_gen_results):
        """
            Compile the generated source code of a system so that instances of
            the system can be created and executed

            code_gen_results  - the returned results of dy.generate_code; the entries
                                'algorithm_sourcecode' and 'manifest' are used

            Note: the source code is handed to cppyy ( https://pypi.org/project/cppyy/ ),
            the Python interface to the Cling C++ interpreter.
        """

        import cppyy

        global system_instance_counter

        #
        # Why a unique namespace per compilation?
        #
        # See https://bitbucket.org/wlav/cppyy/issues/295/q-reset-interpreter-or-multiple-instances
        #
        # A symbol that was once created in the cppyy.gbl namespace can neither be deleted
        # nor overwritten. Each piece of source code passed to cppyy.cppdef is therefore
        # wrapped into a namespace whose name contains a counter that is increased on every
        # compilation. The compiled c++ code stays in cppyy.gbl even after the CompiledCode
        # instance is destructed, so symbols accumulate over time (a known memory leak
        # without a known solution).
        #

        namespace_id = system_instance_counter
        system_instance_counter += 1

        algorithm_sourcecode = code_gen_results['algorithm_sourcecode']
        self._manifest = code_gen_results['manifest']

        # the unique namespace that encloses all classes of this system
        namespace_name = 'ortd_system_ns' + str(namespace_id)
        wrapped_source = 'namespace ' + namespace_name + '{\n' + algorithm_sourcecode + '\n}'

        # hand the source code over to the jit-compiler
        cppyy.cppdef(wrapped_source)

        # look up the namespace and keep its main class 'simulation'
        compiled_namespace = getattr(cppyy.gbl, namespace_name)
        self._cpp_class_of_system = compiled_namespace.simulation

    @property
    def system_class(self):
        """
            the compiled class 'simulation' of the system
        """
        return self._cpp_class_of_system

    @property
    def manifest(self):
        """
            the manifest taken from the code generation results
        """
        return self._manifest

Instance variables

var manifest
Expand source code
@property
def manifest(self):
    """
        the manifest stored in this instance (self._manifest)
    """
    return self._manifest
var system_class
Expand source code
@property
def system_class(self):
    """
        the class of the compiled system stored in this instance (self._cpp_class_of_system)
    """
    return self._cpp_class_of_system
class SystemInstance (compiled_code: openrtdynamics2.py_execute.py_execute.CompiledCode)

Instantiate a system so that it can be executed

compiled_code - an instance of CompiledCode

Expand source code
class SystemInstance:

    def __init__(self, compiled_code : CompiledCode):
        """
            Create an executable instance of a compiled system

            compiled_code   - an instance of CompiledCode; it provides the class of
                              the system and the manifest
        """

        self._compiled_code = compiled_code

        cpp_class = compiled_code.system_class

        # the system itself
        self._sim = cpp_class()

        # the structures holding the in- and output signals
        self.inputs = cpp_class.Inputs()
        self.outputs = cpp_class.Outputs()

        # initialise the inputs as described by the manifest
        fill_default_input_values( compiled_code.manifest, self.inputs )

    def _step(self, third_argument, update_flag, reset_flag):
        """
            call the step function of the system with the stored in- and outputs;
            the three given arguments are passed on unchanged as the trailing
            positional arguments
        """
        self._sim.step( self.outputs, self.inputs, third_argument, update_flag, reset_flag )

    def reset_states(self):
        """
            reset all internal states of the system and all subsystems
        """
        self._step( 0, False, True )

    def calculate_outputs(self):
        """
            run the step function in output-calculation mode and return the
            structure holding the output signals (self.outputs)
        """
        self._step( 1, False, False )
        return self.outputs

    def update_states(self):
        """
            run the step function with the state-update flag set
        """
        self._step( 0, True, False )

    def single_step(self):
        """
            perform one step (calculate the outputs, then update the states)
            and return the system outputs

            Please note: the inputs must be set via the member variable inputs beforehand.
        """
        outputs = self.calculate_outputs()
        self.update_states()

        return outputs

    @property
    def instance(self):
        """
            the underlying instance of the system class
        """
        return self._sim

    @property
    def manifest(self):
        """
            the manifest of the compiled code this instance was created from
        """
        return self._compiled_code.manifest

Instance variables

var instance
Expand source code
@property
def instance(self):
    """
        the underlying system object stored in this instance (self._sim)
    """
    return self._sim
var manifest
Expand source code
@property
def manifest(self):
    """
        the manifest of the compiled code this instance refers to
    """
    return self._compiled_code.manifest

Methods

def calculate_outputs(self)
Expand source code
def calculate_outputs(self):
    """
        call the step function of the system with the trailing arguments
        (1, False, False) and return the output structure self.outputs

        NOTE(review): presumably this computes the outputs without updating or
        resetting the states -- confirm against the generated step function.
    """
    self._sim.step( self.outputs, self.inputs, 1, False, False )
    return self.outputs
def reset_states(self)

reset all internal states of the system and all subsystems

Expand source code
def reset_states(self):
    """
        reset all internal states of the system and all subsystems

        This calls the step function of the system with the last (fifth)
        argument set to True; nothing is returned.
    """
    self._sim.step( self.outputs, self.inputs, 0, False, True )
def single_step(self)

perform one step and return the system outputs

Please note: the inputs must be set via the member variable inputs beforehand.

Expand source code
def single_step(self):
    """
        perform one step (calculate the outputs, then update the states)
        and return the system outputs

        Please note: the inputs must be set via the member variable inputs beforehand.
    """
    outputs = self.calculate_outputs()
    self.update_states()
    return outputs
def update_states(self)
Expand source code
def update_states(self):
    """
        call the step function of the system with the fourth argument set to
        True; nothing is returned

        NOTE(review): presumably this advances the internal states by one
        step -- confirm against the generated step function.
    """
    self._sim.step( self.outputs, self.inputs, 0, True, False )