11 KiB
SimPy Process Interaction
This guide covers the mechanisms for processes to interact and synchronize in SimPy simulations.
Interaction Mechanisms Overview
SimPy provides three primary ways for processes to interact:
- Event-based passivation/reactivation: processes share events to signal each other
- Waiting for process termination: a process yields another process object
- Interruption: a paused process is forcefully resumed
1. Event-Based Passivation and Reactivation
Processes can share events to coordinate their execution.
Basic Signal Pattern
import simpy
def controller(env, signal_event):
    """Prepare for five time units, then trigger the shared signal event."""
    print(f'Controller: Preparing at {env.now}')
    preparation = env.timeout(5)
    yield preparation
    print(f'Controller: Sending signal at {env.now}')
    # Triggering the event resumes any process that is yielding it.
    signal_event.succeed()
def worker(env, signal_event):
    """Wait for the shared signal event, then work for three time units."""
    print(f'Worker: Waiting for signal at {env.now}')
    # Passivate here until another process triggers the event.
    yield signal_event
    print(f'Worker: Received signal, starting work at {env.now}')
    work = env.timeout(3)
    yield work
    print(f'Worker: Work complete at {env.now}')
# Wire the controller and the worker together through one shared event.
env = simpy.Environment()
signal = env.event()
for make_process in (controller, worker):
    env.process(make_process(env, signal))
env.run()
Use cases:
- Start signals for coordinated operations
- Completion notifications
- Broadcasting state changes
Multiple Waiters
Multiple processes can wait for the same signal event.
import simpy
def broadcaster(env, signal):
    """Trigger the shared signal with the value 'Go!' after five time units."""
    delay = env.timeout(5)
    yield delay
    print(f'Broadcasting signal at {env.now}')
    # The value passed here is what each waiting process receives from its yield.
    signal.succeed(value='Go!')
def listener(env, name, signal):
    """Wait for the shared signal, print its value, then run for two time units."""
    print(f'{name}: Waiting at {env.now}')
    # The yield evaluates to the value the event was triggered with.
    msg = yield signal
    print(f'{name}: Received "{msg}" at {env.now}')
    follow_up = env.timeout(2)
    yield follow_up
    print(f'{name}: Done at {env.now}')
# One broadcaster and three listeners all share the same event.
env = simpy.Environment()
broadcast_signal = env.event()
env.process(broadcaster(env, broadcast_signal))
for number in range(1, 4):
    env.process(listener(env, f'Listener {number}', broadcast_signal))
env.run()
Barrier Synchronization
import simpy
class Barrier:
    """One-shot barrier that releases its waiters once ``n`` parties have arrived.

    Every caller of :meth:`wait` receives the same event; the event is
    triggered when the ``n``-th party arrives.
    """

    def __init__(self, env, n):
        """Create a barrier for ``n`` parties using events from ``env``."""
        self.env = env
        self.n = n
        self.count = 0  # number of wait() calls so far
        self.event = env.event()  # shared by every waiter

    def wait(self):
        """Register one arrival and return the event to yield on.

        The event is triggered at most once: arrivals beyond ``n`` get the
        already-triggered event back instead of triggering it a second time.
        """
        self.count += 1
        # Guard on ``triggered`` so late arrivals do not call succeed() again.
        if self.count >= self.n and not self.event.triggered:
            self.event.succeed()
        return self.event
def worker(env, barrier, name, work_time):
    """Work for ``work_time`` time units, then wait at the barrier."""
    print(f'{name}: Working at {env.now}')
    yield env.timeout(work_time)
    print(f'{name}: Reached barrier at {env.now}')
    # wait() hands back the barrier's shared event to yield on.
    release = barrier.wait()
    yield release
    print(f'{name}: Passed barrier at {env.now}')
# Three workers with different work times meet at a three-party barrier.
env = simpy.Environment()
barrier = Barrier(env, 3)
for label, work_time in (('Worker A', 3), ('Worker B', 5), ('Worker C', 7)):
    env.process(worker(env, barrier, label, work_time))
env.run()
2. Waiting for Process Termination
Processes are events themselves, so you can yield them to wait for completion.
Sequential Process Execution
import simpy
def task(env, name, duration):
    """Run for ``duration`` time units and return a result string."""
    print(f'{name}: Starting at {env.now}')
    elapsed = env.timeout(duration)
    yield elapsed
    print(f'{name}: Completed at {env.now}')
    # The return value is what a process yielding this task receives.
    outcome = f'{name} result'
    return outcome
def sequential_coordinator(env):
    """Run three tasks one after another, printing each result."""
    schedule = (('Task 1', 5), ('Task 2', 3), ('Task 3', 4))
    for label, duration in schedule:
        # Yielding the process waits for it and evaluates to its return value.
        outcome = yield env.process(task(env, label, duration))
        print(f'Coordinator: {outcome}')
# Run the sequential coordinator until no events remain.
env = simpy.Environment()
coordinator = sequential_coordinator(env)
env.process(coordinator)
env.run()
Parallel Process Execution
import simpy
def task(env, name, duration):
    """Run for ``duration`` time units and return a result string."""
    print(f'{name}: Starting at {env.now}')
    yield env.timeout(duration)
    print(f'{name}: Completed at {env.now}')
    # The return value becomes the value of the finished process.
    summary = f'{name} result'
    return summary
def parallel_coordinator(env):
    """Start three tasks at once, wait for all of them, and print the results."""
    # Start all tasks; each env.process() call returns immediately.
    task1 = env.process(task(env, 'Task 1', 5))
    task2 = env.process(task(env, 'Task 2', 3))
    task3 = env.process(task(env, 'Task 3', 4))

    # "&" builds a condition that fires once every operand has finished.
    # The yielded result maps each finished event to its value.
    results = yield task1 & task2 & task3

    print(f'All tasks completed at {env.now}')
    print(f'Task 1 result: {results[task1]}')
    print(f'Task 2 result: {results[task2]}')
    print(f'Task 3 result: {results[task3]}')
# Run the parallel coordinator until no events remain.
env = simpy.Environment()
coordinator = parallel_coordinator(env)
env.process(coordinator)
env.run()
First-to-Complete Pattern
import simpy
def server(env, name, processing_time):
    """Handle one request for ``processing_time`` time units and return ``name``."""
    print(f'{name}: Starting request at {env.now}')
    handling = env.timeout(processing_time)
    yield handling
    print(f'{name}: Completed at {env.now}')
    return name
def load_balancer(env):
    """Send one request to three servers and report the first to respond."""
    # Fan the request out to every server.
    server1 = env.process(server(env, 'Server 1', 5))
    server2 = env.process(server(env, 'Server 2', 3))
    server3 = env.process(server(env, 'Server 3', 7))

    # "|" builds a condition that fires as soon as any operand has finished.
    result = yield server1 | server2 | server3

    # The first value in the result is taken as the winner.
    finished_values = list(result.values())
    winner = finished_values[0]
    print(f'Load balancer: {winner} responded first at {env.now}')
# Run the load balancer until no events remain.
env = simpy.Environment()
balancer = load_balancer(env)
env.process(balancer)
env.run()
3. Process Interruption
Processes can be interrupted using process.interrupt(), which raises a simpy.Interrupt exception inside the interrupted process at the yield where it is currently waiting.
Basic Interruption
import simpy
def worker(env):
    """Run a ten-unit task, reporting the cause if it is interrupted."""
    try:
        print(f'Worker: Starting long task at {env.now}')
        long_task = env.timeout(10)
        yield long_task
        print(f'Worker: Task completed at {env.now}')
    except simpy.Interrupt as interrupt:
        # The interrupting process supplies the cause.
        print(f'Worker: Interrupted at {env.now}')
        print(f'Interrupt cause: {interrupt.cause}')
def interrupter(env, target_process):
    """Interrupt ``target_process`` after five time units."""
    delay = env.timeout(5)
    yield delay
    print(f'Interrupter: Interrupting worker at {env.now}')
    reason = 'Higher priority task'
    target_process.interrupt(cause=reason)
# Keep a reference to the worker process so the interrupter can target it.
env = simpy.Environment()
worker_process = env.process(worker(env))
interrupter_gen = interrupter(env, worker_process)
env.process(interrupter_gen)
env.run()
Resumable Interruption
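A process can resume its work after an interruption, either by re-yielding the same event or, as in the example below, by tracking the remaining work and scheduling a new timeout for it.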
import simpy
def resumable_worker(env):
    """Complete ten units of work, resuming the remainder after each interrupt."""
    work_left = 10
    while work_left > 0:
        try:
            print(f'Worker: Working ({work_left} units left) at {env.now}')
            # Record when this attempt began so the handler can compute progress.
            start = env.now
            yield env.timeout(work_left)
            work_left = 0
            print(f'Worker: Completed at {env.now}')
        except simpy.Interrupt:
            # Deduct the time worked in this attempt before retrying.
            done_this_attempt = env.now - start
            work_left -= done_this_attempt
            print(f'Worker: Interrupted! {work_left} units left at {env.now}')
def interrupter(env, worker_proc):
    """Interrupt ``worker_proc`` after three time units and again two units later."""
    for delay in (3, 2):
        yield env.timeout(delay)
        worker_proc.interrupt()
# Keep a reference to the worker process so the interrupter can target it.
env = simpy.Environment()
worker_proc = env.process(resumable_worker(env))
interrupter_gen = interrupter(env, worker_proc)
env.process(interrupter_gen)
env.run()
Interrupt with Custom Cause
import simpy
def machine(env, name):
    """Operate in five-unit cycles, reacting to interrupts by their cause.

    A 'maintenance' interrupt pauses the machine for two time units; an
    'emergency' interrupt stops it. Any other cause just restarts the cycle.
    """
    while True:
        try:
            print(f'{name}: Operating at {env.now}')
            yield env.timeout(5)
        except simpy.Interrupt as interrupt:
            reason = interrupt.cause
            if reason == 'emergency':
                print(f'{name}: Emergency stop at {env.now}')
                # Nothing follows the loop, so returning ends the process.
                return
            if reason == 'maintenance':
                print(f'{name}: Maintenance required at {env.now}')
                yield env.timeout(2)
                print(f'{name}: Maintenance complete at {env.now}')
def maintenance_scheduler(env, machine_proc):
    """Send a 'maintenance' interrupt after 7 units and an 'emergency' 10 units later."""
    plan = ((7, 'maintenance'), (10, 'emergency'))
    for delay, reason in plan:
        yield env.timeout(delay)
        machine_proc.interrupt(cause=reason)
# Keep a reference to the machine process so the scheduler can interrupt it.
env = simpy.Environment()
machine_proc = env.process(machine(env, 'Machine 1'))
scheduler_gen = maintenance_scheduler(env, machine_proc)
env.process(scheduler_gen)
env.run()
Preemptive Resource with Interruption
import simpy
def user(env, name, resource, priority, duration):
    """Request ``resource`` at ``priority`` and hold it for ``duration`` time units.

    An Interrupt raised while waiting for or holding the resource is
    reported as a preemption.
    """
    # The with-block covers the whole request, including the interrupted path.
    with resource.request(priority=priority) as req:
        try:
            yield req
            print(f'{name} (priority {priority}): Got resource at {env.now}')
            usage = env.timeout(duration)
            yield usage
            print(f'{name}: Done at {env.now}')
        except simpy.Interrupt:
            print(f'{name}: Preempted at {env.now}')
# Two users compete for a single-slot preemptive resource.
env = simpy.Environment()
resource = simpy.PreemptiveResource(env, capacity=1)
contenders = (
    ('Low priority user', 10, 10),
    ('High priority user', 1, 5),
)
for label, level, hold_time in contenders:
    env.process(user(env, label, resource, priority=level, duration=hold_time))
env.run()
Advanced Patterns
Producer-Consumer with Signaling
import simpy
class Buffer:
    """Bounded FIFO buffer with an event that signals when items are available."""

    def __init__(self, env, capacity):
        """Create an empty buffer holding at most ``capacity`` items."""
        self.env = env
        self.capacity = capacity
        self.items = []
        # Untriggered while the buffer is empty; consumers yield on it.
        self.item_available = env.event()

    def put(self, item):
        """Append ``item`` if there is room.

        Returns True when the item was stored, False when the buffer is full.
        """
        if len(self.items) >= self.capacity:
            return False
        self.items.append(item)
        # An event can be triggered only once, so skip it if already triggered.
        if not self.item_available.triggered:
            self.item_available.succeed()
        return True

    def get(self):
        """Remove and return the oldest item, or None when the buffer is empty."""
        if not self.items:
            return None
        item = self.items.pop(0)
        # Re-arm the signal once the buffer drains. Otherwise the next consumer
        # to wait would yield an already-triggered event and wake spuriously.
        if not self.items and self.item_available.triggered:
            self.item_available = self.env.event()
        return item
def producer(env, buffer):
    """Try to add one item to ``buffer`` every two time units, forever."""
    next_id = 0
    while True:
        yield env.timeout(2)
        item = f'Item {next_id}'
        if not buffer.put(item):
            # Buffer refused the item; the same id is offered on the next cycle.
            continue
        print(f'Producer: Added {item} at {env.now}')
        next_id += 1
def consumer(env, buffer):
    """Take items from ``buffer``, spending three time units on each, forever."""
    while True:
        if not buffer.items:
            print(f'Consumer: Waiting for items at {env.now}')
            yield buffer.item_available
            # Events trigger only once, so install a fresh one for the next wait.
            buffer.item_available = env.event()
            continue
        item = buffer.get()
        print(f'Consumer: Retrieved {item} at {env.now}')
        yield env.timeout(3)
# Producer and consumer share one five-slot buffer. Both loop forever,
# so the run is bounded with ``until``.
env = simpy.Environment()
buffer = Buffer(env, capacity=5)
for make_process in (producer, consumer):
    env.process(make_process(env, buffer))
env.run(until=20)
Handshake Protocol
import simpy
def sender(env, request_event, acknowledge_event):
    """Send three requests, waiting for an acknowledgment after each one.

    ``request_event`` and ``acknowledge_event`` are the events for the first
    round. Events trigger only once, so every request carries the fresh
    event pair for the following round; both sides then switch to the same
    new events instead of each creating private ones that the peer never sees.
    """
    for i in range(3):
        print(f'Sender: Sending request {i} at {env.now}')
        # Create the next round's events here and ship them with the request.
        next_request, next_acknowledge = env.event(), env.event()
        request_event.succeed(
            value=(f'Request {i}', next_request, next_acknowledge)
        )

        yield acknowledge_event
        print(f'Sender: Received acknowledgment at {env.now}')

        # Switch to the shared events for the next iteration.
        request_event, acknowledge_event = next_request, next_acknowledge
        yield env.timeout(1)


def receiver(env, request_event, acknowledge_event):
    """Receive three requests, acknowledging each after two time units.

    Each request value is a ``(payload, next_request, next_acknowledge)``
    tuple; the two events are the ones the sender will use for the next round.
    """
    for _ in range(3):
        request, next_request, next_acknowledge = yield request_event
        print(f'Receiver: Got {request} at {env.now}')

        yield env.timeout(2)  # Process request

        acknowledge_event.succeed()
        print(f'Receiver: Sent acknowledgment at {env.now}')

        # Follow the sender onto the events it supplied for the next iteration.
        request_event, acknowledge_event = next_request, next_acknowledge
# Sender and receiver start from the same request/acknowledge event pair.
env = simpy.Environment()
request = env.event()
ack = env.event()
for make_process in (sender, receiver):
    env.process(make_process(env, request, ack))
env.run()
Best Practices
- Choose the right mechanism:
  - Use events for signals and broadcasts
  - Use process yields for sequential/parallel workflows
  - Use interrupts for preemption and emergency handling
- Exception handling: Always wrap interrupt-prone code in try-except blocks
- Event lifecycle: Remember that events can only be triggered once; create new events for repeated signaling
- Process references: Store process objects if you need to interrupt them later
- Cause information: Use interrupt causes to communicate why interruption occurred
- Resumable patterns: Track progress to enable resumption after interruption
- Avoid deadlocks: Ensure at least one process can make progress at any time