Module openrtdynamics2.lang.diagram_core.system
Expand source code
from .signal_network.signals import *
from .signal_network.Block import *
from .datatype_propagation import *
from contextlib import contextmanager
from typing import Dict, List
from colorama import init, Fore, Back, Style
init(autoreset=True)
class System:
def __init__(self, upper_level_system, name : str ):
self.upper_level_system = upper_level_system
self._name = name
self.blocks_in_system = []
# counter for system input signals
# This determines the order of teh arguments of the generated c++ functions
self.simulation_input_signal_counter = 0
self._top_level_system = None
if upper_level_system is None:
self._top_level_system = self
self.block_id_counter = 0
self.signal_id_counter = 0
# manager to determine datatypes as new blocks are added
# only for the highest-level system -- subsystems use the
# datatype propagation of the main system
self.datatype_propagation_instance = DatatypePropagation(self)
else:
self._top_level_system = upper_level_system._top_level_system
# # share the counter of the
# self.block_id_counter = upper_level_system.block_id_counter
# self.signal_id_counter = upper_level_system.signal_id_counter
# re-use the upper-level propagation
self.datatype_propagation_instance = upper_level_system.datatype_propagation_instance
# components
self.components_ = {}
# subsystems
self._subsystems = []
# primary outputs
self._output_signals = []
# signals that must be computed
self._signals_mandatory_to_compute = []
# the results of the compilation of this system
self.compile_result = None
def getName(self):
return self._name
@property
def name(self):
return self._name
@property
def parent_system(self):
return self.upper_level_system
def generate_new_block_id(self):
self._top_level_system.block_id_counter += 1
return self._top_level_system.block_id_counter
# get a new unique id for creating a signal
def generate_new_signal_id(self):
self._top_level_system.signal_id_counter += 1
return self._top_level_system.signal_id_counter
def append_subsystem(self, system):
"""
add a subsystem to the list of subsystems to this system
"""
self._subsystems.append( system )
@property
def subsystems(self):
return self._subsystems
def addBlock(self, blk : Block):
self.blocks_in_system.append(blk)
# TODO: remove this?
def set_primary_outputs(self, outputSignals):
self._output_signals = outputSignals
def append_primay_ouput(self, outputSignals):
self._output_signals.append(outputSignals)
@property
def primary_outputs(self):
return self._output_signals
def add_signal_mandatory_to_compute(self, signal):
self._signals_mandatory_to_compute = self._signals_mandatory_to_compute + [ signal ]
@property
def signals_mandatory_to_compute(self):
return self._signals_mandatory_to_compute
def ShowBlocks(self):
print("-----------------------------")
print("Blocks in simulation " + self._name + ":")
print("-----------------------------")
for blk in self.blocks_in_system:
print(Fore.YELLOW + "* " + Style.RESET_ALL + "'" + blk.name + "' (" + str(blk.id) + ")" )
# list input singals
print(Fore.RED + " input signals")
if blk.getInputSignals() is not None and len( blk.getInputSignals() ) > 0:
for inSig in blk.getInputSignals():
print(Style.DIM + " - " + inSig.toStr() )
else:
print(Style.DIM + " * undef *")
# list output singals
if len( blk.getOutputSignals() ) > 0:
print(Fore.GREEN + " output signals")
for inSig in blk.getOutputSignals():
print(Style.DIM + " - " + inSig.toStr() )
print()
print(" nested subsystems")
for sys in self._subsystems:
print(" - " + sys.getName() )
@property
def components(self):
return self.components_
def exportGraph(self):
# TODO: remove from this class and move to aonther class 'visualization' or 'editor'
def createBlockNode(nodes_array_index, block):
idstr = 'bid_' + str( block.id )
node = {}
node['name'] = block.name
node['type'] = 'block'
node['tostr'] = block.toStr()
node['id'] = idstr
node['nodes_array_index'] = nodes_array_index
return node, idstr
def createSimulationInputNode(nodes_array_index, inSig):
# append a node that stands for a simulation input
idstr = 'insig_' + inSig.name
node = {}
node['name'] = 'in_' + inSig.name
node['type'] = 'simulation_input'
# node['tostr'] = block.toStr()
node['id'] = idstr
node['nodes_array_index'] = nodes_array_index
return node, idstr
def createLink(signal, sourceBlock, destBlock):
# create a link in-between blocks
link = {}
link['tostr'] = signal.toStr()
link['name'] = signal.name
link['source'] = ''
link['target'] = ''
link['source_bid'] = sourceBlock.id
link['target_bid'] = destBlock.id
link['source_key'] = 'bid_' + str( sourceBlock.id )
link['target_key'] = 'bid_' + str( destBlock.id )
return link
def createLinkFromSimulationInput(signal, destBlock):
# create a link between a maker-node for the simulation input signal
# anf the destination block
link = {}
link['tostr'] = signal.toStr()
link['name'] = signal.name
link['source'] = ''
link['target'] = ''
link['target_bid'] = destBlock.id
link['source_key'] = idstr
link['target_key'] = 'bid_' + str( destBlock.id )
return link
# init/reset the list of all nodes and links
nodes_array = []
nodes_hash = {}
links = []
# create a node for each block in the simulation
nodes_array_index = 0
for block in self.blocks_in_system:
node, idstr = createBlockNode(nodes_array_index, block)
nodes_array.append( node )
nodes_hash[idstr] = node
nodes_array_index += 1
# build links
for blk in self.blocks_in_system:
# list input singals
if blk.getInputSignals() is not None and len( blk.getInputSignals() ) > 0:
print(Fore.RED + " input signals")
for inSig in blk.getInputSignals():
print(Style.DIM + " - " + inSig.toStr() )
if isinstance(inSig.lookupSource(), BlockOutputSignal):
# this is a block to block connection. Create a normal link in-between
sourceBlock = inSig.getSourceBlock()
link = createLink(signal=inSig, sourceBlock=sourceBlock, destBlock=blk)
links.append( link )
if isinstance(inSig, SimulationInputSignal):
# this is an input to the simulation: add a special marker node
# and add a link from this newly created node to the block
# TODO: only create this node, of it does not already exist
node, idstr = createSimulationInputNode(nodes_array_index, inSig)
nodes_array.append( node )
nodes_hash[idstr] = node
nodes_array_index += 1
link = createLinkFromSimulationInput(inSig, blk)
links.append( link )
# create nodes/links/something for the simulation outputs
# (TODO)
# finish the graph structure
graph = {}
graph['nodes_hash'] = nodes_hash
graph['nodes'] = nodes_array # d3 requires an array
graph['links'] = links
# print graph
import json
print(json.dumps(graph, indent=4, sort_keys=True))
#
return graph
@property
def blocks(self):
return self.blocks_in_system
def resolve_anonymous_signals(self, ignore_signals_with_datatype_inheritance=False):
"""
close down the anonymous signals and wire the connected blocks directly to the source.
"""
for block in self.blocks_in_system:
block.verifyInputSignals(ignore_signals_with_datatype_inheritance)
def propagate_datatypes(self):
self.resolve_anonymous_signals(ignore_signals_with_datatype_inheritance=True)
# find out the output datatypes
self.datatype_propagation_instance.fixateTypes()
# execute this later in the compilatin process
# for block in self.blocks_in_system:
# block.verifyInputSignals(ignore_signals_with_datatype_inheritance=False)
@property
def signal_with_unresolved_datatypes(self):
return self.datatype_propagation_instance.signalsWithUnderminedTypes
Classes
class System (upper_level_system, name: str)
-
Expand source code
class System: def __init__(self, upper_level_system, name : str ): self.upper_level_system = upper_level_system self._name = name self.blocks_in_system = [] # counter for system input signals # This determines the order of teh arguments of the generated c++ functions self.simulation_input_signal_counter = 0 self._top_level_system = None if upper_level_system is None: self._top_level_system = self self.block_id_counter = 0 self.signal_id_counter = 0 # manager to determine datatypes as new blocks are added # only for the highest-level system -- subsystems use the # datatype propagation of the main system self.datatype_propagation_instance = DatatypePropagation(self) else: self._top_level_system = upper_level_system._top_level_system # # share the counter of the # self.block_id_counter = upper_level_system.block_id_counter # self.signal_id_counter = upper_level_system.signal_id_counter # re-use the upper-level propagation self.datatype_propagation_instance = upper_level_system.datatype_propagation_instance # components self.components_ = {} # subsystems self._subsystems = [] # primary outputs self._output_signals = [] # signals that must be computed self._signals_mandatory_to_compute = [] # the results of the compilation of this system self.compile_result = None def getName(self): return self._name @property def name(self): return self._name @property def parent_system(self): return self.upper_level_system def generate_new_block_id(self): self._top_level_system.block_id_counter += 1 return self._top_level_system.block_id_counter # get a new unique id for creating a signal def generate_new_signal_id(self): self._top_level_system.signal_id_counter += 1 return self._top_level_system.signal_id_counter def append_subsystem(self, system): """ add a subsystem to the list of subsystems to this system """ self._subsystems.append( system ) @property def subsystems(self): return self._subsystems def addBlock(self, blk : Block): self.blocks_in_system.append(blk) # TODO: remove this? def set_primary_outputs(self, outputSignals): self._output_signals = outputSignals def append_primay_ouput(self, outputSignals): self._output_signals.append(outputSignals) @property def primary_outputs(self): return self._output_signals def add_signal_mandatory_to_compute(self, signal): self._signals_mandatory_to_compute = self._signals_mandatory_to_compute + [ signal ] @property def signals_mandatory_to_compute(self): return self._signals_mandatory_to_compute def ShowBlocks(self): print("-----------------------------") print("Blocks in simulation " + self._name + ":") print("-----------------------------") for blk in self.blocks_in_system: print(Fore.YELLOW + "* " + Style.RESET_ALL + "'" + blk.name + "' (" + str(blk.id) + ")" ) # list input singals print(Fore.RED + " input signals") if blk.getInputSignals() is not None and len( blk.getInputSignals() ) > 0: for inSig in blk.getInputSignals(): print(Style.DIM + " - " + inSig.toStr() ) else: print(Style.DIM + " * undef *") # list output singals if len( blk.getOutputSignals() ) > 0: print(Fore.GREEN + " output signals") for inSig in blk.getOutputSignals(): print(Style.DIM + " - " + inSig.toStr() ) print() print(" nested subsystems") for sys in self._subsystems: print(" - " + sys.getName() ) @property def components(self): return self.components_ def exportGraph(self): # TODO: remove from this class and move to aonther class 'visualization' or 'editor' def createBlockNode(nodes_array_index, block): idstr = 'bid_' + str( block.id ) node = {} node['name'] = block.name node['type'] = 'block' node['tostr'] = block.toStr() node['id'] = idstr node['nodes_array_index'] = nodes_array_index return node, idstr def createSimulationInputNode(nodes_array_index, inSig): # append a node that stands for a simulation input idstr = 'insig_' + inSig.name node = {} node['name'] = 'in_' + inSig.name node['type'] = 'simulation_input' # node['tostr'] = block.toStr() node['id'] = idstr node['nodes_array_index'] = nodes_array_index return node, idstr def createLink(signal, sourceBlock, destBlock): # create a link in-between blocks link = {} link['tostr'] = signal.toStr() link['name'] = signal.name link['source'] = '' link['target'] = '' link['source_bid'] = sourceBlock.id link['target_bid'] = destBlock.id link['source_key'] = 'bid_' + str( sourceBlock.id ) link['target_key'] = 'bid_' + str( destBlock.id ) return link def createLinkFromSimulationInput(signal, destBlock): # create a link between a maker-node for the simulation input signal # anf the destination block link = {} link['tostr'] = signal.toStr() link['name'] = signal.name link['source'] = '' link['target'] = '' link['target_bid'] = destBlock.id link['source_key'] = idstr link['target_key'] = 'bid_' + str( destBlock.id ) return link # init/reset the list of all nodes and links nodes_array = [] nodes_hash = {} links = [] # create a node for each block in the simulation nodes_array_index = 0 for block in self.blocks_in_system: node, idstr = createBlockNode(nodes_array_index, block) nodes_array.append( node ) nodes_hash[idstr] = node nodes_array_index += 1 # build links for blk in self.blocks_in_system: # list input singals if blk.getInputSignals() is not None and len( blk.getInputSignals() ) > 0: print(Fore.RED + " input signals") for inSig in blk.getInputSignals(): print(Style.DIM + " - " + inSig.toStr() ) if isinstance(inSig.lookupSource(), BlockOutputSignal): # this is a block to block connection. Create a normal link in-between sourceBlock = inSig.getSourceBlock() link = createLink(signal=inSig, sourceBlock=sourceBlock, destBlock=blk) links.append( link ) if isinstance(inSig, SimulationInputSignal): # this is an input to the simulation: add a special marker node # and add a link from this newly created node to the block # TODO: only create this node, of it does not already exist node, idstr = createSimulationInputNode(nodes_array_index, inSig) nodes_array.append( node ) nodes_hash[idstr] = node nodes_array_index += 1 link = createLinkFromSimulationInput(inSig, blk) links.append( link ) # create nodes/links/something for the simulation outputs # (TODO) # finish the graph structure graph = {} graph['nodes_hash'] = nodes_hash graph['nodes'] = nodes_array # d3 requires an array graph['links'] = links # print graph import json print(json.dumps(graph, indent=4, sort_keys=True)) # return graph @property def blocks(self): return self.blocks_in_system def resolve_anonymous_signals(self, ignore_signals_with_datatype_inheritance=False): """ close down the anonymous signals and wire the connected blocks directly to the source. """ for block in self.blocks_in_system: block.verifyInputSignals(ignore_signals_with_datatype_inheritance) def propagate_datatypes(self): self.resolve_anonymous_signals(ignore_signals_with_datatype_inheritance=True) # find out the output datatypes self.datatype_propagation_instance.fixateTypes() # execute this later in the compilatin process # for block in self.blocks_in_system: # block.verifyInputSignals(ignore_signals_with_datatype_inheritance=False) @property def signal_with_unresolved_datatypes(self): return self.datatype_propagation_instance.signalsWithUnderminedTypes
Instance variables
var blocks
-
Expand source code
@property def blocks(self): return self.blocks_in_system
var components
-
Expand source code
@property def components(self): return self.components_
var name
-
Expand source code
@property def name(self): return self._name
var parent_system
-
Expand source code
@property def parent_system(self): return self.upper_level_system
var primary_outputs
-
Expand source code
@property def primary_outputs(self): return self._output_signals
var signal_with_unresolved_datatypes
-
Expand source code
@property def signal_with_unresolved_datatypes(self): return self.datatype_propagation_instance.signalsWithUnderminedTypes
var signals_mandatory_to_compute
-
Expand source code
@property def signals_mandatory_to_compute(self): return self._signals_mandatory_to_compute
var subsystems
-
Expand source code
@property def subsystems(self): return self._subsystems
Methods
def ShowBlocks(self)
-
Expand source code
def ShowBlocks(self): print("-----------------------------") print("Blocks in simulation " + self._name + ":") print("-----------------------------") for blk in self.blocks_in_system: print(Fore.YELLOW + "* " + Style.RESET_ALL + "'" + blk.name + "' (" + str(blk.id) + ")" ) # list input singals print(Fore.RED + " input signals") if blk.getInputSignals() is not None and len( blk.getInputSignals() ) > 0: for inSig in blk.getInputSignals(): print(Style.DIM + " - " + inSig.toStr() ) else: print(Style.DIM + " * undef *") # list output singals if len( blk.getOutputSignals() ) > 0: print(Fore.GREEN + " output signals") for inSig in blk.getOutputSignals(): print(Style.DIM + " - " + inSig.toStr() ) print() print(" nested subsystems") for sys in self._subsystems: print(" - " + sys.getName() )
def addBlock(self, blk:
openrtdynamics2.lang.diagram_core.signal_network.Block' from '/Users/chr/Google Drive/ORTD/PythonAPI_Experiments/openrtdynamics2/lang/diagram_core/signal_network/Block.py'>) -
Expand source code
def addBlock(self, blk : Block): self.blocks_in_system.append(blk)
def add_signal_mandatory_to_compute(self, signal)
-
Expand source code
def add_signal_mandatory_to_compute(self, signal): self._signals_mandatory_to_compute = self._signals_mandatory_to_compute + [ signal ]
def append_primay_ouput(self, outputSignals)
-
Expand source code
def append_primay_ouput(self, outputSignals): self._output_signals.append(outputSignals)
def append_subsystem(self, system)
-
add a subsystem to the list of subsystems to this system
Expand source code
def append_subsystem(self, system): """ add a subsystem to the list of subsystems to this system """ self._subsystems.append( system )
def exportGraph(self)
-
Expand source code
def exportGraph(self): # TODO: remove from this class and move to aonther class 'visualization' or 'editor' def createBlockNode(nodes_array_index, block): idstr = 'bid_' + str( block.id ) node = {} node['name'] = block.name node['type'] = 'block' node['tostr'] = block.toStr() node['id'] = idstr node['nodes_array_index'] = nodes_array_index return node, idstr def createSimulationInputNode(nodes_array_index, inSig): # append a node that stands for a simulation input idstr = 'insig_' + inSig.name node = {} node['name'] = 'in_' + inSig.name node['type'] = 'simulation_input' # node['tostr'] = block.toStr() node['id'] = idstr node['nodes_array_index'] = nodes_array_index return node, idstr def createLink(signal, sourceBlock, destBlock): # create a link in-between blocks link = {} link['tostr'] = signal.toStr() link['name'] = signal.name link['source'] = '' link['target'] = '' link['source_bid'] = sourceBlock.id link['target_bid'] = destBlock.id link['source_key'] = 'bid_' + str( sourceBlock.id ) link['target_key'] = 'bid_' + str( destBlock.id ) return link def createLinkFromSimulationInput(signal, destBlock): # create a link between a maker-node for the simulation input signal # anf the destination block link = {} link['tostr'] = signal.toStr() link['name'] = signal.name link['source'] = '' link['target'] = '' link['target_bid'] = destBlock.id link['source_key'] = idstr link['target_key'] = 'bid_' + str( destBlock.id ) return link # init/reset the list of all nodes and links nodes_array = [] nodes_hash = {} links = [] # create a node for each block in the simulation nodes_array_index = 0 for block in self.blocks_in_system: node, idstr = createBlockNode(nodes_array_index, block) nodes_array.append( node ) nodes_hash[idstr] = node nodes_array_index += 1 # build links for blk in self.blocks_in_system: # list input singals if blk.getInputSignals() is not None and len( blk.getInputSignals() ) > 0: print(Fore.RED + " input signals") for inSig in blk.getInputSignals(): print(Style.DIM + " - " + inSig.toStr() ) if isinstance(inSig.lookupSource(), BlockOutputSignal): # this is a block to block connection. Create a normal link in-between sourceBlock = inSig.getSourceBlock() link = createLink(signal=inSig, sourceBlock=sourceBlock, destBlock=blk) links.append( link ) if isinstance(inSig, SimulationInputSignal): # this is an input to the simulation: add a special marker node # and add a link from this newly created node to the block # TODO: only create this node, of it does not already exist node, idstr = createSimulationInputNode(nodes_array_index, inSig) nodes_array.append( node ) nodes_hash[idstr] = node nodes_array_index += 1 link = createLinkFromSimulationInput(inSig, blk) links.append( link ) # create nodes/links/something for the simulation outputs # (TODO) # finish the graph structure graph = {} graph['nodes_hash'] = nodes_hash graph['nodes'] = nodes_array # d3 requires an array graph['links'] = links # print graph import json print(json.dumps(graph, indent=4, sort_keys=True)) # return graph
def generate_new_block_id(self)
-
Expand source code
def generate_new_block_id(self): self._top_level_system.block_id_counter += 1 return self._top_level_system.block_id_counter
def generate_new_signal_id(self)
-
Expand source code
def generate_new_signal_id(self): self._top_level_system.signal_id_counter += 1 return self._top_level_system.signal_id_counter
def getName(self)
-
Expand source code
def getName(self): return self._name
def propagate_datatypes(self)
-
Expand source code
def propagate_datatypes(self): self.resolve_anonymous_signals(ignore_signals_with_datatype_inheritance=True) # find out the output datatypes self.datatype_propagation_instance.fixateTypes()
def resolve_anonymous_signals(self, ignore_signals_with_datatype_inheritance=False)
-
close down the anonymous signals and wire the connected blocks directly to the source.
Expand source code
def resolve_anonymous_signals(self, ignore_signals_with_datatype_inheritance=False): """ close down the anonymous signals and wire the connected blocks directly to the source. """ for block in self.blocks_in_system: block.verifyInputSignals(ignore_signals_with_datatype_inheritance)
def set_primary_outputs(self, outputSignals)
-
Expand source code
def set_primary_outputs(self, outputSignals): self._output_signals = outputSignals