Source code for revpimodio2.helper

# -*- coding: utf-8 -*-
"""RevPiModIO helper classes and tools."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv2"

import queue
import warnings
from math import ceil
from threading import Event, Lock, Thread
from time import sleep
from timeit import default_timer

from ._internal import RISING, FALLING, BOTH
from .io import IOBase


class EventCallback(Thread):
    """
    Thread for internal calling of event functions.

    The event function that this thread calls will receive the thread itself
    as a parameter. This must be considered when defining the function, e.g.,
    "def event(th):". For extensive functions, this can be evaluated to
    prevent duplicate starts.

    The name of the IO object which triggered the event can be retrieved via
    EventCallback.ioname. EventCallback.iovalue returns the value of the IO
    object at the time of triggering.

    The thread provides the EventCallback.exit event as an abort condition
    for the called function. By calling the EventCallback.stop() function,
    the exit event is set and can be used to abort loops.

    A wait function can also be implemented with the exit event:
    "th.exit.wait(0.5)" - waits 500ms or aborts immediately if .stop() is
    called on the thread.

        while not th.exit.is_set():
            # Work with IOs
            th.exit.wait(0.5)
    """

    # "daemon" is intentionally not listed here: Thread already exposes it as
    # a property, and a slot with the same name would shadow that property so
    # that "self.daemon = True" never reaches the flag kept by Thread itself.
    __slots__ = "exit", "func", "ioname", "iovalue"

    def __init__(self, func, name: str, value):
        """
        Init EventCallback class.

        :param func: Function that should be called at startup
        :param name: IO name
        :param value: IO value at the time of the event
        """
        super().__init__()
        # Goes through Thread.daemon, must be set before the thread is started
        self.daemon = True
        self.exit = Event()
        self.func = func
        self.ioname = name
        self.iovalue = value

    def run(self):
        """Calls the registered function and passes this thread to it."""
        self.func(self)

    def stop(self):
        """Sets the exit event that can be used to terminate the function."""
        self.exit.set()
class Cycletools:
    """
    Toolbox for cycle loop function.

    This class contains tools for cycle functions, such as clock flags and
    edge flags.

    Note that all edge flags have the value True on the first cycle! The
    Cycletools.first flag can be used to determine if it is the first cycle.

    Clock flags flag1c, flag5c, flag10c, etc. keep their value for the
    numerically specified number of cycles and alternate between False and
    True.
    Example: flag5c has the value False for 5 cycles and True for the next
    5 cycles.

    Edge flags flank5c, flank10c, etc. have the value True for exactly one
    cycle every numerically specified number of cycles, otherwise False.
    Example: flank5c has the value True every 5 cycles.

    These flags can be used, for example, to make lamps connected to outputs
    blink synchronously.
    """

    __slots__ = (
        "__cycle",
        "__cycletime",
        "__ucycle",
        "__dict_ton",
        "__dict_tof",
        "__dict_tp",
        "__dict_change",
        "_start_timer",
        "core",
        "device",
        "first",
        "io",
        "last",
        "var",
        "flag1c",
        "flag5c",
        "flag10c",
        "flag15c",
        "flag20c",
        "flank5c",
        "flank10c",
        "flank15c",
        "flank20c",
    )

    def __init__(self, cycletime, revpi_object):
        """
        Init Cycletools class.

        :param cycletime: Cycle time, used as divisor for millisecond timers
        :param revpi_object: Object providing the attributes core, device, io
        """

        class Var:
            """Add remanent variables here."""

        # Cycle counters and cycle time
        self.__cycle = 0
        self.__ucycle = 0
        self.__cycletime = cycletime
        self._start_timer = 0.0

        # Timer states and remembered IO values for changed()
        self.__dict_tof = {}
        self.__dict_ton = {}
        self.__dict_tp = {}
        self.__dict_change = {}

        # Access to core and io
        self.core = revpi_object.core
        self.device = revpi_object.device
        self.io = revpi_object.io

        # Clock flags
        self.first = True
        self.last = False
        self.flag1c = False
        self.flag5c = False
        self.flag10c = False
        self.flag15c = False
        self.flag20c = False

        # Edge flags, all True on the first cycle
        self.flank5c = True
        self.flank10c = True
        self.flank15c = True
        self.flank20c = True

        # User data
        self.var = Var()

    def _docycle(self) -> None:
        """Cycle operations: advance timers, flags and remembered IO values."""
        # Turn-off delay: count down until 0 is reached
        for timer_name, remaining in self.__dict_tof.items():
            if remaining > 0:
                self.__dict_tof[timer_name] = remaining - 1

        # Turn-on delay: entries are [remaining cycles, set in this cycle]
        for state in self.__dict_ton.values():
            if not state[1]:
                # Not set again during this cycle, reset the timer
                state[0] = -1
                continue
            if state[0] > 0:
                state[0] -= 1
            state[1] = False

        # Pulse: entries are [remaining cycles, active marker]
        for state in self.__dict_tp.values():
            if not state[1]:
                state[0] = -1
            elif state[0] > 0:
                state[0] -= 1
            else:
                state[1] = False

        # Edge flags are only True for a single cycle
        self.flank5c = self.flank10c = self.flank15c = self.flank20c = False

        # Logical flags
        self.first = False
        self.flag1c = not self.flag1c

        # Calculated flags, evaluated on every fifth cycle
        self.__cycle += 1
        if self.__cycle == 5:
            self.__cycle = 0

            self.__ucycle += 1
            if self.__ucycle == 3:
                self.__ucycle = 0
                self.flank15c = True
                self.flag15c = not self.flag15c

            if self.flag5c:
                if self.flag10c:
                    self.flank20c = True
                    self.flag20c = not self.flag20c
                self.flank10c = True
                self.flag10c = not self.flag10c

            self.flank5c = True
            self.flag5c = not self.flag5c

        # Remember the current values for changed() in the next cycle
        for watched_io in self.__dict_change:
            self.__dict_change[watched_io] = watched_io.get_value()

    def changed(self, io: IOBase, edge=BOTH):
        """
        Check change of IO value from last to this cycle.

        It will always be False on the first use of this function with an
        IO object, because the IO is only registered by that call.

        :param io: IO to check for changes to last cycle
        :param edge: Check for rising or falling on bit io objects
        :return: True, if IO value changed
        :raises TypeError: If io is not an io object
        :raises ValueError: If edge is used with a non bit io object
        """
        if io not in self.__dict_change:
            # First use: validate parameters and register the IO
            if not isinstance(io, IOBase):
                raise TypeError("parameter 'io' must be an io object")
            if edge != BOTH and not isinstance(io.value, bool):
                raise ValueError("parameter 'edge' can be used with bit io objects only")
            self.__dict_change[io] = None
            return False

        previous = self.__dict_change[io]
        current = io.get_value()
        if edge == BOTH:
            return previous != current
        return previous != current and (
            current and edge == RISING or not current and edge == FALLING
        )

    def get_tof(self, name: str) -> bool:
        """
        Value of the off-delay.

        :param name: Unique name of the timer
        :return: Value <class 'bool'> of the off-delay
        """
        remaining = self.__dict_tof.get(name, 0)
        return remaining > 0

    def get_tofc(self, name: str) -> bool:
        """
        Value of the off-delay (timer started with cycles).

        :param name: Unique name of the timer
        :return: Value <class 'bool'> of the off-delay
        """
        remaining = self.__dict_tof.get(name, 0)
        return remaining > 0

    def set_tof(self, name: str, milliseconds: int) -> None:
        """
        Starts an off-delay timer when called.

        :param name: Unique name for accessing the timer
        :param milliseconds: Delay in milliseconds
        """
        cycles = ceil(milliseconds / self.__cycletime)
        self.__dict_tof[name] = cycles

    def set_tofc(self, name: str, cycles: int) -> None:
        """
        Starts an off-delay timer when called.

        :param name: Unique name for accessing the timer
        :param cycles: Number of cycles for the delay if not restarted
        """
        self.__dict_tof[name] = cycles

    def get_ton(self, name: str) -> bool:
        """
        On-delay.

        :param name: Unique name of the timer
        :return: Value <class 'bool'> of the on-delay
        """
        state = self.__dict_ton.get(name, [-1])
        return state[0] == 0

    def get_tonc(self, name: str) -> bool:
        """
        On-delay (timer started with cycles).

        :param name: Unique name of the timer
        :return: Value <class 'bool'> of the on-delay
        """
        state = self.__dict_ton.get(name, [-1])
        return state[0] == 0

    def set_ton(self, name: str, milliseconds: int) -> None:
        """
        Starts an on-delay timer.

        :param name: Unique name for accessing the timer
        :param milliseconds: Milliseconds for the delay if restarted
        """
        state = self.__dict_ton.get(name)
        if state is None or state[0] == -1:
            # Unknown or reset timer: start counting
            self.__dict_ton[name] = [ceil(milliseconds / self.__cycletime), True]
        else:
            # Running timer: only mark it as set in this cycle
            state[1] = True

    def set_tonc(self, name: str, cycles: int) -> None:
        """
        Starts an on-delay timer.

        :param name: Unique name for accessing the timer
        :param cycles: Number of cycles for the delay if restarted
        """
        state = self.__dict_ton.get(name)
        if state is None or state[0] == -1:
            self.__dict_ton[name] = [cycles, True]
        else:
            state[1] = True

    def get_tp(self, name: str) -> bool:
        """
        Pulse timer.

        :param name: Unique name of the timer
        :return: Value <class 'bool'> of the pulse
        """
        state = self.__dict_tp.get(name, [-1])
        return state[0] > 0

    def get_tpc(self, name: str) -> bool:
        """
        Pulse timer (timer started with cycles).

        :param name: Unique name of the timer
        :return: Value <class 'bool'> of the pulse
        """
        state = self.__dict_tp.get(name, [-1])
        return state[0] > 0

    def set_tp(self, name: str, milliseconds: int) -> None:
        """
        Starts a pulse timer.

        :param name: Unique name for accessing the timer
        :param milliseconds: Milliseconds the pulse should be active
        """
        state = self.__dict_tp.get(name)
        if state is None or state[0] == -1:
            # Unknown or reset timer: start the pulse
            self.__dict_tp[name] = [ceil(milliseconds / self.__cycletime), True]
        else:
            state[1] = True

    def set_tpc(self, name: str, cycles: int) -> None:
        """
        Starts a pulse timer.

        :param name: Unique name for accessing the timer
        :param cycles: Number of cycles the pulse should be active
        """
        state = self.__dict_tp.get(name)
        if state is None or state[0] == -1:
            self.__dict_tp[name] = [cycles, True]
        else:
            state[1] = True

    @property
    def runtime(self) -> float:
        """
        Runtime in milliseconds of cycle function till now.

        This property returns the time elapsed since the value stored in
        _start_timer. So at the beginning of your function it will be about
        0 and will rise during the runtime to the max in the last line of
        your function.
        """
        elapsed_seconds = default_timer() - self._start_timer
        return elapsed_seconds * 1000
class ProcimgWriter(Thread):
    """
    Class for synchronization thread.

    This class is started as a thread if the process image should be
    synchronized cyclically. This function is mainly used for event handling.
    """

    # "daemon" is intentionally not listed here: Thread already exposes it as
    # a property, and a slot with the same name would shadow that property so
    # that "self.daemon = True" never reaches the flag kept by Thread itself.
    __slots__ = (
        "__dict_delay",
        "__eventth",
        "_eventqth",
        "__eventwork",
        "_eventq",
        "_modio",
        "_refresh",
        "_work",
        "lck_refresh",
        "newdata",
    )

    def __init__(self, parentmodio):
        """
        Init ProcimgWriter class.

        :param parentmodio: Parent object, stored and used by run()
        """
        super().__init__()
        # Delayed events: (regfunc, io name, io value, io) -> remaining cycles
        self.__dict_delay = {}
        self.__eventth = Thread(target=self.__exec_th)
        # Queue of events which have to be executed as own threads
        self._eventqth = queue.Queue()
        self.__eventwork = False
        # Queue of events which are not executed as threads
        self._eventq = queue.Queue()
        self._modio = parentmodio
        # Cycle time in seconds
        self._refresh = 0.05
        # Set by stop() to leave the loop in run()
        self._work = Event()

        self.daemon = True
        self.lck_refresh = Lock()
        self.newdata = Event()

    def __queue_event(self, regfunc, io_event) -> None:
        """Queues one triggered event or registers it as delayed event."""
        if regfunc.delay == 0:
            # Fire immediately through the matching queue
            target_queue = self._eventqth if regfunc.as_thread else self._eventq
            target_queue.put((regfunc, io_event._name, io_event.value), False)
            return

        # Insert delayed event into dict
        tup_fire = (regfunc, io_event._name, io_event.value, io_event)
        if regfunc.overwrite or tup_fire not in self.__dict_delay:
            # Delay in milliseconds converted to a number of refresh cycles
            self.__dict_delay[tup_fire] = ceil(regfunc.delay / 1000 / self._refresh)

    def __check_change(self, dev) -> None:
        """
        Finds changes for event monitoring.

        :param dev: Device whose registered events are checked
        """
        for io_event in dev._dict_events:
            # Skip IOs whose bytes did not change since the last copy
            if dev._ba_datacp[io_event._slc_address] == dev._ba_devdata[io_event._slc_address]:
                continue

            if io_event._bitshift:
                # Bit IO: compare only the masked bit of old and new data
                boolcp = dev._ba_datacp[io_event._slc_address.start] & io_event._bitshift
                boolor = dev._ba_devdata[io_event._slc_address.start] & io_event._bitshift

                if boolor == boolcp:
                    continue

                for regfunc in dev._dict_events[io_event]:
                    if (
                        regfunc.edge == BOTH
                        or regfunc.edge == RISING
                        and boolor
                        or regfunc.edge == FALLING
                        and not boolor
                    ):
                        self.__queue_event(regfunc, io_event)
            else:
                # No bit mask: every change triggers all registered functions
                for regfunc in dev._dict_events[io_event]:
                    self.__queue_event(regfunc, io_event)

        # Copy the bytes after processing all IOs (lock is still active)
        dev._ba_datacp = dev._ba_devdata[:]

    def __exec_th(self) -> None:
        """Runs as thread that starts events as threads."""
        while self.__eventwork:
            # Keep a reference, _collect_events(False) replaces the attribute
            # and task_done() has to be called on the queue the item came from
            event_queue = self._eventqth
            try:
                tup_fireth = event_queue.get(timeout=1)
            except queue.Empty:
                # Timeout is only used to check __eventwork again
                continue

            th = EventCallback(tup_fireth[0].func, tup_fireth[1], tup_fireth[2])
            th.start()
            event_queue.task_done()

    def _collect_events(self, value: bool) -> bool:
        """
        Enables or disables event monitoring.

        :param value: True activates / False deactivates
        :return: True, if request was successful
        :raises TypeError: If value is not of type bool
        """
        if not isinstance(value, bool):
            raise TypeError("value must be <class 'bool'>")

        # Only start if system is running
        if not self.is_alive():
            self.__eventwork = False
            return False

        if self.__eventwork != value:
            with self.lck_refresh:
                self.__eventwork = value
                if not value:
                    # Only empty when deactivating
                    self._eventqth = queue.Queue()
                    self._eventq = queue.Queue()
                    self.__dict_delay = {}

            # Thread management
            if value and not self.__eventth.is_alive():
                self.__eventth = Thread(target=self.__exec_th)
                self.__eventth.daemon = True
                self.__eventth.start()

        return True

    def get_refresh(self) -> int:
        """
        Returns cycle time.

        :return: <class 'int'> cycle time in milliseconds
        """
        # round() instead of int(): the float product can end up slightly
        # below the integer value (1.001 * 1000 == 1000.9999999999999)
        return round(self._refresh * 1000)

    def run(self):
        """Starts automatic process image synchronization."""
        fh = self._modio._create_myfh()

        mrk_delay = self._refresh
        mrk_warn = True
        bytesbuff = bytearray(self._modio._length)

        # NOTE(review): exceptions other than IOError leave this loop with
        # lck_refresh still acquired and fh not closed - confirm if intended.
        while not self._work.is_set():
            ot = default_timer()

            # At this point, we slept and have the rest of delay from last cycle
            if not self.lck_refresh.acquire(timeout=mrk_delay):
                warnings.warn(
                    "cycle time of {0} ms exceeded in your cycle function"
                    "".format(self.get_refresh()),
                    RuntimeWarning,
                )
                mrk_delay = self._refresh
                # Only reachable through cycleloop - no delayed events
                continue

            try:
                fh.seek(0)
                fh.readinto(bytesbuff)

                for dev in self._modio._lst_refresh:
                    with dev._filelock:
                        if dev._shared_procimg:
                            # Set modified outputs one by one
                            for io in dev._shared_write:
                                if not io._write_to_procimg():
                                    raise IOError("error on _write_to_procimg")
                            dev._shared_write.clear()

                            # Read all device bytes, because it is shared
                            fh.seek(dev.offset)
                            bytesbuff[dev._slc_devoff] = fh.read(len(dev._ba_devdata))

                        if self._modio._monitoring or dev._shared_procimg:
                            # Inputs and outputs in buffer
                            dev._ba_devdata[:] = bytesbuff[dev._slc_devoff]
                            if (
                                self.__eventwork
                                and len(dev._dict_events) > 0
                                and dev._ba_datacp != dev._ba_devdata
                            ):
                                self.__check_change(dev)
                        else:
                            # Inputs in buffer, outputs in process image
                            dev._ba_devdata[dev._slc_inp] = bytesbuff[dev._slc_inpoff]
                            if (
                                self.__eventwork
                                and len(dev._dict_events) > 0
                                and dev._ba_datacp != dev._ba_devdata
                            ):
                                self.__check_change(dev)

                            fh.seek(dev._slc_outoff.start)
                            fh.write(dev._ba_devdata[dev._slc_out])

                if self._modio._buffedwrite:
                    fh.flush()

            except IOError as e:
                self._modio._gotioerror("autorefresh", e, mrk_warn)
                # Keep warning on every error only if _debug is -1
                mrk_warn = self._modio._debug == -1
                self.lck_refresh.release()
                # The finally block below is still executed before continuing
                continue

            else:
                if not mrk_warn:
                    # First successful cycle after io errors
                    if self._modio._debug == 0:
                        warnings.warn("recover from io errors on process image", RuntimeWarning)
                    else:
                        warnings.warn(
                            "recover from io errors on process image - total "
                            "count of {0} errors now"
                            "".format(self._modio._ioerror),
                            RuntimeWarning,
                        )
                mrk_warn = True

                # Wake all
                self.lck_refresh.release()
                self.newdata.set()

            finally:
                # Check delayed events
                if self.__eventwork:
                    # Iterate over a copy of the keys, entries get deleted
                    for tup_fire in tuple(self.__dict_delay.keys()):
                        if tup_fire[0].overwrite and tup_fire[3].value != tup_fire[2]:
                            # IO value changed again, drop the delayed event
                            del self.__dict_delay[tup_fire]
                        else:
                            self.__dict_delay[tup_fire] -= 1
                            if self.__dict_delay[tup_fire] <= 0:
                                # Put event to queue and delete delayed event
                                if tup_fire[0].as_thread:
                                    self._eventqth.put(tup_fire, False)
                                else:
                                    self._eventq.put(tup_fire, False)
                                del self.__dict_delay[tup_fire]

                mrk_delay = default_timer() % self._refresh
                # Second default_timer call include calculation time from above
                if default_timer() - ot > self._refresh:
                    warnings.warn(
                        "io refresh time of {0} ms exceeded!".format(self.get_refresh()),
                        RuntimeWarning,
                    )
                    mrk_delay = 0.0
                else:
                    # Sleep and not .wait (.wait uses system clock)
                    sleep(self._refresh - mrk_delay)

        # Wake all again at the end
        self._collect_events(False)
        self.newdata.set()
        fh.close()

    def stop(self):
        """Terminates automatic process image synchronization."""
        self._work.set()

    def set_refresh(self, value):
        """
        Sets the cycle time in milliseconds.

        :param value: <class 'int'> Milliseconds (5 to 2000)
        :raises ValueError: If value is not an integer from 5 to 2000
        """
        # bool values are instances of int, but 0 / 1 are out of range anyway
        if isinstance(value, int) and 5 <= value <= 2000:
            self._refresh = value / 1000
        else:
            raise ValueError("refresh time must be 5 to 2000 milliseconds")

    refresh = property(get_refresh, set_refresh)