Source code for revpimodio2.io

# -*- coding: utf-8 -*-
"""RevPiModIO module for managing IOs."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv2"

import struct
import warnings
from re import match as rematch
from threading import Event

from ._internal import consttostr, RISING, FALLING, BOTH, INP, OUT, MEM, PROCESS_IMAGE_SIZE

try:
    # Only works on Unix
    from fcntl import ioctl
except Exception:
    ioctl = None


class IOEvent(object):
    """
    Plain record describing one event registration of an IO.

    Instances are created by the IO objects when a function is registered
    via reg_event / reg_timerevent and only carry the registration data.
    """

    __slots__ = (
        "as_thread",
        "delay",
        "edge",
        "func",
        "overwrite",
        "prefire",
    )

    def __init__(self, func, edge, as_thread, delay, overwrite, prefire):
        """
        Store the registration parameters of an event.

        :param func: Function to be called on change
        :param edge: Execute on RISING, FALLING or BOTH value change
        :param as_thread: If True, execute function as EventCallback thread
        :param delay: Delay in ms for triggering
        :param overwrite: If True, event will be overwritten
        :param prefire: Trigger with current value when mainloop starts
        """
        self.func = func
        self.edge = edge
        self.as_thread = as_thread
        self.delay = delay
        self.overwrite = overwrite
        self.prefire = prefire
class IOList(object):
    """
    Base class for direct access to IO objects.

    IOs are reachable as attributes (by name) and by byte address via
    item access. Internally one list per byte of the process image is kept:
    an empty list (no IO), a list with one byte IO, or a list with eight
    slots (bit IOs, unused bits are None).
    """

    def __init__(self, modio):
        """
        Init IOList class.

        :param modio: ModIO object this IO list belongs to
        """
        self.__dict_iobyte = {k: [] for k in range(PROCESS_IMAGE_SIZE)}
        self.__dict_iorefname = {}
        self.__modio = modio

    def __contains__(self, key):
        """
        Checks if IO exists.

        :param key: IO name <class 'str'> or byte number <class 'int'>
        :return: True if IO exists / byte is occupied
        """
        if type(key) == int:
            return len(self.__dict_iobyte.get(key, [])) > 0
        else:
            # Replaced IOs are still reachable as DeadIO, but do not "exist"
            return hasattr(self, key) and type(getattr(self, key)) != DeadIO

    def __delattr__(self, key):
        """
        Removes specified IO.

        :param key: IO to remove
        """
        io_del = object.__getattribute__(self, key)

        # Delete old events from device
        io_del.unreg_event()

        # Remove IO from byte list and attributes
        if io_del._bitshift:
            self.__dict_iobyte[io_del.address][io_del._bitaddress] = None

            # Do not use any() because we want to know None, not 0
            if self.__dict_iobyte[io_del.address] == [None] * 8:
                self.__dict_iobyte[io_del.address] = []
        else:
            self.__dict_iobyte[io_del.address].remove(io_del)

        object.__delattr__(self, key)
        io_del._parentdevice._update_my_io_list()

    def __enter__(self):
        """
        Read inputs on entering context manager and write outputs on leaving.

        All entries are read when entering the context manager. Within the
        context manager, further .readprocimg() or .writeprocimg() calls can
        be made and the process image can be read or written. When exiting,
        all outputs are always written into the process image.

        When 'autorefresh=True' is used, all read or write actions in the
        background are performed automatically.

        :return: This IOList object
        :raises RuntimeError: If a mainloop or cycleloop is already running
        """
        if not self.__modio._context_manager:
            # If ModIO itself is in a context manager, it sets the _looprunning=True flag itself
            if self.__modio._looprunning:
                raise RuntimeError("can not enter context manager inside mainloop or cycleloop")
            self.__modio._looprunning = True

        self.__modio.readprocimg()

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Write outputs to process image before leaving the context manager."""
        if self.__modio._imgwriter.is_alive():
            # Reset new data flag to sync with imgwriter
            self.__modio._imgwriter.newdata.clear()

        # Write outputs on devices without autorefresh
        if not self.__modio._monitoring:
            self.__modio.writeprocimg()

        if self.__modio._imgwriter.is_alive():
            # Wait until imgwriter has written outputs
            self.__modio._imgwriter.newdata.wait(2.5)

        if not self.__modio._context_manager:
            # Do not reset if ModIO is in a context manager itself, it will handle that flag
            self.__modio._looprunning = False

    def __getattr__(self, key):
        """
        Manages deleted IOs (attributes that do not exist).

        Only called when normal attribute lookup fails.

        :param key: Name or byte of an old IO
        :return: Old IO if in ref lists
        :raises AttributeError: If the name is not known as a replaced IO
        """
        if key in self.__dict_iorefname:
            return self.__dict_iorefname[key]
        else:
            raise AttributeError("can not find io '{0}'".format(key))

    def __getitem__(self, key):
        """
        Retrieves specified IO.

        If the key is <class 'str'>, a single IO is returned. If the key
        is passed as <class 'int'>, a <class 'list'> is returned with 0, 1
        or 8 entries. If a <class 'slice'> is given as key, the lists are
        returned in a list. Slices follow the usual sequence semantics, so
        open-ended slices like ``[:4]`` or ``[2:]`` are supported.

        :param key: IO name as <class 'str'> or byte as <class 'int'>.
        :return: IO object or list of IOs
        :raises IndexError: If an <class 'int'> key is not a known byte
        """
        if type(key) == int:
            if key not in self.__dict_iobyte:
                raise IndexError("byte '{0}' does not exist".format(key))
            return self.__dict_iobyte[key]
        elif type(key) == slice:
            # slice.indices() resolves None / negative / oversized bounds;
            # the plain range(key.start, key.stop) failed on open-ended slices
            return [
                self.__dict_iobyte[int_io]
                for int_io in range(*key.indices(len(self.__dict_iobyte)))
            ]
        else:
            return getattr(self, key)

    def __iter__(self):
        """
        Returns iterator of all IOs.

        :return: Iterator of all IOs, ordered by byte address
        """
        for int_io in sorted(self.__dict_iobyte):
            for io in self.__dict_iobyte[int_io]:
                if io is not None:
                    yield io

    def __len__(self):
        """
        Returns the number of all IOs.

        :return: Number of all IOs
        """
        int_ios = 0
        for int_io in self.__dict_iobyte:
            for io in self.__dict_iobyte[int_io]:
                if io is not None:
                    int_ios += 1
        return int_ios

    def __setattr__(self, key, value):
        """Prohibits direct setting of attributes for performance reasons."""
        if key in (
            "_IOList__dict_iobyte",
            "_IOList__dict_iorefname",
            "_IOList__modio",
        ):
            object.__setattr__(self, key, value)
        else:
            raise AttributeError("direct assignment is not supported - use .value Attribute")

    def __private_replace_oldio_with_newio(self, io) -> None:
        """
        Replaces existing IOs with the newly registered one.

        :param io: New IO to be inserted
        :raises MemoryError: If the memory is already used by another replaced IO
        """
        # Define scan range
        if io._bitshift:
            scan_start = io._parentio_address
            scan_stop = scan_start + io._parentio_length
        else:
            scan_start = io.address
            scan_stop = scan_start + (1 if io._length == 0 else io._length)

        # Collect default value over multiple bytes
        calc_defaultvalue = b""

        for i in range(scan_start, scan_stop):
            for oldio in self.__dict_iobyte[i]:
                if type(oldio) == StructIO:
                    # There is already a new IO here
                    if oldio._bitshift:
                        if (
                            io._bitshift == oldio._bitshift
                            and io._slc_address == oldio._slc_address
                        ):
                            raise MemoryError(
                                "bit {0} already assigned to '{1}'".format(
                                    io._bitaddress, oldio._name
                                )
                            )
                    else:
                        # Already overwritten bytes are invalid
                        raise MemoryError(
                            "new io '{0}' overlaps memory of '{1}'".format(io._name, oldio._name)
                        )
                elif oldio is not None:
                    # Remember IOs in the memory area of the new IO
                    if io._bitshift:
                        # Store IOs for ref at bitaddress
                        self.__dict_iorefname[oldio._name] = DeadIO(oldio)
                    else:
                        # Calculate default value
                        oldio.byteorder = io._byteorder
                        if io._byteorder == "little":
                            calc_defaultvalue += oldio._defaultvalue
                        else:
                            calc_defaultvalue = oldio._defaultvalue + calc_defaultvalue

                    # Remove IOs from lists
                    delattr(self, oldio._name)

        if io._defaultvalue is None:
            # Only take over for StructIO and no given defaultvalue
            if io._bitshift:
                io_byte_address = io._parentio_address - io.address
                io._defaultvalue = bool(io._parentio_defaultvalue[io_byte_address] & io._bitshift)
            else:
                io._defaultvalue = calc_defaultvalue

    def _private_register_new_io_object(self, new_io) -> None:
        """
        Registers new IO object independently of __setattr__.

        IOs which would overlap an already registered IO are ignored with
        a warning (StructIO objects replace the old IOs instead).

        :param new_io: New IO object
        :raises TypeError: If new_io is not an IOBase (sub)class instance
        :raises AttributeError: If an attribute with the IO name already exists
        """
        if isinstance(new_io, IOBase):
            if hasattr(self, new_io._name):
                raise AttributeError(
                    "attribute {0} already exists - can not set io".format(new_io._name)
                )

            do_replace = type(new_io) is StructIO
            if do_replace:
                self.__private_replace_oldio_with_newio(new_io)

            # Adapt byte dict for address access
            if new_io._bitshift:
                if len(self.__dict_iobyte[new_io.address]) != 8:
                    # "Quickly" create 8 entries since these are BIT IOs
                    self.__dict_iobyte[new_io.address] += [None] * 8

                # Check for overlapping IOs
                if (
                    not do_replace
                    and self.__dict_iobyte[new_io.address][new_io._bitaddress] is not None
                ):
                    warnings.warn(
                        "ignore io '{0}', as an io already exists at the address '{1} Bit {2}'. "
                        "this can be caused by an incorrect pictory configuration.".format(
                            new_io.name,
                            new_io.address,
                            new_io._bitaddress,
                        ),
                        Warning,
                    )
                    return

                self.__dict_iobyte[new_io.address][new_io._bitaddress] = new_io
            else:
                # Search the previous IO to calculate the length
                offset_end = new_io.address
                search_index = new_io.address
                while search_index >= 0:
                    previous_io = self.__dict_iobyte[search_index]
                    if len(previous_io) == 8:
                        # Bits on this address are always 1 byte
                        offset_end -= 1
                    elif len(previous_io) == 1:
                        # Found IO, calculate offset + length of IO
                        offset_end = previous_io[0].address + previous_io[0].length
                        break
                    search_index -= 1

                # Check if the length of the previous IO overlaps with the new IO
                if offset_end > new_io.address:
                    warnings.warn(
                        "ignore io '{0}', as an io already exists at the address '{1}'. "
                        "this can be caused by an incorrect pictory configuration.".format(
                            new_io.name,
                            new_io.address,
                        ),
                        Warning,
                    )
                    return

                self.__dict_iobyte[new_io.address].append(new_io)

            object.__setattr__(self, new_io._name, new_io)

            if type(new_io) is StructIO:
                new_io._parentdevice._update_my_io_list()
        else:
            raise TypeError("io must be <class 'IOBase'> or sub class")
class DeadIO(object):
    """
    Placeholder for an IO which was replaced by another one.

    It keeps a reference to the replaced IO, so that further replacements
    can still be requested through it.
    """

    __slots__ = ("__deadio",)

    def __init__(self, deadio):
        """
        Instantiation of the DeadIO class.

        :param deadio: IO that was replaced
        """
        self.__deadio = deadio

    @property
    def _parentdevice(self):
        # A replaced IO reports no parent device
        return None

    def replace_io(self, name: str, frm: str, **kwargs) -> None:
        """
        Provides function for further bit replacements.

        The call is forwarded unchanged to the replaced IO.

        :ref: :func:IntIOReplaceable.replace_io()
        """
        replaced_io = self.__deadio
        replaced_io.replace_io(name, frm, **kwargs)
class IOBase(object):
    """
    Base class for all IO objects.

    The basic functionality enables reading and writing of values as
    <class 'bytes'> or <class 'bool'>. This is decided during instantiation.
    If a bit address is specified, <class 'bool'> values are expected and
    returned, otherwise <class 'bytes'>.

    This class serves as a basis for other IO classes with which the values
    can also be used as <class 'int'>.
    """

    __slots__ = (
        "__bit_ioctl_off",
        "__bit_ioctl_on",
        "_bitaddress",
        "_bitshift",
        "_bitlength",
        "_byteorder",
        "_defaultvalue",
        "_export",
        "_iotype",
        "_length",
        "_name",
        "_parentdevice",
        "_read_only_io",
        "_signed",
        "_slc_address",
        "bmk",
    )

    def __init__(self, parentdevice, valuelist: list, iotype: int, byteorder: str, signed: bool):
        """
        Instantiation of the IOBase class.

        :param parentdevice: Parent device on which the IO is located
        :param valuelist: Data list for instantiation
            ["name","defval","bitlen","startaddrdev",exp,"idx","bmk","bitaddr"]
        :param iotype: <class 'int'> value
        :param byteorder: Byteorder 'little'/'big' for <class 'int'> calculation
        :param signed: Perform int calculation with sign
        :raises ValueError: If a <class 'bytes'> default value has the wrong length
        """
        # ["name","defval","bitlen","startaddrdev",exp,"idx","bmk","bitaddr"]
        # [  0   ,   1    ,   2    ,       3      , 4 ,  5  ,  6  ,    7    ]
        self._parentdevice = parentdevice

        # Break down bit addresses to bytes and convert
        self._bitaddress = -1 if valuelist[7] == "" else int(valuelist[7]) % 8
        self._bitshift = None if self._bitaddress == -1 else 1 << self._bitaddress

        # Length calculation
        self._bitlength = int(valuelist[2])
        self._length = 1 if self._bitaddress == 0 else int(self._bitlength / 8)

        self.__bit_ioctl_off = None
        self.__bit_ioctl_on = None
        self._read_only_io = iotype != OUT
        self._byteorder = byteorder
        self._iotype = iotype
        self._name = valuelist[0]
        self._signed = signed
        self.bmk = valuelist[6]
        self._export = int(valuelist[4]) & 1

        int_startaddress = int(valuelist[3])
        if self._bitshift:
            # Wrap bits higher than 7 to next bytes
            int_startaddress += int(int(valuelist[7]) / 8)
            self._slc_address = slice(int_startaddress, int_startaddress + 1)

            # Determine default value, otherwise False
            if valuelist[1] is None and type(self) == StructIO:
                self._defaultvalue = None
            else:
                try:
                    self._defaultvalue = bool(int(valuelist[1]))
                except Exception:
                    self._defaultvalue = False

            # Set ioctl for bit setting
            self.__bit_ioctl_off = struct.pack("<HB", self._get_address(), self._bitaddress)
            self.__bit_ioctl_on = self.__bit_ioctl_off + b"\x01"
        else:
            self._slc_address = slice(int_startaddress, int_startaddress + self._length)
            if str(valuelist[1]).isdigit():
                # Convert default value from number to bytes
                self._defaultvalue = int(valuelist[1]).to_bytes(
                    self._length, byteorder=self._byteorder
                )
            elif valuelist[1] is None and type(self) == StructIO:
                # Set to None to take over calculated values later
                self._defaultvalue = None
            elif type(valuelist[1]) == bytes:
                # Take default value directly from bytes
                if len(valuelist[1]) == self._length:
                    self._defaultvalue = valuelist[1]
                else:
                    raise ValueError(
                        "given bytes for default value must have a length "
                        "of {0} but {1} was given"
                        "".format(self._length, len(valuelist[1]))
                    )
            else:
                # Fill default value with empty bytes
                self._defaultvalue = bytes(self._length)

                # Try to convert string to ASCII bytes (best effort, the
                # zero filled default stays if this is not possible)
                if type(valuelist[1]) == str:
                    try:
                        buff = valuelist[1].encode("ASCII")
                        if len(buff) <= self._length:
                            self._defaultvalue = buff + bytes(self._length - len(buff))
                    except Exception:
                        pass

    def __bool__(self):
        """
        <class 'bool'> value of the class.

        :return: <class 'bool'> Only False if False or 0, otherwise True
        """
        if self._bitshift:
            return bool(self._parentdevice._ba_devdata[self._slc_address.start] & self._bitshift)
        else:
            return any(self._parentdevice._ba_devdata[self._slc_address])

    def __call__(self, value=None):
        """
        Get or set the IO value using function call syntax.

        :param value: If None, returns current value; otherwise sets the value
        :return: Current IO value when called without arguments
        """
        if value is None:
            # Inline get_value()
            if self._bitshift:
                return bool(
                    self._parentdevice._ba_devdata[self._slc_address.start] & self._bitshift
                )
            else:
                return bytes(self._parentdevice._ba_devdata[self._slc_address])
        else:
            self.set_value(value)

    def __len__(self):
        """
        Returns the byte length of the IO.

        :return: Byte length of the IO - 0 for BITs with a bit address above 0
        """
        return 0 if self._bitaddress > 0 else self._length

    def __str__(self):
        """
        <class 'str'> value of the class.

        :return: Name of the IO
        """
        return self._name

    def __reg_xevent(
        self, func, delay: int, edge: int, as_thread: bool, overwrite: bool, prefire: bool
    ) -> None:
        """
        Manages reg_event and reg_timerevent.

        :param func: Function to be called on change
        :param delay: Delay in ms for triggering - also on value change
        :param edge: Execute on RISING, FALLING or BOTH value change
        :param as_thread: If True, execute function as EventCallback thread
        :param overwrite: If True, event will be overwritten
        :param prefire: Trigger with current value when mainloop starts
        :raises ValueError: On invalid func, delay or edge parameter
        :raises RuntimeError: If prefire is requested while the mainloop is
            running or the function is already registered for the edge
        """
        # Check if function is callable
        if not callable(func):
            raise ValueError("registered function '{0}' is not callable".format(func))
        if type(delay) != int or delay < 0:
            raise ValueError("'delay' must be <class 'int'> and greater or equal 0")
        if edge != BOTH and not self._bitshift:
            raise ValueError("parameter 'edge' can be used with bit io objects only")
        if prefire and self._parentdevice._modio._looprunning:
            raise RuntimeError("prefire can not be used if mainloop is running")

        if self not in self._parentdevice._dict_events:
            with self._parentdevice._filelock:
                self._parentdevice._dict_events[self] = [
                    IOEvent(func, edge, as_thread, delay, overwrite, prefire)
                ]
        else:
            # Check if function is already registered
            for regfunc in self._parentdevice._dict_events[self]:
                if regfunc.func != func:
                    # Test next entry
                    continue

                if edge == BOTH or regfunc.edge == BOTH:
                    if self._bitshift:
                        raise RuntimeError(
                            "io '{0}' with function '{1}' already in list "
                            "with edge '{2}' - edge '{3}' not allowed anymore"
                            "".format(self._name, func, consttostr(regfunc.edge), consttostr(edge))
                        )
                    else:
                        raise RuntimeError(
                            "io '{0}' with function '{1}' already in list."
                            "".format(self._name, func)
                        )
                elif regfunc.edge == edge:
                    raise RuntimeError(
                        "io '{0}' with function '{1}' for given edge '{2}' "
                        "already in list".format(self._name, func, consttostr(edge))
                    )

            # Insert event function
            with self._parentdevice._filelock:
                self._parentdevice._dict_events[self].append(
                    IOEvent(func, edge, as_thread, delay, overwrite, prefire)
                )

    def _get_address(self) -> int:
        """
        Returns the absolute byte address in the process image.

        :return: Absolute byte address
        """
        return self._parentdevice._offset + self._slc_address.start

    def _get_byteorder(self) -> str:
        """
        Returns configured byteorder.

        :return: <class 'str'> Byteorder
        """
        return self._byteorder

    def _get_export(self) -> bool:
        """Return value of export flag."""
        return bool(self._export & 1)

    def _get_iotype(self) -> int:
        """
        Returns io type.

        :return: <class 'int'> io type
        """
        return self._iotype

    def _set_export(self, value: bool) -> None:
        """Set value of export flag and remember this change for export."""
        if type(value) != bool:
            raise ValueError("Value must be <class 'bool'>")
        # Bit 1 (value 2) marks the flag as changed, bit 0 carries the value
        self._export = 2 + int(value)

    def _write_to_procimg(self) -> bool:
        """
        Write value of io directly to the process image.

        :return: True after successful write operation
        :raises RuntimeError: If the device is not marked for shared_procimg
        """
        if not self._parentdevice._shared_procimg:
            raise RuntimeError("device is not marked for shared_procimg")

        # note: Will not be removed from _shared_write on direct call

        modio = self._parentdevice._modio

        if self._bitshift:
            # Write single bit to process image
            value = self._parentdevice._ba_devdata[self._slc_address.start] & self._bitshift
            if modio._run_on_pi:
                # IOCTL on the RevPi
                with modio._myfh_lck:
                    try:
                        # Perform set value (function K+16)
                        ioctl(
                            modio._myfh,
                            19216,
                            self.__bit_ioctl_on if value else self.__bit_ioctl_off,
                        )
                    except Exception as e:
                        modio._gotioerror("ioset", e)
                        return False

            elif hasattr(modio._myfh, "ioctl"):
                # IOCTL over network
                with modio._myfh_lck:
                    try:
                        modio._myfh.ioctl(
                            19216,
                            self.__bit_ioctl_on if value else self.__bit_ioctl_off,
                        )
                    except Exception as e:
                        modio._gotioerror("net_ioset", e)
                        return False

            else:
                # Simulate IOCTL in file
                try:
                    # Execute set value (function K+16)
                    modio._simulate_ioctl(
                        19216,
                        self.__bit_ioctl_on if value else self.__bit_ioctl_off,
                    )
                except Exception as e:
                    modio._gotioerror("file_ioset", e)
                    return False

        else:
            # Write one or more bytes to process image
            value = bytes(self._parentdevice._ba_devdata[self._slc_address])
            with modio._myfh_lck:
                try:
                    modio._myfh.seek(self._get_address())
                    modio._myfh.write(value)
                    if modio._buffedwrite:
                        modio._myfh.flush()
                except IOError as e:
                    modio._gotioerror("ioset", e)
                    return False

        return True

    def get_defaultvalue(self):
        """
        Returns the default value from piCtory.

        :return: Default value as <class 'bytes'> or <class 'bool'>
        """
        return self._defaultvalue

    def get_value(self):
        """
        Returns the value of the IO.

        :return: IO value as <class 'bytes'> or <class 'bool'>
        """
        if self._bitshift:
            return bool(self._parentdevice._ba_devdata[self._slc_address.start] & self._bitshift)
        else:
            return bytes(self._parentdevice._ba_devdata[self._slc_address])

    def reg_event(self, func, delay=0, edge=BOTH, as_thread=False, prefire=False):
        """
        Registers an event for the IO in the event monitoring.

        The passed function is executed when the IO value changes. With
        specification of optional parameters, the trigger behavior can be
        controlled.

        NOTE: The delay time must fit into .cycletime, if not, it will
        ALWAYS be rounded up!

        :param func: Function to be called on change
        :param delay: Delay in ms for triggering if value stays the same
        :param edge: Execute on RISING, FALLING or BOTH value change
        :param as_thread: If True, execute function as EventCallback thread
        :param prefire: Trigger with current value when mainloop starts
        """
        self.__reg_xevent(func, delay, edge, as_thread, True, prefire)

    def reg_timerevent(self, func, delay, edge=BOTH, as_thread=False, prefire=False):
        """
        Registers a timer for the IO which executes func after delay.

        The timer is started when the IO value changes and executes the
        passed function - even if the IO value has changed in the meantime.
        If the timer has not expired and the condition is met again, the
        timer is NOT reset to the delay value or started a second time.
        For this behavior, .reg_event(..., delay=value) can be used.

        NOTE: The delay time must fit into .cycletime, if not, it will
        ALWAYS be rounded up!

        :param func: Function to be called on change
        :param delay: Delay in ms for triggering - also on value change
        :param edge: Execute on RISING, FALLING or BOTH value change
        :param as_thread: If True, execute function as EventCallback thread
        :param prefire: Trigger with current value when mainloop starts
        """
        self.__reg_xevent(func, delay, edge, as_thread, False, prefire)

    def set_value(self, value) -> None:
        """
        Sets the value of the IO.

        :param value: IO value as <class 'bytes'> or <class 'bool'>
        :raises RuntimeError: If the IO is read only
        :raises TypeError: If a byte IO does not get a <class 'bytes'> value
        :raises ValueError: If the <class 'bytes'> value has the wrong length
        """
        if self._read_only_io:
            if self._iotype == INP:
                if self._parentdevice._modio._simulator:
                    raise RuntimeError(
                        "can not write to output '{0}' in simulator mode".format(self._name)
                    )
                else:
                    raise RuntimeError("can not write to input '{0}'".format(self._name))
            elif self._iotype == MEM:
                raise RuntimeError("can not write to memory '{0}'".format(self._name))
            raise RuntimeError("the io object '{0}' is read only".format(self._name))

        if self._bitshift:
            # Try to convert any type to bool
            value = bool(value)

            # Lock for bit operations. The context manager guarantees the
            # release even if one of the statements below raises.
            with self._parentdevice._filelock:
                if self._parentdevice._shared_procimg:
                    # Mark this IO for write operations
                    self._parentdevice._shared_write.add(self)

                # There is always only one byte here, get as int
                int_byte = self._parentdevice._ba_devdata[self._slc_address.start]

                # Compare current value and set if necessary
                if not bool(int_byte & self._bitshift) == value:
                    if value:
                        int_byte += self._bitshift
                    else:
                        int_byte -= self._bitshift

                    # Write back if changed
                    self._parentdevice._ba_devdata[self._slc_address.start] = int_byte

        else:
            if type(value) != bytes:
                raise TypeError(
                    "'{0}' requires a <class 'bytes'> object, not {1}".format(
                        self._name, type(value)
                    )
                )

            if self._length != len(value):
                raise ValueError(
                    "'{0}' requires a <class 'bytes'> object of "
                    "length {1}, but {2} was given".format(self._name, self._length, len(value))
                )

            if self._parentdevice._shared_procimg:
                with self._parentdevice._filelock:
                    # Mark this IO as changed
                    self._parentdevice._shared_write.add(self)

            self._parentdevice._ba_devdata[self._slc_address] = value

    def unreg_event(self, func=None, edge=None) -> None:
        """
        Removes an event from event monitoring.

        Without func, all events of this IO are removed.

        :param func: Only events with specified function
        :param edge: Only events with specified function and specified edge
        """
        if self in self._parentdevice._dict_events:
            if func is None:
                with self._parentdevice._filelock:
                    del self._parentdevice._dict_events[self]
            else:
                # Keep every registration which does not match func (and edge)
                newlist = []
                for regfunc in self._parentdevice._dict_events[self]:
                    if regfunc.func != func or edge is not None and regfunc.edge != edge:
                        newlist.append(regfunc)

                # If functions remain, take them over
                with self._parentdevice._filelock:
                    if len(newlist) > 0:
                        self._parentdevice._dict_events[self] = newlist
                    else:
                        del self._parentdevice._dict_events[self]

    def wait(self, edge=BOTH, exitevent=None, okvalue=None, timeout=0) -> int:
        """
        Waits for value change of an IO.

        The value change is always checked when new data has been read for
        devices with autorefresh enabled.

        On value change, waiting ends with 0 as return value.

        NOTE: If <class 'ProcimgWriter'> does not deliver new data, it will
        wait forever (not when "timeout" is specified).

        If edge is specified with RISING or FALLING, this edge must be
        triggered. If the value is 1 when entering with edge RISING, the
        wait will only end when changing from 0 to 1.

        A <class 'threading.Event'> object can be passed as exitevent,
        which ends the waiting immediately with 1 as return value when
        is_set().

        If the value okvalue is present at the IO for waiting, the waiting
        ends immediately with -1 as return value.

        The timeout value aborts the waiting immediately when reached with
        value 2 as return value. (The timeout is calculated via the cycle
        time of the autorefresh function, so it does not correspond exactly
        to the specified milliseconds! It is always rounded up!)
        A timeout of 0 disables the timeout.

        :param edge: Edge RISING, FALLING, BOTH that must occur
        :param exitevent: <class 'threading.Event'> for early termination
        :param okvalue: IO value at which waiting ends immediately
        :param timeout: Time in ms after which to abort
        :return: <class 'int'> successful values <= 0

            - Successfully waited
                - Value 0: IO has changed value
                - Value -1: okvalue matched IO
            - Erroneously waited
                - Value 1: exitevent was set
                - Value 2: timeout expired
                - Value 100: Devicelist.exit() was called
        :raises RuntimeError: If autorefresh is not active for the device
        :raises ValueError: On invalid edge or timeout parameter
        :raises TypeError: If exitevent is not a <class 'threading.Event'>
        """
        # Check if device is in autorefresh
        if not self._parentdevice._selfupdate:
            raise RuntimeError(
                "autorefresh is not activated for device '{0}|{1}' - there "
                "will never be new data".format(
                    self._parentdevice._position, self._parentdevice._name
                )
            )
        if not (RISING <= edge <= BOTH):
            raise ValueError(
                "parameter 'edge' must be revpimodio2.RISING, "
                "revpimodio2.FALLING or revpimodio2.BOTH"
            )
        if not (exitevent is None or type(exitevent) == Event):
            raise TypeError("parameter 'exitevent' must be <class 'threading.Event'>")
        if type(timeout) != int or timeout < 0:
            # 0 is a valid value (no timeout), so the message says ">= 0"
            raise ValueError("parameter 'timeout' must be <class 'int'> and greater or equal 0")
        if edge != BOTH and not self._bitshift:
            raise ValueError("parameter 'edge' can be used with bit Inputs only")

        # Check abort value
        if okvalue == self.value:
            return -1

        # Clean up the WaitExit event
        self._parentdevice._modio._waitexit.clear()

        val_start = self.value
        timeout = timeout / 1000
        bool_timecount = timeout > 0
        if exitevent is None:
            exitevent = Event()

        # Without timeout the counter stays at -1, which is always below 0
        flt_timecount = 0 if bool_timecount else -1
        while (
            not self._parentdevice._modio._waitexit.is_set()
            and not exitevent.is_set()
            and flt_timecount < timeout
        ):
            if self._parentdevice._modio._imgwriter.newdata.wait(2.5):
                self._parentdevice._modio._imgwriter.newdata.clear()

                if val_start != self.value:
                    if (
                        edge == BOTH
                        or edge == RISING
                        and not val_start
                        or edge == FALLING
                        and val_start
                    ):
                        return 0
                    else:
                        # Wrong edge, remember the new level and keep waiting
                        val_start = not val_start
                if bool_timecount:
                    flt_timecount += self._parentdevice._modio._imgwriter._refresh
            elif bool_timecount:
                # No new data within the 2.5 seconds wait
                flt_timecount += 2.5

        # Abort event was set
        if exitevent.is_set():
            return 1

        # RevPiModIO mainloop was exited
        if self._parentdevice._modio._waitexit.is_set():
            return 100

        # Timeout expired
        return 2

    address = property(_get_address)
    byteorder = property(_get_byteorder)
    defaultvalue = property(get_defaultvalue)
    export = property(_get_export, _set_export)
    length = property(__len__)
    name = property(__str__)
    type = property(_get_iotype)
    value = property(get_value, set_value)
class IntIO(IOBase):
    """
    IO class which additionally exposes its data as <class 'int'>.

    On top of the <class 'IOBase'> functionality, the bytes of the IO are
    converted from and to <class 'int'> values. The conversion is controlled
    by 'byteorder' (default 'little') and 'signed' (default False).

    :ref: :class:`IOBase`
    """

    __slots__ = ()

    def __int__(self):
        """
        Returns IO value considering byteorder/signed.

        :return: IO value as <class 'int'>
        """
        raw = self._parentdevice._ba_devdata[self._slc_address]
        return int.from_bytes(raw, byteorder=self._byteorder, signed=self._signed)

    def __call__(self, value=None):
        """
        Read or write the integer IO value with function call syntax.

        :param value: If None, returns current integer value; otherwise
            sets the integer value
        :return: Current IO value as integer when called without arguments
        :raises TypeError: If value is not an integer
        """
        if value is None:
            # Inline get_intvalue()
            raw = self._parentdevice._ba_devdata[self._slc_address]
            return int.from_bytes(raw, byteorder=self._byteorder, signed=self._signed)

        # Inline from set_intvalue()
        if type(value) != int:
            raise TypeError(
                "'{0}' need a <class 'int'> value, but {1} was given"
                "".format(self._name, type(value))
            )
        new_bytes = value.to_bytes(
            self._length,
            byteorder=self._byteorder,
            signed=self._signed,
        )
        self.set_value(new_bytes)

    def _get_signed(self) -> bool:
        """
        Retrieves whether the value should be treated as signed.

        :return: True if signed
        """
        return self._signed

    def _set_byteorder(self, value: str) -> None:
        """
        Sets byteorder for <class 'int'> conversion.

        The stored default value is reversed when the byteorder changes.

        :param value: <class 'str'> 'little' or 'big'
        :raises ValueError: If value is neither 'little' nor 'big'
        """
        if value != "little" and value != "big":
            raise ValueError("byteorder must be 'little' or 'big'")
        if self._byteorder == value:
            return
        self._byteorder = value
        self._defaultvalue = self._defaultvalue[::-1]

    def _set_signed(self, value: bool) -> None:
        """
        Sets whether the value should be treated as signed.

        :param value: True if to be treated as signed
        :raises TypeError: If value is not a <class 'bool'>
        """
        if type(value) is not bool:
            raise TypeError("signed must be <class 'bool'> True or False")
        self._signed = value

    def get_intdefaultvalue(self) -> int:
        """
        Returns the default value as <class 'int'>.

        :return: <class 'int'> Default value
        """
        default_bytes = self._defaultvalue
        return int.from_bytes(default_bytes, byteorder=self._byteorder, signed=self._signed)

    def get_intvalue(self) -> int:
        """
        Returns IO value considering byteorder/signed.

        :return: IO value as <class 'int'>
        """
        raw = self._parentdevice._ba_devdata[self._slc_address]
        return int.from_bytes(raw, byteorder=self._byteorder, signed=self._signed)

    def set_intvalue(self, value: int) -> None:
        """
        Sets IO considering byteorder/signed.

        :param value: <class 'int'> Value
        :raises TypeError: If value is not an integer
        """
        if type(value) != int:
            raise TypeError(
                "'{0}' need a <class 'int'> value, but {1} was given"
                "".format(self._name, type(value))
            )
        new_bytes = value.to_bytes(
            self._length,
            byteorder=self._byteorder,
            signed=self._signed,
        )
        self.set_value(new_bytes)

    byteorder = property(IOBase._get_byteorder, _set_byteorder)
    defaultvalue = property(get_intdefaultvalue)
    signed = property(_get_signed, _set_signed)
    value = property(get_intvalue, set_intvalue)
class IntIOCounter(IntIO):
    """Extends the IntIO class with the .reset() function for counters."""

    __slots__ = ("__ioctl_arg",)

    def __init__(self, counter_id, parentdevice, valuelist, iotype, byteorder, signed):
        """
        Instantiation of the IntIOCounter class.

        :param counter_id: ID for the counter to which the IO belongs (0-15)
        :ref: :func:`IOBase.__init__(...)`
        :raises TypeError: If counter_id is not an <class 'int'>
        :raises ValueError: If counter_id is outside of 0 - 15
        """
        if not isinstance(counter_id, int):
            raise TypeError("counter_id must be <class 'int'>")
        if counter_id < 0 or counter_id > 15:
            raise ValueError("counter_id must be 0 - 15")

        # Device position + empty + Counter_ID
        # ID-Bits: 7|6|5|4|3|2|1|0|15|14|13|12|11|10|9|8
        #
        # IOCTL fills this struct, which has one byte free in memory after
        # uint8_t due to padding. Therefore 4 bytes must be passed where the
        # bitfield has little byteorder!!!
        #
        # typedef struct SDIOResetCounterStr
        # {
        #     uint8_t     i8uAddress;   // Address of module
        #     uint16_t    i16uBitfield; // bitfield, if bit n is 1, reset
        # } SDIOResetCounter;
        position_byte = parentdevice._position.to_bytes(1, "little")
        counter_bitfield = (1 << counter_id).to_bytes(2, "little")
        self.__ioctl_arg = position_byte + b"\x00" + counter_bitfield

        # Load base class
        super().__init__(parentdevice, valuelist, iotype, byteorder, signed)

    def reset(self) -> None:
        """
        Resets the counter of the input.

        :raises RuntimeError: If the system is in monitoring or simulator mode
        """
        modio = self._parentdevice._modio

        if modio._monitoring:
            raise RuntimeError("can not reset counter, while system is in monitoring mode")
        if modio._simulator:
            raise RuntimeError("can not reset counter, while system is in simulator mode")

        if modio._run_on_pi:
            # IOCTL on the RevPi
            with modio._myfh_lck:
                try:
                    # Execute counter reset (function K+20)
                    ioctl(modio._myfh, 19220, self.__ioctl_arg)
                except Exception as e:
                    modio._gotioerror("iorst", e)

        elif hasattr(modio._myfh, "ioctl"):
            # IOCTL over network
            with modio._myfh_lck:
                try:
                    modio._myfh.ioctl(19220, self.__ioctl_arg)
                except Exception as e:
                    modio._gotioerror("net_iorst", e)

        else:
            # Simulate IOCTL in file
            try:
                # Execute set value (function K+20)
                modio._simulate_ioctl(19220, self.__ioctl_arg)
            except Exception as e:
                modio._gotioerror("file_iorst", e)
class IntIOReplaceable(IntIO):
    """Extends the IntIO class with the .replace_io function."""

    __slots__ = ()

    def replace_io(self, name: str, frm: str, **kwargs) -> None:
        """
        Replaces existing IO with new one.

        If the kwargs for byteorder and defaultvalue are not specified,
        the system takes the data from the replaced IO.

        Only a single format character 'frm' may be passed. From this,
        the required length in bytes is calculated and the data type
        is determined. Possible values are:

        - Bits / Bytes: ?, c, s
        - Integer     : bB, hH, iI, lL, qQ
        - Float       : e, f, d

        An exception is the 's' format. Here, multiple bytes can be
        combined into one long IO. The formatting must be '8s' for e.g.
        8 bytes - NOT 'ssssssss'!

        If more bytes are needed by the formatting than the original IO
        has, the following IOs will also be used and removed.

        :param name: Name of the new input
        :param frm: struct formatting (1 character) or 'NUMBERs' e.g. '8s'
        :param kwargs: Additional parameters

            - bmk: internal designation for IO
            - bit: Registers IO as <class 'bool'> at the specified bit in the byte
            - byteorder: Byteorder for the IO, default=little
            - wordorder: Wordorder is applied before byteorder
            - defaultvalue: Default value for IO
            - export: Export setting for the new IO, default from replaced IO
            - event: Register function for event handling
            - delay: Delay in ms for triggering when value remains the same
            - edge: Execute event on RISING, FALLING or BOTH value change
            - as_thread: Executes the event function as RevPiCallback thread
            - prefire: Trigger with current value when mainloop starts

        `<https://docs.python.org/3/library/struct.html#format-characters>`_
        """
        # Create StructIO, it picks the kwargs it needs and ignores the event related ones
        io_new = StructIO(self, name, frm, **kwargs)

        # Insert StructIO into IO list
        self._parentdevice._modio.io._private_register_new_io_object(io_new)

        # Register the optional event on the new IO
        reg_event = kwargs.get("event", None)
        if reg_event is not None:
            # The documented 'prefire' option was never handed over and therefore
            # silently ignored. It is passed by keyword to keep the positional
            # arguments untouched.
            # NOTE(review): reg_event is defined outside of this block, it is
            # assumed to accept a 'prefire' keyword - confirm.
            io_new.reg_event(
                reg_event,
                kwargs.get("delay", 0),
                kwargs.get("edge", BOTH),
                kwargs.get("as_thread", False),
                prefire=kwargs.get("prefire", False),
            )
class RelaisOutput(IOBase):
    """
    Class for relais outputs to access the cycle counters.

    This class extends <class 'IOBase'> with the function
    'get_switching_cycles' and the property 'switching_cycles' to retrieve
    the relay cycle counters.

    :ref: :class:`IOBase`
    """

    def __init__(self, parentdevice, valuelist, iotype, byteorder, signed):
        """
        Extend <class 'IOBase'> with functions to access cycle counters.

        After the base class is initialized, the argument buffer for the
        counter ioctl request is prepared once.

        :ref: :func:`IOBase.__init__(...)`
        """
        super().__init__(parentdevice, valuelist, iotype, byteorder, signed)

        # Structure which is exchanged with the ioctl request:
        #
        # typedef struct SROGetCountersStr
        # {
        #     /* Address of module in current configuration */
        #     uint8_t i8uAddress;
        #     uint32_t counter[REVPI_RO_NUM_RELAY_COUNTERS];
        # } SROGetCounters;
        #
        # One unsigned byte for the device position followed by four unsigned
        # 32 bit counters, which are all preset with 0.
        counter_presets = (0, 0, 0, 0)
        self.__ioctl_arg_format = "<BIIII"
        self.__ioctl_arg = struct.pack(
            self.__ioctl_arg_format, parentdevice._position, *counter_presets
        )

    def get_switching_cycles(self):
        """
        Get the number of switching cycles from this relay.

        If each relay output is represented as BOOL, this function returns a
        single integer value. If all relays are displayed as a BYTE, this
        function returns a tuple that contains the values of all relay outputs.
        The setting is determined by PiCtory and the selected output variant by
        the RO device.

        This function is only available locally on a Revolution Pi. This
        function cannot be used via RevPiNetIO.

        :return: Integer of switching cycles as single value or tuple of all
        :raises RuntimeError: If the file handle is a network connection
        """
        modio = self._parentdevice._modio

        # Using ioctl request K+29 = 19229
        if modio._run_on_pi:
            # IOCTL to piControl on the RevPi
            with modio._myfh_lck:
                try:
                    raw_counters = ioctl(modio._myfh, 19229, self.__ioctl_arg)
                except Exception as e:
                    # If not implemented, we return the max value and set an error
                    raw_counters = b"\xff" * struct.calcsize(self.__ioctl_arg_format)
                    modio._gotioerror("rocounter", e)

        elif hasattr(modio._myfh, "ioctl"):
            # IOCTL over network
            #
            # The ioctl function over the network does not return a value. Only
            # the successful execution of the ioctl call is checked and reported
            # back. If a new function has been implemented in RevPiPyLoad, the
            # subsequent source code can be activated.
            #
            # with self._parentdevice._modio._myfh_lck:
            #     try:
            #         ioctl_return_value = self._parentdevice._modio._myfh.ioctl(
            #             19229, self.__ioctl_arg
            #         )
            #     except Exception as e:
            #         self._parentdevice._modio._gotioerror("net_rocounter", e)
            raise RuntimeError("Can not be called over network via RevPiNetIO")

        else:
            # Without a RevPi the prepared argument buffer itself is used as result
            raw_counters = self.__ioctl_arg

        # The first element is the module address byte of the buffer, the
        # counters follow behind it.
        unpacked = struct.unpack(self.__ioctl_arg_format, raw_counters)

        if self._bitaddress == -1:
            # BYTE output: return cycle values of all relais as tuple
            return unpacked[1:]

        # BOOL output: return the cycle value of just this relais as int. The
        # bit address is increased by 1 to skip the first element.
        return unpacked[self._bitaddress + 1]

    switching_cycles = property(get_switching_cycles)
class IntRelaisOutput(IntIO, RelaisOutput):
    """
    Relais output which combines <class 'IntIO'> and <class 'RelaisOutput'>.

    The class body does not add anything on its own. All members are
    inherited from the two base classes, 'get_switching_cycles' and the
    property 'switching_cycles' to retrieve the relay cycle counters come
    from <class 'RelaisOutput'>.

    NOTE(review): The original description states that the __init__
    functions of both base classes are called on the single shared IOBase
    object - the __init__ of IntIO is not visible here, confirm.

    :ref: :class:`IOBase`
    """
class StructIO(IOBase):
    """
    Class for accessing data via a defined struct.

    It provides the values in the desired formatting via struct. The struct
    format value is defined during instantiation.
    """

    # "__frm" holds the complete struct format including the byteorder sign.
    # The "_parentio_*" values remember properties of the replaced IO, address,
    # defaultvalue and length are only set for bit based replacements ('?').
    __slots__ = (
        "__frm",
        "_parentio_address",
        "_parentio_defaultvalue",
        "_parentio_length",
        "_parentio_name",
        "_wordorder",
    )

    def __init__(self, parentio, name: str, frm: str, **kwargs):
        """
        Creates an IO with struct formatting.

        :param parentio: ParentIO object that will be replaced
        :param name: Name of the new IO
        :param frm: struct formatting (1 character) or 'NUMBERs' e.g. '8s'
        :param kwargs: Additional parameters:

            - bmk: Description for IO
            - bit: Registers IO as <class 'bool'> at specified bit in byte
            - byteorder: Byteorder for IO, default from replaced IO
            - wordorder: Wordorder is applied before byteorder
            - defaultvalue: Default value for IO, default from replaced IO
            - export: Export setting for IO, default from replaced IO

        :raises ValueError: On invalid frm, byteorder, wordorder or bit values
        :raises BufferError: If the new IO does not fit into the IO type area
            of the parent device
        """
        # Check struct formatting
        if rematch("^([0-9]*s|[cbB?hHiIlLqQefd])$", frm) is None:
            raise ValueError(
                "parameter frm has to be a single sign from [cbB?hHiIlLqQefd] "
                "or 'COUNTs' e.g. '8s'"
            )

        # Check and take over byteorder
        byteorder = kwargs.get("byteorder", parentio._byteorder)
        if byteorder not in ("little", "big"):
            raise ValueError("byteorder must be 'little' or 'big'")
        bofrm = "<" if byteorder == "little" else ">"
        self._wordorder = kwargs.get("wordorder", None)

        # Remember parent name for export
        self._parentio_name = parentio._name

        if frm == "?":
            if self._wordorder:
                raise ValueError("you can not use wordorder for bit based ios")
            bitaddress = kwargs.get("bit", 0)
            max_bits = parentio._length * 8
            if not (0 <= bitaddress < max_bits):
                raise ValueError(
                    "bitaddress must be a value between 0 and {0}".format(max_bits - 1)
                )
            bitlength = 1

            # Bitwise replacement requires this information additionally
            if parentio._byteorder == byteorder:
                self._parentio_defaultvalue = parentio._defaultvalue
            else:
                # Byteorder differs from the parent, so mirror its default value
                self._parentio_defaultvalue = parentio._defaultvalue[::-1]
            self._parentio_address = parentio.address
            self._parentio_length = parentio._length
        else:
            byte_length = struct.calcsize(bofrm + frm)
            bitaddress = ""
            bitlength = byte_length * 8
            self._parentio_address = None
            self._parentio_defaultvalue = None
            self._parentio_length = None
            if self._wordorder:
                if self._wordorder not in ("little", "big"):
                    raise ValueError("wordorder must be 'little' or 'big'")
                # Words are 2 bytes, so only even lengths can be swapped
                if byte_length % 2 != 0:
                    raise ValueError(
                        "the byte length of new io must be even to use wordorder"
                    )

        # [name, default, bit count, address byte, export, address id, bmk, bitaddress]
        valuelist = [
            name,
            # May only be None for StructIO, is only calculated then
            kwargs.get("defaultvalue", None),
            bitlength,
            parentio._slc_address.start,
            False,
            str(parentio._slc_address.start).rjust(4, "0"),
            kwargs.get("bmk", ""),
            bitaddress,
        ]

        # Instantiate base class, lower case format characters are passed as signed
        super().__init__(
            parentio._parentdevice, valuelist, parentio._iotype, byteorder, frm == frm.lower()
        )
        self.__frm = bofrm + frm
        if "export" in kwargs:
            # Use export property to remember given value for export
            self.export = kwargs["export"]
        else:
            # User could change parent IO settings before replace to force
            # export, so use parent settings for the new IO
            self._export = parentio._export

        # Check space for new IO
        device_slc = parentio._parentdevice._dict_slc[parentio._iotype]
        if not (
            self._slc_address.start >= device_slc.start
            and self._slc_address.stop <= device_slc.stop
        ):
            raise BufferError("registered value does not fit process image scope")

    def __call__(self, value=None):
        """
        Get or set the structured IO value using function call syntax.

        Handles byte and word order conversion based on configuration.

        :param value: If None, returns current value unpacked using struct
            format; otherwise packs and sets the value
        :return: Current IO value unpacked according to struct format when
            called without arguments
        """
        if value is None:
            # Inline get_structvalue()
            if self._bitshift:
                return self.get_value()
            if self._wordorder == "little" and self._length > 2:
                return struct.unpack(
                    self.__frm,
                    self._swap_word_order(self.get_value()),
                )[0]
            return struct.unpack(self.__frm, self.get_value())[0]
        else:
            # Inline set_structvalue()
            if self._bitshift:
                self.set_value(value)
            elif self._wordorder == "little" and self._length > 2:
                self.set_value(self._swap_word_order(struct.pack(self.__frm, value)))
            else:
                self.set_value(struct.pack(self.__frm, value))

    def _get_frm(self) -> str:
        """
        Retrieves the struct formatting.

        :return: struct formatting without the leading byteorder sign
        """
        return self.__frm[1:]

    def _get_signed(self) -> bool:
        """
        Retrieves whether the value should be treated as signed.

        :return: True if signed
        """
        return self._signed

    @staticmethod
    def _swap_word_order(bytes_to_swap) -> bytes:
        """
        Swap word order of given bytes.

        The order of the 2 byte words is reversed, the bytes inside of each
        word keep their order.

        :param bytes_to_swap: Already length checked bytes to swap words
        :return: Bytes with swapped word order
        """
        array_length = len(bytes_to_swap)
        swap_array = bytearray(bytes_to_swap)
        # Walk through the first half and exchange each word with its
        # counterpart counted from the end
        for i in range(0, array_length // 2, 2):
            swap_array[-i - 2 : array_length - i], swap_array[i : i + 2] = (
                swap_array[i : i + 2],
                swap_array[-i - 2 : array_length - i],
            )
        return bytes(swap_array)

    def get_structdefaultvalue(self):
        """
        Returns the default value with struct formatting.

        :return: Default value of struct formatting type
        """
        if self._bitshift:
            # Bit based IOs return the default value without unpacking
            return self._defaultvalue
        if self._wordorder == "little" and self._length > 2:
            return struct.unpack(
                self.__frm,
                self._swap_word_order(self._defaultvalue),
            )[0]
        return struct.unpack(self.__frm, self._defaultvalue)[0]

    def get_wordorder(self) -> str:
        """
        Returns the wordorder for this IO.

        :return: "little", "big" or "ignored"
        """
        return self._wordorder or "ignored"

    def get_structvalue(self):
        """
        Returns the value with struct formatting.

        :return: Value of struct formatting type
        """
        if self._bitshift:
            # Bit based IOs return the value without unpacking
            return self.get_value()
        if self._wordorder == "little" and self._length > 2:
            return struct.unpack(
                self.__frm,
                self._swap_word_order(self.get_value()),
            )[0]
        return struct.unpack(self.__frm, self.get_value())[0]

    def set_structvalue(self, value):
        """
        Sets the value with struct formatting.

        :param value: Value of struct formatting type
        """
        if self._bitshift:
            # Bit based IOs take the value without packing
            self.set_value(value)
        elif self._wordorder == "little" and self._length > 2:
            self.set_value(self._swap_word_order(struct.pack(self.__frm, value)))
        else:
            self.set_value(struct.pack(self.__frm, value))

    defaultvalue = property(get_structdefaultvalue)
    frm = property(_get_frm)
    signed = property(_get_signed)
    value = property(get_structvalue, set_structvalue)
    wordorder = property(get_wordorder)
class MemIO(IOBase):
    """
    Creates an IO for the memory values in piCtory.

    This type is only intended for read access and can return various
    data types via .value. This also provides access to strings that
    are assigned in piCtory.
    """

    def get_variantvalue(self):
        """
        Get the default value as either string or integer based on bit length.

        Values with more than 64 bits are treated as string: the zero bytes at
        both ends are removed and the rest is decoded. If this fails, the
        unchanged bytes are returned. All other values are converted to an
        integer with the byteorder and signed setting of this IO.

        :return: Default value as string (if > 64 bits) or integer
        """
        raw_value = bytes(self._defaultvalue)

        if self._bitlength <= 64:
            # INT
            return int.from_bytes(raw_value, self._byteorder, signed=self._signed)

        # STRING
        try:
            return raw_value.strip(b"\x00").decode()
        except Exception:
            # Not decodable, hand out the raw bytes instead
            return raw_value

    defaultvalue = property(get_variantvalue)
    value = property(get_variantvalue)