Advanced
Advanced features, patterns, and best practices for RevPiModIO.
Custom IOs (Gateway Modules)
Gateway modules (ModbusTCP, Profinet, etc.) allow defining custom IOs dynamically.
Understanding Gateway IOs
Gateway modules provide raw memory regions that you can map to custom IOs with specific data types and addresses.
Defining Custom IOs
Use the replace_io() method to define custom IOs on gateway modules.
Gateway modules provide generic IOs (like Input_1, Output_1, etc.) that you can replace with custom definitions:
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Replace a gateway IO with custom definition
# Gateway IOs have default names like Input_1, Output_1, etc.
rpi.io.Input_1.replace_io(
    "temperature",  # New IO name
    "h",  # struct format: signed short
)
# Use the custom IO by its new name
temp = rpi.io.temperature.value / 10.0 # Scale to degrees
print(f"Temperature: {temp}°C")
rpi.exit()
Parameters:
name - Name for the new IO (accessible via rpi.io.name)
frm - Struct format character (see format codes below)
defaultvalue - Optional: Default value for the IO
byteorder - Optional: Byte order ('little' or 'big'), default is 'little'
bit - Optional: Bit position for boolean IOs (0-7)
event - Optional: Register event callback on creation
delay - Optional: Event debounce delay in milliseconds
edge - Optional: Event edge trigger (RISING, FALLING, or BOTH)
Note: The memory address is inherited from the IO being replaced (e.g., Input_1). The new IO uses the same address in the process image.
Struct Format Codes
Common format codes for replace_io (see Python struct format characters for complete reference):
'b' - signed byte (-128 to 127)
'B' - unsigned byte (0 to 255)
'h' - signed short (-32768 to 32767)
'H' - unsigned short (0 to 65535)
'i' - signed int (-2147483648 to 2147483647)
'I' - unsigned int (0 to 4294967295)
'f' - float (32-bit)
'd' - float (64-bit)
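To see what these codes mean at the byte level, the following sketch uses Python's standard struct module directly. It does not touch RevPiModIO or real hardware, and the two raw bytes are made-up sample data. It decodes the same bytes as signed and unsigned shorts in both byte orders and lists the size of each format code.

```python
import struct

# Two made-up bytes, standing in for a 16-bit register in the process image.
raw = bytes([0x18, 0xFC])

# "<" selects little-endian, ">" big-endian (standard sizes, no padding).
signed_little = struct.unpack("<h", raw)[0]
unsigned_little = struct.unpack("<H", raw)[0]
signed_big = struct.unpack(">h", raw)[0]

print(signed_little)    # -1000
print(unsigned_little)  # 64536
print(signed_big)       # 6396

# Size in bytes of each format code from the list above.
sizes = {code: struct.calcsize("<" + code) for code in "bBhHiIfd"}
print(sizes)
```

If the device transmits tenths of a degree, the same register reads as -100.0 with 'h' and as 6453.6 with 'H', so the format code and byte order must match what the gateway peer actually sends.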
Multiple Custom IOs
Define multiple custom IOs programmatically by replacing generic gateway IOs:
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Replace multiple gateway IOs with custom definitions
# Assuming a gateway module with Input_1, Input_2, Output_1, Output_2
rpi.io.Input_1.replace_io("temperature", "h")
rpi.io.Input_2.replace_io("humidity", "h")
rpi.io.Output_1.replace_io("setpoint", "h", defaultvalue=700)
rpi.io.Output_2.replace_io("control_word", "H", defaultvalue=0)
# Use all custom IOs by their new names
temp = rpi.io.temperature.value / 10.0
humidity = rpi.io.humidity.value / 10.0
print(f"Temp: {temp}°C, Humidity: {humidity}%")
# Write to output registers
rpi.io.setpoint.value = 750 # 75.0°C
rpi.io.control_word.value = 0x0001 # Enable bit
rpi.exit()
Using Configuration Files
For complex IO configurations, use the replace_io_file parameter to load custom IOs from a file:
# Load custom IOs from configuration file
rpi = revpimodio2.RevPiModIO(
    autorefresh=True,
    replace_io_file="replace_ios.conf",
)
# Custom IOs are now available
temp = rpi.io.temperature.value / 10.0
print(f"Temperature: {temp}°C")
rpi.exit()
Configuration File Format:
Create an INI-style configuration file (replace_ios.conf):
[temperature]
replace = Input_1
frm = h
[humidity]
replace = Input_2
frm = h
[setpoint]
replace = Output_1
frm = h
defaultvalue = 700
[control_word]
replace = Output_2
frm = H
byteorder = big
Configuration Parameters:
replace - Name of the gateway IO to replace (required)
frm - Struct format character (required)
bit - Bit position for boolean IOs (0-7)
byteorder - Byte order: little or big (default: little)
wordorder - Word order for multi-word values
defaultvalue - Default value for the IO
bmk - Internal designation/bookmark
export - Export flag for RevPiPyLoad/RevPiPyControl
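Before handing such a file to RevPiModIO, it can help to check it for missing required keys. The sketch below is only an illustration: it parses INI text with Python's configparser and uses a hypothetical helper, check_replace_config, which is not part of RevPiModIO. The library performs its own parsing when replace_io_file is loaded.

```python
import configparser

SAMPLE = """
[temperature]
replace = Input_1
frm = h

[control_word]
replace = Output_2
frm = H
byteorder = big

[broken]
frm = h
"""

# Keys marked as required in the parameter list above.
REQUIRED_KEYS = ("replace", "frm")


def check_replace_config(text):
    """Return {section: [missing required keys]} for sections with problems.

    Hypothetical helper for illustration; not a RevPiModIO function.
    """
    parser = configparser.ConfigParser()
    parser.read_string(text)
    problems = {}
    for section in parser.sections():
        missing = [key for key in REQUIRED_KEYS if key not in parser[section]]
        if missing:
            problems[section] = missing
    return problems


problems = check_replace_config(SAMPLE)
print(problems)  # {'broken': ['replace']}
```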
Exporting Configuration:
Export your current custom IOs to a file:
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Define custom IOs by replacing gateway IOs
rpi.io.Input_1.replace_io("temperature", "h", defaultvalue=0)
rpi.io.Input_2.replace_io("humidity", "h", defaultvalue=0)
# Export to configuration file
rpi.export_replaced_ios("my_config.conf")
rpi.exit()
This is useful for:
Sharing IO configurations across multiple programs
Integration with RevPiPyLoad and RevPiPyControl
Version control of IO definitions
Declarative IO configuration
Watchdog Management
The hardware watchdog monitors your program and resets the system if it stops responding. The software watchdog, by contrast, only works if you use RevPiPyControl as the runtime for your Python program.
How the Watchdog Works
The watchdog requires periodic toggling. If not toggled within the timeout period, the system resets.
Important: Only enable the watchdog when your program logic is working correctly.
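The toggle-or-reset behaviour can be modelled in plain Python without hardware. In this sketch the class SimulatedWatchdog, the helper first_expiry, and the 500 ms timeout are all assumptions chosen for illustration; the real timeout is a property of your device and is not stated here.

```python
class SimulatedWatchdog:
    """Toy model of a watchdog: expires if not toggled within timeout_ms."""

    def __init__(self, timeout_ms):
        self.timeout_ms = timeout_ms
        self.last_toggle_ms = 0

    def toggle(self, now_ms):
        self.last_toggle_ms = now_ms

    def expired(self, now_ms):
        return now_ms - self.last_toggle_ms > self.timeout_ms


def first_expiry(cycle_ms, toggle_every, hang_at_cycle, total_cycles, timeout_ms=500):
    """Return the time in ms at which the watchdog expires, or None.

    The simulated program toggles every `toggle_every` cycles and stops
    toggling (hangs) from cycle `hang_at_cycle` onwards.
    """
    wd = SimulatedWatchdog(timeout_ms)
    for cycle in range(1, total_cycles + 1):
        now_ms = cycle * cycle_ms
        program_alive = cycle < hang_at_cycle
        if program_alive and cycle % toggle_every == 0:
            wd.toggle(now_ms)
        if wd.expired(now_ms):
            return now_ms
    return None


# Healthy program: toggles every 200 ms and never exceeds the timeout.
healthy = first_expiry(cycle_ms=20, toggle_every=10, hang_at_cycle=10_000, total_cycles=100)
# Program hangs at cycle 35: the last toggle happened at 600 ms.
hung = first_expiry(cycle_ms=20, toggle_every=10, hang_at_cycle=35, total_cycles=100)
print(healthy, hung)
```

In the model, the hung program trips the watchdog at the first cycle more than 500 ms after its last toggle, which is the point where real hardware would reset the system.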
Cyclic Watchdog Toggle
import revpimodio2
def main_cycle(ct):
    # Toggle every 10 cycles (200 ms at a 20 ms cycle time)
    if ct.flank10c:
        ct.core.wd_toggle()
    # Your control logic
    ct.io.output.value = ct.io.input.value
# Set the cycle time explicitly so it matches the comment above
revpimodio2.run_plc(main_cycle, cycletime=20)
Event-Driven Watchdog Toggle
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def toggle_wd(ioname, iovalue):
    """Toggle watchdog every 500ms."""
    rpi.core.wd_toggle()
# Register timer event for watchdog
rpi.core.wd.reg_timerevent(toggle_wd, 500, prefire=True)
# Your event handlers
def on_button(ioname, iovalue):
    rpi.io.led.value = iovalue
rpi.io.button.reg_event(on_button)
rpi.handlesignalend()
rpi.mainloop()
Performance Optimization
Keep Cycle Logic Fast
Minimize processing time in each cycle:
def optimized_cycle(ct):
    # Good: Heavy work only when needed (here: once every 20 cycles)
    if ct.flank20c:
        expensive_calculation()
    # Good: Keep cycle logic minimal
    ct.io.output.value = ct.io.input.value
    # Bad: Don't do this every cycle
    # expensive_calculation()  # 100ms processing!
Guidelines:
Keep cycle time ≥20ms for stability
Avoid blocking operations (network, file I/O)
Use flank flags for expensive operations
Profile your cycle function if experiencing timing issues
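One simple way to profile a cycle function is to wrap it and record how long each call takes. The decorator below is a minimal sketch using only the standard library; the names timed and fake_cycle are hypothetical, and fake_cycle stands in for a real cycle function so the example runs without hardware.

```python
import time
from functools import wraps


def timed(func):
    """Wrap func and record each call's duration in milliseconds."""
    durations_ms = []

    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            durations_ms.append((time.perf_counter() - start) * 1000.0)

    # Expose the measurements on the wrapper for later inspection.
    wrapper.durations_ms = durations_ms
    return wrapper


@timed
def fake_cycle(ct):
    # Stand-in for a real cycle function; ct is unused here.
    return sum(range(1000))


for _ in range(5):
    fake_cycle(None)

worst = max(fake_cycle.durations_ms)
print(f"calls: {len(fake_cycle.durations_ms)}, worst: {worst:.3f} ms")
```

Comparing the worst-case duration against the configured cycle time shows how much headroom the cycle has.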
Choose Appropriate Cycle Time
Match cycle time to application requirements:
# Fast control (motion, high-speed counting)
rpi.cycletime = 20 # 50 Hz
# Normal control (most applications)
rpi.cycletime = 50 # 20 Hz
# Slow monitoring (temperature, status)
rpi.cycletime = 100 # 10 Hz
Trade-offs:
Faster = Higher CPU usage, better responsiveness
Slower = Lower CPU usage, adequate for most tasks
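The frequencies in the comments above follow directly from the cycle time, and so does the period of any cycle-counted flag. The helper names in this sketch are hypothetical and only spell out the arithmetic.

```python
def cycle_frequency_hz(cycletime_ms):
    """Cycles per second for a given cycle time in milliseconds."""
    return 1000.0 / cycletime_ms


def flank_period_ms(cycletime_ms, cycles):
    """Time between pulses of a flag that fires once every `cycles` cycles."""
    return cycletime_ms * cycles


for cycletime in (20, 50, 100):
    print(
        f"{cycletime} ms -> {cycle_frequency_hz(cycletime)} Hz, "
        f"10-cycle flag every {flank_period_ms(cycletime, 10)} ms"
    )
```

Changing the cycle time therefore also changes how often logic guarded by a flag such as flank10c runs, which matters for anything time-critical like a watchdog toggle.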
Minimize Event Callbacks
Keep event callbacks lightweight:
import time
# Good: Fast callback
def good_callback(ioname, iovalue):
    rpi.io.output.value = iovalue
# Poor: Slow callback blocks event loop
def poor_callback(ioname, iovalue):
    time.sleep(1)  # Blocks!
    complex_calculation()  # Slow!
    rpi.io.output.value = iovalue
# Better: Use threaded events for slow work
def threaded_callback(eventcallback):
    # Runs in its own thread, so slow work does not block other events
    result = complex_calculation()
    rpi.io.output.value = result
rpi.io.trigger.reg_event(threaded_callback, as_thread=True)
Error Handling
Graceful Error Recovery
Always implement safe failure modes:
def safe_cycle(ct):
    try:
        value = ct.io.sensor.value
        result = process(value)
        ct.io.output.value = result
    except ValueError as e:
        print(f"Sensor error: {e}")
        ct.io.output.value = 0  # Safe default
    except Exception as e:
        print(f"Unexpected error: {e}")
        ct.io.output.value = False  # Safe state
Resource Cleanup
Always clean up resources:
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
try:
    # Your program logic
    rpi.cycleloop(main_cycle)
except KeyboardInterrupt:
    print("Interrupted by user")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Always clean up
    rpi.setdefaultvalues()  # Reset outputs to defaults
    rpi.exit()
Monitor I/O Errors
Track and handle I/O errors:
maxioerrors = 10  # Warn once more than 10 I/O errors have been counted
def main_cycle(ct):
    # Check error count periodically
    if ct.flank20c:
        if ct.core.ioerrorcount > maxioerrors:
            print(f"Warning: {ct.core.ioerrorcount} I/O errors detected")
            ct.io.warning_led.value = True
    # Normal logic
    ct.io.output.value = ct.io.input.value
revpimodio2.run_plc(main_cycle)
Best Practices
Naming Conventions
Use descriptive IO names in piCtory:
# Good - Clear intent
if rpi.io.emergency_stop.value:
    rpi.io.motor.value = False
    rpi.io.alarm.value = True
# Poor - Generic names
if rpi.io.I_15.value:
    rpi.io.O_3.value = False
    rpi.io.O_7.value = True
Code Organization
Structure your code for maintainability:
import revpimodio2
# Constants
TEMP_HIGH_THRESHOLD = 75
TEMP_LOW_THRESHOLD = 65
# Initialize
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def initialize(ct):
    """Initialize system state."""
    ct.var.cooling_active = False
    ct.var.alarm_active = False
    ct.io.motor.value = False
def monitor_temperature(ct):
    """Temperature monitoring logic."""
    temp = ct.io.temperature.value
    if temp > TEMP_HIGH_THRESHOLD:
        ct.io.cooling.value = True
        ct.var.cooling_active = True
    if temp < TEMP_LOW_THRESHOLD:
        ct.io.cooling.value = False
        ct.var.cooling_active = False
def main_cycle(ct):
    """Main control loop."""
    if ct.first:
        initialize(ct)
    monitor_temperature(ct)
    if ct.last:
        ct.io.cooling.value = False
# Run
try:
    rpi.cycleloop(main_cycle)
finally:
    rpi.exit()
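Separating the decision logic from IO access makes it testable on a development machine without a Revolution Pi. The following is a minimal sketch under that assumption; next_cooling_state is a hypothetical helper that mirrors the two-threshold logic of monitor_temperature and is not part of RevPiModIO.

```python
TEMP_HIGH_THRESHOLD = 75
TEMP_LOW_THRESHOLD = 65


def next_cooling_state(temp, cooling_active):
    """Return the new cooling state for a temperature reading (hysteresis)."""
    if temp > TEMP_HIGH_THRESHOLD:
        return True
    if temp < TEMP_LOW_THRESHOLD:
        return False
    return cooling_active  # Inside the band: keep the previous state


# Feed a sequence of readings through the logic and record each state.
state = False
history = []
for temp in (60, 70, 76, 70, 66, 64, 70):
    state = next_cooling_state(temp, state)
    history.append(state)
print(history)
```

Inside a cycle function, the result would then be assigned to the output, for example ct.io.cooling.value = next_cooling_state(temp, ct.var.cooling_active).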
Documentation
Document complex logic:
def control_cycle(ct):
    """Control cycle for temperature management.
    State machine:
    - IDLE: Waiting for start
    - HEATING: Active heating to setpoint
    - COOLING: Active cooling from overshoot
    - ERROR: Fault condition
    Hysteresis: ±5°C around setpoint
    """
    if ct.first:
        ct.var.state = "IDLE"
        ct.var.setpoint = 70.0
    # State machine implementation
    # ...
Logging
Implement proper logging:
import logging
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
def main_cycle(ct):
    if ct.first:
        logging.info("System started")
        ct.var.error_count = 0
    # Log errors
    if ct.io.error_sensor.value:
        ct.var.error_count += 1
        logging.error(f"Error detected: {ct.var.error_count}")
    # Log status periodically (once every 20 cycles)
    if ct.flank20c:
        logging.info(f"Temperature: {ct.io.temperature.value}°C")
    if ct.last:
        logging.info("System stopped")
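A cycle function runs many times per second, so a condition that stays true produces a log line on every cycle. One possible remedy, sketched here with a hypothetical RisingEdge helper (not part of RevPiModIO) and simulated sensor readings, is to log only when the condition changes from false to true.

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")


class RisingEdge:
    """Report True only on the call where the input changes False -> True."""

    def __init__(self):
        self._previous = False

    def update(self, value):
        rising = bool(value) and not self._previous
        self._previous = bool(value)
        return rising


error_edge = RisingEdge()
error_count = 0

# Simulated sensor readings over eight cycles: two separate error episodes.
for reading in (False, True, True, True, False, True, True, False):
    if error_edge.update(reading):
        error_count += 1
        logging.error("Error detected: %d", error_count)

print(error_count)
```

With this approach the counter reflects the number of error episodes rather than the number of cycles an error was present.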
Security Considerations
Validate External Input
Always validate external inputs:
def on_setpoint_change(ioname, iovalue):
    """Validate setpoint range."""
    if 0 <= iovalue <= 100:
        rpi.io.setpoint.value = iovalue
        rpi.io.error_led.value = False
    else:
        print(f"Invalid setpoint: {iovalue}")
        rpi.io.error_led.value = True
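When values arrive from less controlled sources (a network message, a file, a user interface), a range check alone may not be enough. The sketch below assumes the same 0 to 100 range and adds a type check; validate_setpoint is a hypothetical helper, not part of RevPiModIO.

```python
def validate_setpoint(value, low=0.0, high=100.0):
    """Return (ok, setpoint); ok is False and setpoint is None if unusable."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return False, None
    # NaN fails every comparison, so this range check rejects it as well.
    if not low <= value <= high:
        return False, None
    return True, value


for candidate in (42, 100.5, -1, float("nan"), "70", True):
    print(repr(candidate), validate_setpoint(candidate))
```

Only the first candidate passes; the others are rejected for being out of range, not a number, or of the wrong type.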
Fail-Safe Defaults
Use safe defaults for critical outputs:
def main_cycle(ct):
    if ct.first:
        # Safe defaults
        ct.io.motor.value = False
        ct.io.heater.value = False
        ct.io.valve.value = False
    try:
        # Control logic
        control_logic(ct)
    except Exception as e:
        # Revert to safe state on error
        print(f"Control error: {e}")
        ct.io.motor.value = False
        ct.io.heater.value = False
        ct.io.valve.value = False
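Listing the safe values in one place reduces the risk of forgetting an output in one of the code paths. The following sketch assumes a hypothetical apply_safe_state helper and uses SimpleNamespace objects as stand-ins for the IO container, so it runs without hardware; with RevPiModIO you would pass ct.io or rpi.io instead.

```python
from types import SimpleNamespace

# Single source of truth for the safe state of critical outputs.
SAFE_STATE = {"motor": False, "heater": False, "valve": False}


def apply_safe_state(io, safe_state=SAFE_STATE):
    """Set every listed output's .value to its safe value."""
    for name, value in safe_state.items():
        getattr(io, name).value = value


# Stand-in for rpi.io / ct.io: each output is an object with a .value.
fake_io = SimpleNamespace(
    motor=SimpleNamespace(value=True),
    heater=SimpleNamespace(value=True),
    valve=SimpleNamespace(value=True),
)


def control_logic(io):
    # Simulate a failure inside the control logic.
    raise RuntimeError("simulated fault")


try:
    control_logic(fake_io)
except Exception as exc:
    print(f"Control error: {exc}")
    apply_safe_state(fake_io)

print(fake_io.motor.value, fake_io.heater.value, fake_io.valve.value)
```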
See Also
Basics - Core concepts and configuration
Cyclic Programming - Cyclic programming patterns
Event Programming - Event-driven programming patterns
API Reference - API reference