Source code for revpimodio2.helper
# -*- coding: utf-8 -*-
"""RevPiModIO helper classes and tools."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv2"
import queue
import warnings
from math import ceil
from threading import Event, Lock, Thread
from time import sleep
from timeit import default_timer
from ._internal import RISING, FALLING, BOTH
from .io import IOBase
[docs]
class EventCallback(Thread):
"""Thread for internal calling of event functions.
The event function that this thread calls will receive the thread itself
as a parameter. This must be considered when defining the function, e.g.,
"def event(th):". For extensive functions, this can be evaluated to
prevent duplicate starts.
The name of the IO object can be retrieved via EventCallback.ioname,
which triggered the event. EventCallback.iovalue returns the value of
the IO object at the time of triggering.
The thread provides the EventCallback.exit event as an abort condition
for the called function.
By calling the EventCallback.stop() function, the exit event is set
and can be used to abort loops.
A wait function can also be implemented with the .exit() event:
"th.exit.wait(0.5)" - waits 500ms or aborts immediately if .stop()
is called on the thread.
while not th.exit.is_set():
# Work with IOs
th.exit.wait(0.5)
"""
__slots__ = "daemon", "exit", "func", "ioname", "iovalue"
[docs]
def __init__(self, func, name: str, value):
"""
Init EventCallback class.
:param func: Function that should be called at startup
:param name: IO name
:param value: IO value at the time of the event
"""
super().__init__()
self.daemon = True
self.exit = Event()
self.func = func
self.ioname = name
self.iovalue = value
[docs]
def stop(self):
"""Sets the exit event that can be used to terminate the function."""
self.exit.set()
[docs]
class Cycletools:
"""
Toolbox for cycle loop function.
This class contains tools for cycle functions, such as clock flags and edge flags.
Note that all edge flags have the value True on the first cycle! The Cycletools.first
flag can be used to determine if it is the first cycle.
Clock flags flag1c, flag5c, flag10c, etc. have the numerically specified
value for the specified number of cycles, alternating between False and True.
Example: flag5c has the value False for 5 cycles and True for the next 5 cycles.
Edge flags flank5c, flank10c, etc. always have the value True for one cycle
at the numerically specified cycle, otherwise False.
Example: flank5c always has the value True every 5 cycles.
These flags can be used, for example, to make lamps connected to outputs blink synchronously.
"""
__slots__ = (
"__cycle",
"__cycletime",
"__ucycle",
"__dict_ton",
"__dict_tof",
"__dict_tp",
"__dict_change",
"_start_timer",
"core",
"device",
"first",
"io",
"last",
"var",
"flag1c",
"flag5c",
"flag10c",
"flag15c",
"flag20c",
"flank5c",
"flank10c",
"flank15c",
"flank20c",
)
[docs]
def __init__(self, cycletime, revpi_object):
"""Init Cycletools class."""
self.__cycle = 0
self.__cycletime = cycletime
self.__ucycle = 0
self.__dict_change = {}
self.__dict_ton = {}
self.__dict_tof = {}
self.__dict_tp = {}
self._start_timer = 0.0
# Access to core and io
self.core = revpi_object.core
self.device = revpi_object.device
self.io = revpi_object.io
# Clock flags
self.first = True
self.flag1c = False
self.flag5c = False
self.flag10c = False
self.flag15c = False
self.flag20c = False
self.last = False
# Edge flags
self.flank5c = True
self.flank10c = True
self.flank15c = True
self.flank20c = True
# User data
class Var:
"""Add remanent variables here."""
pass
self.var = Var()
def _docycle(self) -> None:
"""Cycle operations."""
# Turn-off delay
for tof in self.__dict_tof:
if self.__dict_tof[tof] > 0:
self.__dict_tof[tof] -= 1
# Turn-on delay
for ton in self.__dict_ton:
if self.__dict_ton[ton][1]:
if self.__dict_ton[ton][0] > 0:
self.__dict_ton[ton][0] -= 1
self.__dict_ton[ton][1] = False
else:
self.__dict_ton[ton][0] = -1
# Pulse
for tp in self.__dict_tp:
if self.__dict_tp[tp][1]:
if self.__dict_tp[tp][0] > 0:
self.__dict_tp[tp][0] -= 1
else:
self.__dict_tp[tp][1] = False
else:
self.__dict_tp[tp][0] = -1
# Edge flags
self.flank5c = False
self.flank10c = False
self.flank15c = False
self.flank20c = False
# Logische Flags
self.first = False
self.flag1c = not self.flag1c
# Calculated flags
self.__cycle += 1
if self.__cycle == 5:
self.__ucycle += 1
if self.__ucycle == 3:
self.flank15c = True
self.flag15c = not self.flag15c
self.__ucycle = 0
if self.flag5c:
if self.flag10c:
self.flank20c = True
self.flag20c = not self.flag20c
self.flank10c = True
self.flag10c = not self.flag10c
self.flank5c = True
self.flag5c = not self.flag5c
self.__cycle = 0
# Process changed values
for io in self.__dict_change:
self.__dict_change[io] = io.get_value()
[docs]
def changed(self, io: IOBase, edge=BOTH):
"""
Check change of IO value from last to this cycle.
It will always be False on the first use of this function with an
IO object.
:param io: IO to check for changes to last cycle
:param edge: Check for rising or falling on bit io objects
:return: True, if IO value changed
"""
if io in self.__dict_change:
if edge == BOTH:
return self.__dict_change[io] != io.get_value()
else:
value = io.get_value()
return self.__dict_change[io] != value and (
value and edge == RISING or not value and edge == FALLING
)
else:
if not isinstance(io, IOBase):
raise TypeError("parameter 'io' must be an io object")
if not (edge == BOTH or type(io.value) == bool):
raise ValueError("parameter 'edge' can be used with bit io objects only")
self.__dict_change[io] = None
return False
[docs]
def get_tof(self, name: str) -> bool:
"""
Value of the off-delay.
:param name: Unique name of the timer
:return: Value <class 'bool'> of the off-delay
"""
return self.__dict_tof.get(name, 0) > 0
[docs]
def get_tofc(self, name: str) -> bool:
"""
Value of the off-delay.
:param name: Unique name of the timer
:return: Value <class 'bool'> of the off-delay
"""
return self.__dict_tof.get(name, 0) > 0
[docs]
def set_tof(self, name: str, milliseconds: int) -> None:
"""
Starts an off-delay timer when called.
:param name: Unique name for accessing the timer
:param milliseconds: Delay in milliseconds
"""
self.__dict_tof[name] = ceil(milliseconds / self.__cycletime)
[docs]
def set_tofc(self, name: str, cycles: int) -> None:
"""
Starts an off-delay timer when called.
:param name: Unique name for accessing the timer
:param cycles: Number of cycles for the delay if not restarted
"""
self.__dict_tof[name] = cycles
[docs]
def get_ton(self, name: str) -> bool:
"""
On-delay.
:param name: Unique name of the timer
:return: Value <class 'bool'> of the on-delay
"""
return self.__dict_ton.get(name, [-1])[0] == 0
[docs]
def get_tonc(self, name: str) -> bool:
"""
On-delay.
:param name: Unique name of the timer
:return: Value <class 'bool'> of the on-delay
"""
return self.__dict_ton.get(name, [-1])[0] == 0
[docs]
def set_ton(self, name: str, milliseconds: int) -> None:
"""
Starts an on-delay timer.
:param name: Unique name for accessing the timer
:param milliseconds: Milliseconds for the delay if restarted
"""
if self.__dict_ton.get(name, [-1])[0] == -1:
self.__dict_ton[name] = [ceil(milliseconds / self.__cycletime), True]
else:
self.__dict_ton[name][1] = True
[docs]
def set_tonc(self, name: str, cycles: int) -> None:
"""
Starts an on-delay timer.
:param name: Unique name for accessing the timer
:param cycles: Number of cycles for the delay if restarted
"""
if self.__dict_ton.get(name, [-1])[0] == -1:
self.__dict_ton[name] = [cycles, True]
else:
self.__dict_ton[name][1] = True
[docs]
def get_tp(self, name: str) -> bool:
"""
Pulse timer.
:param name: Unique name of the timer
:return: Value <class 'bool'> of the pulse
"""
return self.__dict_tp.get(name, [-1])[0] > 0
[docs]
def get_tpc(self, name: str) -> bool:
"""
Pulse timer.
:param name: Unique name of the timer
:return: Value <class 'bool'> of the pulse
"""
return self.__dict_tp.get(name, [-1])[0] > 0
[docs]
def set_tp(self, name: str, milliseconds: int) -> None:
"""
Starts a pulse timer.
:param name: Unique name for accessing the timer
:param milliseconds: Milliseconds the pulse should be active
"""
if self.__dict_tp.get(name, [-1])[0] == -1:
self.__dict_tp[name] = [ceil(milliseconds / self.__cycletime), True]
else:
self.__dict_tp[name][1] = True
[docs]
def set_tpc(self, name: str, cycles: int) -> None:
"""
Starts a pulse timer.
:param name: Unique name for accessing the timer
:param cycles: Number of cycles the pulse should be active
"""
if self.__dict_tp.get(name, [-1])[0] == -1:
self.__dict_tp[name] = [cycles, True]
else:
self.__dict_tp[name][1] = True
@property
def runtime(self) -> float:
"""
Runtime im milliseconds of cycle function till now.
This property will return the actual runtime of the function. So on the
beginning of your function it will be about 0 and will rise during
the runtime to the max in the last line of your function.
"""
return (default_timer() - self._start_timer) * 1000
[docs]
class ProcimgWriter(Thread):
"""
Class for synchronization thread.
This class is started as a thread if the process image should be
synchronized cyclically. This function is mainly used for event
handling.
"""
__slots__ = (
"__dict_delay",
"__eventth",
"_eventqth",
"__eventwork",
"_eventq",
"_modio",
"_refresh",
"_work",
"daemon",
"lck_refresh",
"newdata",
)
[docs]
def __init__(self, parentmodio):
"""Init ProcimgWriter class."""
super().__init__()
self.__dict_delay = {}
self.__eventth = Thread(target=self.__exec_th)
self._eventqth = queue.Queue()
self.__eventwork = False
self._eventq = queue.Queue()
self._modio = parentmodio
self._refresh = 0.05
self._work = Event()
self.daemon = True
self.lck_refresh = Lock()
self.newdata = Event()
def __check_change(self, dev) -> None:
"""Finds changes for event monitoring."""
for io_event in dev._dict_events:
if dev._ba_datacp[io_event._slc_address] == dev._ba_devdata[io_event._slc_address]:
continue
if io_event._bitshift:
boolcp = dev._ba_datacp[io_event._slc_address.start] & io_event._bitshift
boolor = dev._ba_devdata[io_event._slc_address.start] & io_event._bitshift
if boolor == boolcp:
continue
for regfunc in dev._dict_events[io_event]:
if (
regfunc.edge == BOTH
or regfunc.edge == RISING
and boolor
or regfunc.edge == FALLING
and not boolor
):
if regfunc.delay == 0:
if regfunc.as_thread:
self._eventqth.put((regfunc, io_event._name, io_event.value), False)
else:
self._eventq.put((regfunc, io_event._name, io_event.value), False)
else:
# Insert delayed event into dict
tup_fire = (
regfunc,
io_event._name,
io_event.value,
io_event,
)
if regfunc.overwrite or tup_fire not in self.__dict_delay:
self.__dict_delay[tup_fire] = ceil(
regfunc.delay / 1000 / self._refresh
)
else:
for regfunc in dev._dict_events[io_event]:
if regfunc.delay == 0:
if regfunc.as_thread:
self._eventqth.put((regfunc, io_event._name, io_event.value), False)
else:
self._eventq.put((regfunc, io_event._name, io_event.value), False)
else:
# Insert delayed event into dict
tup_fire = (
regfunc,
io_event._name,
io_event.value,
io_event,
)
if regfunc.overwrite or tup_fire not in self.__dict_delay:
self.__dict_delay[tup_fire] = ceil(regfunc.delay / 1000 / self._refresh)
# Copy the bytes after processing all IOs (lock is still active)
dev._ba_datacp = dev._ba_devdata[:]
def __exec_th(self) -> None:
"""Runs as thread that starts events as threads."""
while self.__eventwork:
try:
tup_fireth = self._eventqth.get(timeout=1)
th = EventCallback(tup_fireth[0].func, tup_fireth[1], tup_fireth[2])
th.start()
self._eventqth.task_done()
except queue.Empty:
pass
def _collect_events(self, value: bool) -> bool:
"""
Enables or disables event monitoring.
:param value: True activates / False deactivates
:return: True, if request was successful
"""
if type(value) != bool:
raise TypeError("value must be <class 'bool'>")
# Only start if system is running
if not self.is_alive():
self.__eventwork = False
return False
if self.__eventwork != value:
with self.lck_refresh:
self.__eventwork = value
if not value:
# Only empty when deactivating
self._eventqth = queue.Queue()
self._eventq = queue.Queue()
self.__dict_delay = {}
# Thread management
if value and not self.__eventth.is_alive():
self.__eventth = Thread(target=self.__exec_th)
self.__eventth.daemon = True
self.__eventth.start()
return True
[docs]
def get_refresh(self) -> int:
"""
Returns cycle time.
:return: <class 'int'> cycle time in milliseconds
"""
return int(self._refresh * 1000)
[docs]
def run(self):
"""Starts automatic process image synchronization."""
fh = self._modio._create_myfh()
mrk_delay = self._refresh
mrk_warn = True
bytesbuff = bytearray(self._modio._length)
while not self._work.is_set():
ot = default_timer()
# At this point, we slept and have the rest of delay from last cycle
if not self.lck_refresh.acquire(timeout=mrk_delay):
warnings.warn(
"cycle time of {0} ms exceeded in your cycle function"
"".format(int(self._refresh * 1000)),
RuntimeWarning,
)
mrk_delay = self._refresh
# Only reachable through cycleloop - no delayed events
continue
try:
fh.seek(0)
fh.readinto(bytesbuff)
for dev in self._modio._lst_refresh:
with dev._filelock:
if dev._shared_procimg:
# Set modified outputs one by one
for io in dev._shared_write:
if not io._write_to_procimg():
raise IOError("error on _write_to_procimg")
dev._shared_write.clear()
# Read all device bytes, because it is shared
fh.seek(dev.offset)
bytesbuff[dev._slc_devoff] = fh.read(len(dev._ba_devdata))
if self._modio._monitoring or dev._shared_procimg:
# Inputs and outputs in buffer
dev._ba_devdata[:] = bytesbuff[dev._slc_devoff]
if (
self.__eventwork
and len(dev._dict_events) > 0
and dev._ba_datacp != dev._ba_devdata
):
self.__check_change(dev)
else:
# Inputs in buffer, outputs in process image
dev._ba_devdata[dev._slc_inp] = bytesbuff[dev._slc_inpoff]
if (
self.__eventwork
and len(dev._dict_events) > 0
and dev._ba_datacp != dev._ba_devdata
):
self.__check_change(dev)
fh.seek(dev._slc_outoff.start)
fh.write(dev._ba_devdata[dev._slc_out])
if self._modio._buffedwrite:
fh.flush()
except IOError as e:
self._modio._gotioerror("autorefresh", e, mrk_warn)
mrk_warn = self._modio._debug == -1
self.lck_refresh.release()
continue
else:
if not mrk_warn:
if self._modio._debug == 0:
warnings.warn("recover from io errors on process image", RuntimeWarning)
else:
warnings.warn(
"recover from io errors on process image - total "
"count of {0} errors now"
"".format(self._modio._ioerror),
RuntimeWarning,
)
mrk_warn = True
# Wake all
self.lck_refresh.release()
self.newdata.set()
finally:
# Check delayed events
if self.__eventwork:
for tup_fire in tuple(self.__dict_delay.keys()):
if tup_fire[0].overwrite and tup_fire[3].value != tup_fire[2]:
del self.__dict_delay[tup_fire]
else:
self.__dict_delay[tup_fire] -= 1
if self.__dict_delay[tup_fire] <= 0:
# Put event to queue and delete delayed event
if tup_fire[0].as_thread:
self._eventqth.put(tup_fire, False)
else:
self._eventq.put(tup_fire, False)
del self.__dict_delay[tup_fire]
mrk_delay = default_timer() % self._refresh
# Second default_timer call include calculation time from above
if default_timer() - ot > self._refresh:
warnings.warn(
"io refresh time of {0} ms exceeded!".format(int(self._refresh * 1000)),
RuntimeWarning,
)
mrk_delay = 0.0
else:
# Sleep and not .wait (.wait uses system clock)
sleep(self._refresh - mrk_delay)
# Wake all again at the end
self._collect_events(False)
self.newdata.set()
fh.close()
[docs]
def set_refresh(self, value):
"""Sets the cycle time in milliseconds.
@param value <class 'int'> Milliseconds"""
if type(value) == int and 5 <= value <= 2000:
self._refresh = value / 1000
else:
raise ValueError("refresh time must be 5 to 2000 milliseconds")
refresh = property(get_refresh, set_refresh)