Event Programming
Event-driven programming uses callbacks triggered by hardware state changes.
When to Use Event-Driven Programming
Event-driven programming is ideal for:
Handling user interactions
Processing occasional events
Background tasks
System integration
Low CPU usage requirements
Advantages:
Consumes CPU only when events occur
Natural for user interfaces
Simple asynchronous operation
Efficient for sporadic events
Considerations:
Non-deterministic timing
Must handle concurrent events carefully
Less intuitive for continuous control
Basic Structure
Simple Event Handler
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def on_button_change(ioname, iovalue):
"""Called when button changes."""
print(f"{ioname} = {iovalue}")
rpi.io.led.value = iovalue
# Register event
rpi.io.button.reg_event(on_button_change)
# Run main loop
rpi.handlesignalend()
rpi.mainloop()
The callback function receives:
ioname- Name of the IO that changediovalue- New value of the IO
Event Registration
Value Change Events
Register callbacks for IO value changes:
def on_change(ioname, iovalue):
print(f"{ioname} changed to {iovalue}")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Any change
rpi.io.sensor.reg_event(on_change)
# Rising edge only
rpi.io.button.reg_event(on_change, edge=revpimodio2.RISING)
# Falling edge only
rpi.io.button.reg_event(on_change, edge=revpimodio2.FALLING)
rpi.handlesignalend()
rpi.mainloop()
Edge types:
revpimodio2.RISING- False to True transitionrevpimodio2.FALLING- True to False transitionrevpimodio2.BOTH- Any change (default)
Multiple Events
Register multiple callbacks on one IO:
def on_press(ioname, iovalue):
print("Pressed!")
def on_release(ioname, iovalue):
print("Released!")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Different callbacks for different edges
rpi.io.button.reg_event(on_press, edge=revpimodio2.RISING)
rpi.io.button.reg_event(on_release, edge=revpimodio2.FALLING)
rpi.handlesignalend()
rpi.mainloop()
Or register one callback on multiple IOs:
def any_sensor_changed(ioname, iovalue):
print(f"{ioname} changed to {iovalue}")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Same callback for multiple sensors
for sensor in ["sensor1", "sensor2", "sensor3"]:
if sensor in rpi.io:
rpi.io[sensor].reg_event(any_sensor_changed)
rpi.handlesignalend()
rpi.mainloop()
Debouncing
Add debounce delays to filter noise and false triggers:
def on_stable_press(ioname, iovalue):
"""Called only after button is stable for 50ms."""
print(f"Confirmed: {iovalue}")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# 50ms debounce delay
rpi.io.noisy_button.reg_event(on_stable_press, delay=50)
rpi.handlesignalend()
rpi.mainloop()
How debouncing works:
IO value changes
RevPiModIO waits for
delaymillisecondsIf value is still changed, callback is triggered
If value changed back, callback is not triggered
Typical debounce times:
Mechanical switches: 20-50ms
Relays: 10-20ms
Analog sensors: 100-500ms
Debouncing with Edge Detection
def on_confirmed_press(ioname, iovalue):
"""Called only for stable button press."""
print("Confirmed press")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Rising edge with 30ms debounce
rpi.io.button.reg_event(
on_confirmed_press,
edge=revpimodio2.RISING,
delay=30
)
rpi.handlesignalend()
rpi.mainloop()
Timer Events
The timer is started when the IO value changes and executes the passed function - even if the IO value has changed in the meantime. If the timer has not expired and the condition is met again, the timer is NOT reset to the delay value or started a second time.
def periodic_task(ioname, iovalue):
"""Called after 500ms."""
print(f"Periodic task: {iovalue}")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Execute every 500ms
rpi.io.dummy.reg_timerevent(periodic_task, 500)
rpi.handlesignalend()
rpi.mainloop()
Timer Event Parameters
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def blink_led(ioname, iovalue):
"""Toggle LED every 500ms."""
rpi.io.blink_led.value = not rpi.io.blink_led.value
def log_temperature(ioname, iovalue):
"""Log temperature every 5 seconds."""
temp = rpi.io.temperature.value
print(f"Temperature: {temp}°C")
# Blink every 500ms, trigger immediately
rpi.io.blink_led.reg_timerevent(blink_led, 500, prefire=True)
# Log every 5 seconds, don't trigger immediately
rpi.io.temperature.reg_timerevent(log_temperature, 5000, prefire=False)
rpi.handlesignalend()
rpi.mainloop()
Parameters:
interval- Delay in milliseconds.prefire- If True, trigger immediately after starting the mainloop.
Threaded Events
Use threaded events for long-running operations that would block the main loop:
def long_task(eventcallback: revpimodio2.EventCallback):
"""Threaded handler for time-consuming tasks."""
print(f"Starting task for {eventcallback.ioname}")
for i in range(10):
# Check if stop requested
if eventcallback.exit.is_set():
print("Task cancelled")
return
# Interruptible wait (1 second)
eventcallback.exit.wait(1)
print(f"Progress: {(i+1)*10}%")
print("Task complete")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Register as threaded event
rpi.io.trigger.reg_event(
long_task,
as_thread=True,
edge=revpimodio2.RISING
)
rpi.handlesignalend()
rpi.mainloop()
EventCallback Object
Threaded callbacks receive an EventCallback object with:
eventcallback.ioname- Name of the IOeventcallback.iovalue- Value that triggered eventeventcallback.exit-threading.Eventfor cancellation
Important: Always check eventcallback.exit.is_set() periodically to allow graceful shutdown.
Interruptible Sleep
Use eventcallback.exit.wait() instead of time.sleep() for interruptible delays:
def background_task(eventcallback: revpimodio2.EventCallback):
"""Long task with interruptible waits."""
while not eventcallback.exit.is_set():
# Do some work
process_data()
# Wait 5 seconds or until exit requested
if eventcallback.exit.wait(5):
break # Exit was requested
rpi = revpimodio2.RevPiModIO(autorefresh=True)
rpi.io.trigger.reg_event(background_task, as_thread=True)
rpi.handlesignalend()
rpi.mainloop()
Unregistering Events
Remove event callbacks when no longer needed:
def my_callback(ioname, iovalue):
print(f"{ioname} = {iovalue}")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Register event
rpi.io.button.reg_event(my_callback)
# Unregister specific callback
rpi.io.button.unreg_event(my_callback)
# Unregister all events for this IO
rpi.io.button.unreg_event()
Practical Examples
Sensor Logging
import revpimodio2
from datetime import datetime
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def log_sensor_change(ioname, iovalue):
"""Log sensor changes with timestamp."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"{timestamp} - {ioname}: {iovalue}")
# Log all sensor changes
for io_name in ["sensor1", "sensor2", "temperature"]:
if io_name in rpi.io:
rpi.io[io_name].reg_event(log_sensor_change)
rpi.handlesignalend()
rpi.mainloop()
Threaded Data Processing
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def process_batch(eventcallback: revpimodio2.EventCallback):
"""Process data batch in background thread."""
print(f"Starting batch processing...")
batch_size = 100
for i in range(batch_size):
if eventcallback.exit.is_set():
print("Processing cancelled")
return
# Simulate processing
eventcallback.exit.wait(0.1)
if i % 10 == 0:
print(f"Progress: {i}/{batch_size}")
print("Batch processing complete")
rpi.io.done_led.value = True
# Trigger on button press
rpi.io.start_batch.reg_event(
process_batch,
as_thread=True,
edge=revpimodio2.RISING
)
rpi.handlesignalend()
rpi.mainloop()
Best Practices
Keep Callbacks Fast
Event callbacks should complete quickly:
# Good - Fast callback
def good_callback(ioname, iovalue):
rpi.io.output.value = iovalue
# Poor - Blocking callback
def poor_callback(ioname, iovalue):
time.sleep(5) # Blocks event loop!
rpi.io.output.value = iovalue
For slow operations, use threaded events:
def slow_task(eventcallback):
# Long operation in separate thread
process_data()
rpi.io.trigger.reg_event(slow_task, as_thread=True)
Handle Errors Gracefully
Protect callbacks from exceptions:
def safe_callback(ioname, iovalue):
try:
result = risky_operation(iovalue)
rpi.io.output.value = result
except Exception as e:
print(f"Error in callback: {e}")
rpi.io.output.value = False # Safe state
Clean Up Threads
Threaded events are automatically cleaned up on exit, but you can manually unregister:
def long_task(eventcallback):
while not eventcallback.exit.is_set():
work()
eventcallback.exit.wait(1)
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Register
rpi.io.trigger.reg_event(long_task, as_thread=True)
# Later: unregister to stop thread
rpi.io.trigger.unreg_event(long_task)
See Also
Basics - Core concepts and configuration
Cyclic Programming - Cyclic programming patterns
Advanced - Combining paradigms and advanced topics
Helper Classes - EventCallback API reference