Helper Classes

Helper classes for cyclic and event-driven programming.

Cycletools

class revpimodio2.helper.Cycletools(cycletime, revpi_object)[source]

Bases: object

Toolbox for cycle loop function.

This class contains tools for cycle functions, such as clock flags and edge flags. Note that all edge flags have the value True on the first cycle! The Cycletools.first flag can be used to determine if it is the first cycle.

Clock flags flag1c, flag5c, flag10c, etc. have the numerically specified value for the specified number of cycles, alternating between False and True.

Example: flag5c has the value False for 5 cycles and True for the next 5 cycles.

Edge flags flank5c, flank10c, etc. always have the value True for one cycle at the numerically specified cycle, otherwise False.

Example: flank5c always has the value True every 5 cycles.

These flags can be used, for example, to make lamps connected to outputs blink synchronously.

Toolkit provided to cyclic functions via .cycleloop().

This class provides tools for cyclic functions including timing flags and edge markers. Note that edge markers (flank flags) are all True during the first cycle!

Attributes:

Reference to RevPiModIO core object

Reference to RevPiModIO device object

Reference to RevPiModIO io object

True only on first cycle

True when shutdown signal received

Current function execution time in seconds

Container for cycle-persistent variables

Toggle Flags - Alternate between True/False:

1 cycle True, 1 cycle False

2 cycles True, 2 cycles False

5 cycles True, 5 cycles False

10 cycles True, 10 cycles False

20 cycles True, 20 cycles False

Flank Flags - True every nth cycle:

True every 5 cycles

True every 10 cycles

True every 15 cycles

True every 20 cycles

Example:

def main(ct: revpimodio2.Cycletools):
    if ct.first:
        # Initialize
        ct.var.counter = 0

    # Main logic
    if ct.changed(ct.io.sensor):
        ct.var.counter += 1

    # Blink LED using timing flag
    ct.io.led.value = ct.flag5c

    if ct.last:
        # Cleanup
        print(f"Final: {ct.var.counter}")
__init__(cycletime, revpi_object)[source]

Init Cycletools class.

core
device
io
first
flag1c
flag5c
flag10c
flag15c
flag20c
last
flank5c
flank10c
flank15c
flank20c
var
changed(io: IOBase, edge=33)[source]

Check change of IO value from last to this cycle.

It will always be False on the first use of this function with an IO object.

Parameters:
  • io – IO to check for changes to last cycle

  • edge – Check for rising or falling on bit io objects

Returns:

True, if IO value changed

get_tof(name: str) bool[source]

Value of the off-delay.

Parameters:

name – Unique name of the timer

Returns:

Value <class ‘bool’> of the off-delay

get_tofc(name: str) bool[source]

Value of the off-delay.

Parameters:

name – Unique name of the timer

Returns:

Value <class ‘bool’> of the off-delay

set_tof(name: str, milliseconds: int) None[source]

Starts an off-delay timer when called.

Parameters:
  • name – Unique name for accessing the timer

  • milliseconds – Delay in milliseconds

set_tofc(name: str, cycles: int) None[source]

Starts an off-delay timer when called.

Parameters:
  • name – Unique name for accessing the timer

  • cycles – Number of cycles for the delay if not restarted

get_ton(name: str) bool[source]

On-delay.

Parameters:

name – Unique name of the timer

Returns:

Value <class ‘bool’> of the on-delay

get_tonc(name: str) bool[source]

On-delay.

Parameters:

name – Unique name of the timer

Returns:

Value <class ‘bool’> of the on-delay

set_ton(name: str, milliseconds: int) None[source]

Starts an on-delay timer.

Parameters:
  • name – Unique name for accessing the timer

  • milliseconds – Milliseconds for the delay if restarted

set_tonc(name: str, cycles: int) None[source]

Starts an on-delay timer.

Parameters:
  • name – Unique name for accessing the timer

  • cycles – Number of cycles for the delay if restarted

get_tp(name: str) bool[source]

Pulse timer.

Parameters:

name – Unique name of the timer

Returns:

Value <class ‘bool’> of the pulse

get_tpc(name: str) bool[source]

Pulse timer.

Parameters:

name – Unique name of the timer

Returns:

Value <class ‘bool’> of the pulse

set_tp(name: str, milliseconds: int) None[source]

Starts a pulse timer.

Parameters:
  • name – Unique name for accessing the timer

  • milliseconds – Milliseconds the pulse should be active

set_tpc(name: str, cycles: int) None[source]

Starts a pulse timer.

Parameters:
  • name – Unique name for accessing the timer

  • cycles – Number of cycles the pulse should be active

property runtime: float

Runtime im milliseconds of cycle function till now.

This property will return the actual runtime of the function. So on the beginning of your function it will be about 0 and will rise during the runtime to the max in the last line of your function.

Change Detection

Timer Functions

On-Delay Timers

Off-Delay Timers

Pulse Timers

EventCallback

class revpimodio2.helper.EventCallback(func, name: str, value)[source]

Bases: Thread

Thread for internal calling of event functions.

The event function that this thread calls will receive the thread itself as a parameter. This must be considered when defining the function, e.g., “def event(th):”. For extensive functions, this can be evaluated to prevent duplicate starts. The name of the IO object can be retrieved via EventCallback.ioname, which triggered the event. EventCallback.iovalue returns the value of the IO object at the time of triggering. The thread provides the EventCallback.exit event as an abort condition for the called function. By calling the EventCallback.stop() function, the exit event is set and can be used to abort loops. A wait function can also be implemented with the .exit() event: “th.exit.wait(0.5)” - waits 500ms or aborts immediately if .stop() is called on the thread.

while not th.exit.is_set():

# Work with IOs th.exit.wait(0.5)

Thread for internal event function calls.

This class is passed to threaded event handlers registered with as_thread=True. The event function receives this thread object as a parameter to access event information and control execution.

Attributes:

Name of IO that triggered the event

Value of IO when event was triggered

Threading event for abort conditions

Example:

def threaded_handler(eventcallback: revpimodio2.EventCallback):
    print(f"{eventcallback.ioname} = {eventcallback.iovalue}")

    # Interruptible wait (3 seconds)
    if eventcallback.exit.wait(3):
        print("Wait interrupted!")
        return

    # Check if stop was called
    if eventcallback.exit.is_set():
        return

# Register as threaded event
rpi.io.button.reg_event(threaded_handler, as_thread=True)
__init__(func, name: str, value)[source]

Init EventCallback class.

Parameters:
  • func – Function that should be called at startup

  • name – IO name

  • value – IO value at the time of the event

daemon
exit
func
ioname
iovalue
run()[source]

Calls the registered function.

stop()[source]

Sets the exit event that can be used to terminate the function.

getName()

Return a string used for identification purposes only.

This method is deprecated, use the name attribute instead.

property ident

Thread identifier of this thread or None if it has not been started.

This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

isDaemon()

Return whether this thread is a daemon.

This method is deprecated, use the daemon attribute instead.

is_alive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().

join(timeout=None)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating-point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

property name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

property native_id

Native integral thread ID of this thread, or None if it has not been started.

This is a non-negative integer. See the get_native_id() function. This represents the Thread ID as reported by the kernel.

setDaemon(daemonic)

Set whether this thread is a daemon.

This method is deprecated, use the .daemon property instead.

setName(name)

Set the name string for this thread.

This method is deprecated, use the name attribute instead.

start()

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

Methods

ProcimgWriter

class revpimodio2.helper.ProcimgWriter(parentmodio)[source]

Bases: Thread

Class for synchronization thread.

This class is started as a thread if the process image should be synchronized cyclically. This function is mainly used for event handling.

Internal thread for process image writing and event management.

__init__(parentmodio)[source]

Init ProcimgWriter class.

daemon
lck_refresh
newdata
get_refresh() int[source]

Returns cycle time.

Returns:

<class ‘int’> cycle time in milliseconds

run()[source]

Starts automatic process image synchronization.

stop()[source]

Terminates automatic process image synchronization.

set_refresh(value)[source]

Sets the cycle time in milliseconds. @param value <class ‘int’> Milliseconds

property refresh: int

Returns cycle time.

Returns:

<class ‘int’> cycle time in milliseconds

getName()

Return a string used for identification purposes only.

This method is deprecated, use the name attribute instead.

property ident

Thread identifier of this thread or None if it has not been started.

This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

isDaemon()

Return whether this thread is a daemon.

This method is deprecated, use the daemon attribute instead.

is_alive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().

join(timeout=None)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating-point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

property name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

property native_id

Native integral thread ID of this thread, or None if it has not been started.

This is a non-negative integer. See the get_native_id() function. This represents the Thread ID as reported by the kernel.

setDaemon(daemonic)

Set whether this thread is a daemon.

This method is deprecated, use the .daemon property instead.

setName(name)

Set the name string for this thread.

This method is deprecated, use the name attribute instead.

start()

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.