Files
2025-11-30 08:30:10 +08:00

10 KiB

SimPy Real-Time Simulations

This guide covers real-time simulation capabilities in SimPy, where simulation time is synchronized with wall-clock time.

Overview

Real-time simulations synchronize simulation time with actual wall-clock time. This is useful for:

  • Hardware-in-the-loop (HIL) testing
  • Human interaction with simulations
  • Algorithm behavior analysis under real-time constraints
  • System integration testing
  • Demonstration purposes

RealtimeEnvironment

Replace the standard Environment with simpy.rt.RealtimeEnvironment to enable real-time synchronization.

Basic Usage

import simpy.rt

def process(env):
    while True:
        print(f'Tick at {env.now}')
        yield env.timeout(1)

# Real-time environment with 1:1 time mapping
env = simpy.rt.RealtimeEnvironment(factor=1.0)
env.process(process(env))
env.run(until=5)

Constructor Parameters

simpy.rt.RealtimeEnvironment(
    initial_time=0,      # Starting simulation time
    factor=1.0,          # Real time per simulation time unit
    strict=True          # Raise errors on timing violations
)

Time Scaling with Factor

The factor parameter controls how simulation time maps to real time.

Factor Examples

import simpy.rt
import time

def timed_process(env, label):
    start = time.time()
    print(f'{label}: Starting at {env.now}')
    yield env.timeout(2)
    elapsed = time.time() - start
    print(f'{label}: Completed at {env.now} (real time: {elapsed:.2f}s)')

# Factor = 1.0: 1 simulation time unit = 1 second
print('Factor = 1.0 (2 sim units = 2 seconds)')
env = simpy.rt.RealtimeEnvironment(factor=1.0)
env.process(timed_process(env, 'Normal speed'))
env.run()

# Factor = 0.5: 1 simulation time unit = 0.5 seconds
print('\nFactor = 0.5 (2 sim units = 1 second)')
env = simpy.rt.RealtimeEnvironment(factor=0.5)
env.process(timed_process(env, 'Double speed'))
env.run()

# Factor = 2.0: 1 simulation time unit = 2 seconds
print('\nFactor = 2.0 (2 sim units = 4 seconds)')
env = simpy.rt.RealtimeEnvironment(factor=2.0)
env.process(timed_process(env, 'Half speed'))
env.run()

Factor interpretation:

  • factor=1.0 → 1 simulation time unit takes 1 real second
  • factor=0.1 → 1 simulation time unit takes 0.1 real seconds (10x faster)
  • factor=60 → 1 simulation time unit takes 60 real seconds (1 minute)

Strict Mode

strict=True (Default)

Raises RuntimeError if computation exceeds allocated real-time budget.

import simpy.rt
import time

def heavy_computation(env):
    print(f'Starting computation at {env.now}')
    yield env.timeout(1)

    # Simulate heavy computation (exceeds 1 second budget)
    time.sleep(1.5)

    print(f'Computation done at {env.now}')

env = simpy.rt.RealtimeEnvironment(factor=1.0, strict=True)
env.process(heavy_computation(env))

try:
    env.run()
except RuntimeError as e:
    print(f'Error: {e}')

strict=False

Allows simulation to run slower than intended without crashing.

import simpy.rt
import time

def heavy_computation(env):
    print(f'Starting at {env.now}')
    yield env.timeout(1)

    # Heavy computation
    time.sleep(1.5)

    print(f'Done at {env.now}')

env = simpy.rt.RealtimeEnvironment(factor=1.0, strict=False)
env.process(heavy_computation(env))
env.run()

print('Simulation completed (slower than real-time)')

Use strict=False when:

  • Development and debugging
  • Computation time is unpredictable
  • Acceptable to run slower than target rate
  • Analyzing worst-case behavior

Hardware-in-the-Loop Example

import simpy.rt

class HardwareInterface:
    """Simulated hardware interface."""

    def __init__(self):
        self.sensor_value = 0

    def read_sensor(self):
        """Simulate reading from hardware sensor."""
        import random
        self.sensor_value = random.uniform(20.0, 30.0)
        return self.sensor_value

    def write_actuator(self, value):
        """Simulate writing to hardware actuator."""
        print(f'Actuator set to {value:.2f}')

def control_loop(env, hardware, setpoint):
    """Simple control loop running in real-time."""
    while True:
        # Read sensor
        sensor_value = hardware.read_sensor()
        print(f'[{env.now}] Sensor: {sensor_value:.2f}°C')

        # Simple proportional control
        error = setpoint - sensor_value
        control_output = error * 0.1

        # Write actuator
        hardware.write_actuator(control_output)

        # Control loop runs every 0.5 seconds
        yield env.timeout(0.5)

# Real-time environment: 1 sim unit = 1 second
env = simpy.rt.RealtimeEnvironment(factor=1.0, strict=False)
hardware = HardwareInterface()
setpoint = 25.0

env.process(control_loop(env, hardware, setpoint))
env.run(until=5)

Human Interaction Example

import simpy.rt

def interactive_process(env):
    """Process that waits for simulated user input."""
    print('Simulation started. Events will occur in real-time.')

    yield env.timeout(2)
    print(f'[{env.now}] Event 1: System startup')

    yield env.timeout(3)
    print(f'[{env.now}] Event 2: Initialization complete')

    yield env.timeout(2)
    print(f'[{env.now}] Event 3: Ready for operation')

# Real-time environment for human-paced demonstration
env = simpy.rt.RealtimeEnvironment(factor=1.0)
env.process(interactive_process(env))
env.run()

Monitoring Real-Time Performance

import simpy.rt
import time

class RealTimeMonitor:
    def __init__(self):
        self.step_times = []
        self.drift_values = []

    def record_step(self, sim_time, real_time, expected_real_time):
        self.step_times.append(sim_time)
        drift = real_time - expected_real_time
        self.drift_values.append(drift)

    def report(self):
        if self.drift_values:
            avg_drift = sum(self.drift_values) / len(self.drift_values)
            max_drift = max(abs(d) for d in self.drift_values)
            print(f'\nReal-time performance:')
            print(f'Average drift: {avg_drift*1000:.2f} ms')
            print(f'Maximum drift: {max_drift*1000:.2f} ms')

def monitored_process(env, monitor, start_time, factor):
    for i in range(5):
        step_start = time.time()
        yield env.timeout(1)

        real_elapsed = time.time() - start_time
        expected_elapsed = env.now * factor
        monitor.record_step(env.now, real_elapsed, expected_elapsed)

        print(f'Sim time: {env.now}, Real time: {real_elapsed:.2f}s, ' +
              f'Expected: {expected_elapsed:.2f}s')

start = time.time()
factor = 1.0
env = simpy.rt.RealtimeEnvironment(factor=factor, strict=False)
monitor = RealTimeMonitor()

env.process(monitored_process(env, monitor, start, factor))
env.run()
monitor.report()

Mixed Real-Time and Fast Simulation

import simpy.rt

def background_simulation(env):
    """Fast background simulation."""
    for i in range(100):
        yield env.timeout(0.01)
    print(f'Background simulation completed at {env.now}')

def real_time_display(env):
    """Real-time display updates."""
    for i in range(5):
        print(f'Display update at {env.now}')
        yield env.timeout(1)

# Note: This is conceptual - SimPy doesn't directly support mixed modes
# Consider running separate simulations or using strict=False
env = simpy.rt.RealtimeEnvironment(factor=1.0, strict=False)
env.process(background_simulation(env))
env.process(real_time_display(env))
env.run()

Converting Standard to Real-Time

Converting a standard simulation to real-time is straightforward:

import simpy
import simpy.rt

def process(env):
    print(f'Event at {env.now}')
    yield env.timeout(1)
    print(f'Event at {env.now}')
    yield env.timeout(1)
    print(f'Event at {env.now}')

# Standard simulation (runs instantly)
print('Standard simulation:')
env = simpy.Environment()
env.process(process(env))
env.run()

# Real-time simulation (2 real seconds)
print('\nReal-time simulation:')
env_rt = simpy.rt.RealtimeEnvironment(factor=1.0)
env_rt.process(process(env_rt))
env_rt.run()

Best Practices

  1. Factor selection: Choose factor based on hardware/human constraints

    • Human interaction: factor=1.0 (1:1 time mapping)
    • Fast hardware: factor=0.01 (100x faster)
    • Slow processes: factor=60 (1 sim unit = 1 minute)
  2. Strict mode usage:

    • Use strict=True for timing validation
    • Use strict=False for development and variable workloads
  3. Computation budget: Ensure process logic executes faster than timeout duration

  4. Error handling: Wrap real-time runs in try-except for timing violations

  5. Testing strategy:

    • Develop with standard Environment (fast iteration)
    • Test with RealtimeEnvironment (validation)
    • Deploy with appropriate factor and strict settings
  6. Performance monitoring: Track drift between simulation and real time

  7. Graceful degradation: Use strict=False when timing guarantees aren't critical

Common Patterns

Periodic Real-Time Tasks

import simpy.rt

def periodic_task(env, name, period, duration):
    """Task that runs periodically in real-time."""
    while True:
        start = env.now
        print(f'{name}: Starting at {start}')

        # Simulate work
        yield env.timeout(duration)

        print(f'{name}: Completed at {env.now}')

        # Wait for next period
        elapsed = env.now - start
        wait_time = period - elapsed
        if wait_time > 0:
            yield env.timeout(wait_time)

env = simpy.rt.RealtimeEnvironment(factor=1.0)
env.process(periodic_task(env, 'Task', period=2.0, duration=0.5))
env.run(until=6)

Synchronized Multi-Device Control

import simpy.rt

def device_controller(env, device_id, update_rate):
    """Control loop for individual device."""
    while True:
        print(f'Device {device_id}: Update at {env.now}')
        yield env.timeout(update_rate)

# All devices synchronized to real-time
env = simpy.rt.RealtimeEnvironment(factor=1.0)

# Different update rates for different devices
env.process(device_controller(env, 'A', 1.0))
env.process(device_controller(env, 'B', 0.5))
env.process(device_controller(env, 'C', 2.0))

env.run(until=5)

Limitations

  1. Performance: Real-time simulation adds overhead; not suitable for high-frequency events
  2. Synchronization: Single-threaded; all processes share same time base
  3. Precision: Limited by Python's time resolution and system scheduling
  4. Strict mode: May raise errors frequently with computationally intensive processes
  5. Platform-dependent: Timing accuracy varies across operating systems