Files
2025-11-30 08:30:10 +08:00

476 lines
13 KiB
Markdown

# SimPy Monitoring and Data Collection
This guide covers techniques for collecting data and monitoring simulation behavior in SimPy.
## Monitoring Strategy
Before implementing monitoring, define three things:
1. **What to monitor**: Processes, resources, events, or system state
2. **When to monitor**: On change, at intervals, or at specific events
3. **How to store data**: Lists, files, databases, or real-time output
## 1. Process Monitoring
### State Variable Tracking
Track process state by recording variables when they change.
```python
import simpy
def customer(env, name, service_time, log):
    """Simulate one customer and append its lifecycle events to ``log``.

    Appends ``('arrival', name, t)`` on entry. After a timeout of
    ``service_time`` it appends ``('departure', name, t)`` followed by
    ``('wait_time', name, elapsed)``, where ``elapsed`` is the difference
    between the two recorded timestamps.
    """
    arrival_time = env.now
    log.append(('arrival', name, arrival_time))

    yield env.timeout(service_time)

    departure_time = env.now
    log.append(('departure', name, departure_time))
    log.append(('wait_time', name, departure_time - arrival_time))
# Run two customers with different service times, then dump the shared log.
env = simpy.Environment()
log = []
env.process(customer(env, 'Customer 1', 5, log))
env.process(customer(env, 'Customer 2', 3, log))
env.run()

print('Simulation log:')
for entry in log:
    print(entry)
```
### Time-Series Data Collection
```python
import simpy
def system_monitor(env, system_state, data_log, interval):
    """Sample ``system_state`` every ``interval`` time units, forever.

    Each sample is appended to ``data_log`` as
    ``(time, queue_length, utilization)``. The first sample is taken
    before the first timeout is yielded.
    """
    while True:
        data_log.append((
            env.now,
            system_state['queue_length'],
            system_state['utilization'],
        ))
        yield env.timeout(interval)
def process(env, system_state, capacity=10):
    """Cycle the queue length up and down, keeping utilization in sync.

    Each cycle increments ``queue_length``, waits 2 time units, decrements
    it again and waits 3 more. ``utilization`` is recomputed as
    ``queue_length / capacity`` after *every* change; updating it only after
    the decrement would leave it at zero for the whole run.
    """
    while True:
        system_state['queue_length'] += 1
        system_state['utilization'] = system_state['queue_length'] / capacity
        yield env.timeout(2)

        system_state['queue_length'] -= 1
        system_state['utilization'] = system_state['queue_length'] / capacity
        yield env.timeout(3)
# Sample the shared state once per time unit while the process mutates it.
env = simpy.Environment()
system_state = {'queue_length': 0, 'utilization': 0.0}
data_log = []
env.process(system_monitor(env, system_state, data_log, interval=1))
env.process(process(env, system_state))
env.run(until=20)

print('Time series data:')
for time, queue, util in data_log:
    print(f'Time {time}: Queue={queue}, Utilization={util:.2f}')
```
### Multiple Variable Tracking
```python
import simpy
class SimulationData:
    """Column-oriented store for simulation samples.

    Each call to :meth:`record` appends one value to every list, so the
    four lists always have the same length and share indices.
    """

    def __init__(self):
        self.timestamps = []
        self.queue_lengths = []
        self.processing_times = []
        self.utilizations = []

    def record(self, timestamp, queue_length, processing_time, utilization):
        """Append one sample (one value per tracked variable)."""
        self.timestamps.append(timestamp)
        self.queue_lengths.append(queue_length)
        self.processing_times.append(processing_time)
        self.utilizations.append(utilization)
def monitored_process(env, data):
    """Record five synthetic samples into ``data``, one every 2 time units.

    For iteration ``i`` the sample is ``queue_length = i % 3``,
    ``processing_time = 2 + i`` and ``utilization = queue_length / 10``,
    passed to ``data.record`` together with the current simulation time.
    """
    for i in range(5):
        queue_length = i % 3
        processing_time = 2 + i
        utilization = queue_length / 10
        data.record(env.now, queue_length, processing_time, utilization)
        yield env.timeout(2)
# Run the monitored process to completion and report the sample count.
env = simpy.Environment()
data = SimulationData()
env.process(monitored_process(env, data))
env.run()

sample_count = len(data.timestamps)
print(f'Collected {sample_count} data points')
```
## 2. Resource Monitoring
### Monkey-Patching Resources
Patch resource methods to intercept and log operations.
```python
import simpy
def patch_resource(resource, data_log):
    """Patch a resource to log all requests and releases.

    Replaces ``resource.request`` and ``resource.release`` on this instance
    with wrappers that delegate to the original callables and then append
    ``(operation, time, queue_length)`` to ``data_log``. The wrappers return
    whatever the original call returned.
    """
    def logged(operation, original):
        def wrapper(*args, **kwargs):
            result = original(*args, **kwargs)
            # Log after delegating so the entry shows the queue as the
            # call left it.
            data_log.append((operation, resource._env.now, len(resource.queue)))
            return result
        return wrapper

    # Capture the originals before overwriting them on the instance.
    resource.request = logged('request', resource.request)
    resource.release = logged('release', resource.release)
def user(env, name, resource):
    """Request ``resource``, hold it for 3 time units, then let it go.

    The ``with`` block spans the whole hold period, so the request's
    context manager exits only after the timeout has elapsed.
    """
    with resource.request() as req:
        yield req
        print(f'{name} using resource at {env.now}')
        yield env.timeout(3)
        print(f'{name} releasing resource at {env.now}')
# Two users compete for a single-capacity resource whose calls are logged.
env = simpy.Environment()
resource = simpy.Resource(env, capacity=1)
log = []
patch_resource(resource, log)
env.process(user(env, 'User 1', resource))
env.process(user(env, 'User 2', resource))
env.run()

print('\nResource log:')
for entry in log:
    print(entry)
```
### Resource Subclassing
Create custom resource classes with built-in monitoring.
```python
import simpy
class MonitoredResource(simpy.Resource):
    """``simpy.Resource`` that records a sample on every request/release.

    ``data`` holds ``(operation, time, queue_length, utilization)`` tuples
    and ``utilization_data`` holds ``(time, utilization)`` tuples, where
    utilization is ``count / capacity`` measured right after the call.
    """

    def __init__(self, env, capacity=1):
        super().__init__(env, capacity)
        self.data = []
        self.utilization_data = []

    def _record(self, operation):
        """Append one sample describing the state right after ``operation``."""
        now = self._env.now
        utilization = self.count / self.capacity
        self.data.append((operation, now, len(self.queue), utilization))
        self.utilization_data.append((now, utilization))

    def request(self, *args, **kwargs):
        """Create a request via the base class, then record a sample."""
        req = super().request(*args, **kwargs)
        self._record('request')
        return req

    def release(self, *args, **kwargs):
        """Create a release via the base class, then record a sample."""
        result = super().release(*args, **kwargs)
        self._record('release')
        return result

    def average_utilization(self):
        """Return the mean of all recorded utilization samples.

        This is an unweighted mean over samples (one per request/release),
        not a time-weighted average. Returns 0.0 when nothing was recorded.
        """
        if not self.utilization_data:
            return 0.0
        total = sum(u for _, u in self.utilization_data)
        return total / len(self.utilization_data)
def user(env, name, resource):
    """Request ``resource`` and hold it for 2 time units."""
    with resource.request() as req:
        yield req
        print(f'{name} using resource at {env.now}')
        yield env.timeout(2)
# Five users share a capacity-2 resource; report the recorded statistics.
env = simpy.Environment()
resource = MonitoredResource(env, capacity=2)
for i in range(5):
    env.process(user(env, f'User {i+1}', resource))
env.run()

print(f'\nAverage utilization: {resource.average_utilization():.2%}')
print(f'Total operations: {len(resource.data)}')
```
### Container Level Monitoring
```python
import simpy
class MonitoredContainer(simpy.Container):
    """``simpy.Container`` that keeps a ``(time, level)`` history.

    ``level_data`` starts with the initial level and gains one entry for
    each put/get once that request has actually been fulfilled.
    """

    def __init__(self, env, capacity, init=0):
        super().__init__(env, capacity, init)
        # Stamp the first sample with the environment's clock rather than a
        # literal 0, so a non-zero start time is recorded correctly.
        self.level_data = [(env.now, init)]

    def _record_level(self, _event=None):
        """Append the current ``(time, level)``; usable as an event callback."""
        self.level_data.append((self._env.now, self.level))

    def _watch(self, event):
        """Record the level once ``event`` has taken effect, and return it."""
        if event.triggered:
            # Fulfilled during creation: the level already reflects it.
            self._record_level()
        else:
            # Still waiting (e.g. not enough room or content). Record when
            # the event is processed so the later level change is not lost.
            event.callbacks.append(self._record_level)
        return event

    def put(self, amount):
        """Put ``amount`` into the container and track the resulting level."""
        return self._watch(super().put(amount))

    def get(self, amount):
        """Get ``amount`` from the container and track the resulting level."""
        return self._watch(super().get(amount))
def producer(env, container, amount, interval):
    """Put ``amount`` into ``container`` every ``interval`` time units, forever."""
    while True:
        yield env.timeout(interval)
        yield container.put(amount)
        print(f'Produced {amount}. Level: {container.level} at {env.now}')
def consumer(env, container, amount, interval):
    """Get ``amount`` from ``container`` every ``interval`` time units, forever."""
    while True:
        yield env.timeout(interval)
        yield container.get(amount)
        print(f'Consumed {amount}. Level: {container.level} at {env.now}')
# One producer and one consumer share a monitored container for 20 time units.
env = simpy.Environment()
container = MonitoredContainer(env, capacity=100, init=50)
env.process(producer(env, container, 20, 3))
env.process(consumer(env, container, 15, 4))
env.run(until=20)

print('\nLevel history:')
for time, level in container.level_data:
    print(f'Time {time}: Level={level}')
```
## 3. Event Tracing
### Environment Step Monitoring
Monitor all events by patching the environment's step function.
```python
import simpy
def trace(env, callback):
    """Trace all events processed by the environment.

    Replaces ``env.step`` with a wrapper that, when the event queue is not
    empty, calls ``callback(time, priority, event_id, event)`` with the
    entry at the head of ``env._queue`` and then delegates to the original
    ``step``.
    """
    # Capture the original before installing the wrapper so the closure
    # never refers to itself.
    original_step = env.step

    def traced_step():
        # Report the next event before it is processed.
        if env._queue:
            time, priority, event_id, event = env._queue[0]
            callback(time, priority, event_id, event)
        return original_step()

    env.step = traced_step
def event_callback(time, priority, event_id, event):
    """Print a one-line summary of an event, including its class name."""
    print(f'Event: time={time}, priority={priority}, id={event_id}, type={type(event).__name__}')
def process(env, name):
    """Announce the start, wait 5 time units, then announce completion."""
    print(f'{name}: Starting at {env.now}')
    yield env.timeout(5)
    print(f'{name}: Done at {env.now}')
# Install the tracer first so every event of both processes is reported.
env = simpy.Environment()
trace(env, event_callback)
for process_name in ('Process 1', 'Process 2'):
    env.process(process(env, process_name))
env.run()
```
### Event Scheduling Monitor
Track when events are scheduled.
```python
import simpy
class MonitoredEnvironment(simpy.Environment):
    """``simpy.Environment`` that remembers every scheduled event.

    ``scheduled_events`` collects ``(scheduled_time, priority, type_name)``
    tuples, one per call to :meth:`schedule`.
    """

    def __init__(self, initial_time=0):
        # Forward the start time so this class can replace
        # ``simpy.Environment`` even when a non-zero start is needed.
        super().__init__(initial_time)
        self.scheduled_events = []

    def schedule(self, event, priority=simpy.core.NORMAL, delay=0):
        """Schedule ``event`` via the base class and log when it is due."""
        super().schedule(event, priority, delay)
        self.scheduled_events.append(
            (self.now + delay, priority, type(event).__name__)
        )
def process(env, name, delay):
    """Announce and yield a timeout of ``delay``, then report resumption."""
    print(f'{name}: Scheduling timeout for {delay} at {env.now}')
    yield env.timeout(delay)
    print(f'{name}: Resumed at {env.now}')
# Run two processes, then list everything the environment scheduled.
env = MonitoredEnvironment()
env.process(process(env, 'Process 1', 5))
env.process(process(env, 'Process 2', 3))
env.run()

print('\nScheduled events:')
for time, priority, event_type in env.scheduled_events:
    print(f'Time {time}, Priority {priority}, Type {event_type}')
```
## 4. Statistical Monitoring
### Queue Statistics
```python
import simpy
class QueueStatistics:
    """Accumulate arrival/departure observations and summarize them."""

    def __init__(self):
        self.arrival_times = []
        self.departure_times = []
        self.queue_lengths = []
        self.wait_times = []

    def record_arrival(self, time, queue_length):
        """Store an arrival time and the queue length seen at that moment."""
        self.arrival_times.append(time)
        self.queue_lengths.append(queue_length)

    def record_departure(self, arrival_time, departure_time):
        """Store a departure time and the elapsed time since ``arrival_time``."""
        self.departure_times.append(departure_time)
        self.wait_times.append(departure_time - arrival_time)

    def average_wait_time(self):
        """Return the mean recorded wait time, or 0 if none were recorded."""
        if not self.wait_times:
            return 0
        return sum(self.wait_times) / len(self.wait_times)

    def average_queue_length(self):
        """Return the mean recorded queue length, or 0 if none were recorded."""
        if not self.queue_lengths:
            return 0
        return sum(self.queue_lengths) / len(self.queue_lengths)
def customer(env, resource, stats):
    """Queue for ``resource``, record how long the request took, then hold it.

    The arrival is recorded with the queue length seen before requesting.
    ``stats.record_departure`` is called as soon as the request is granted,
    so the recorded interval is the time spent waiting for the resource.
    The resource is then held for 2 time units.
    """
    arrival_time = env.now
    stats.record_arrival(arrival_time, len(resource.queue))

    with resource.request() as req:
        yield req
        departure_time = env.now
        stats.record_departure(arrival_time, departure_time)
        yield env.timeout(2)
# Five customers arrive together at a single-capacity resource.
env = simpy.Environment()
resource = simpy.Resource(env, capacity=1)
stats = QueueStatistics()
for i in range(5):
    env.process(customer(env, resource, stats))
env.run()

print(f'Average wait time: {stats.average_wait_time():.2f}')
print(f'Average queue length: {stats.average_queue_length():.2f}')
```
## 5. Data Export
### CSV Export
```python
import simpy
import csv
def export_to_csv(data, filename):
    """Write ``data`` to ``filename`` as CSV under a Time/Metric/Value header.

    ``data`` is an iterable of rows; each row is written as given. Any
    existing file is overwritten.
    """
    # newline='' lets the csv module control line endings; an explicit
    # encoding keeps the output independent of the platform default.
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['Time', 'Metric', 'Value'])
        writer.writerows(data)
def monitored_simulation(env, data_log):
    """Append ten pairs of synthetic metric rows to ``data_log``.

    Each step adds a ``'queue_length'`` row and a ``'utilization'`` row,
    both stamped with the current time, then waits 1 time unit.
    """
    for i in range(10):
        level = i % 3
        data_log.append((env.now, 'queue_length', level))
        data_log.append((env.now, 'utilization', level / 10))
        yield env.timeout(1)
# Collect the metric rows during the run, then write them out in one go.
output_path = 'simulation_data.csv'

env = simpy.Environment()
data = []
env.process(monitored_simulation(env, data))
env.run()

export_to_csv(data, output_path)
print('Data exported to simulation_data.csv')
```
### Real-time Plotting (requires matplotlib)
```python
import simpy
import matplotlib.pyplot as plt
class RealTimePlotter:
    """Collect ``(time, value)`` points and draw them as a line chart."""

    def __init__(self):
        self.times = []
        self.values = []

    def update(self, time, value):
        """Append one data point."""
        self.times.append(time)
        self.values.append(value)

    def plot(self, title='Simulation Results'):
        """Draw all collected points in a new figure and show it."""
        plt.figure(figsize=(10, 6))
        plt.plot(self.times, self.values)
        plt.xlabel('Time')
        plt.ylabel('Value')
        plt.title(title)
        plt.grid(True)
        plt.show()
def monitored_process(env, plotter):
    """Feed 20 synthetic values to ``plotter``, one per time unit.

    The value decays by a factor of 0.9 each step and is bumped by
    ``i % 5`` before being passed to ``plotter.update``.
    """
    value = 0
    for i in range(20):
        value = value * 0.9 + (i % 5)
        plotter.update(env.now, value)
        yield env.timeout(1)
# Gather every data point during the run; the chart is drawn afterwards.
plotter = RealTimePlotter()
env = simpy.Environment()
env.process(monitored_process(env, plotter))
env.run()

plotter.plot(title='Process Value Over Time')
```
## Best Practices
1. **Minimize overhead**: Only monitor what's necessary; excessive logging can slow simulations
2. **Structured data**: Use classes or named tuples for complex data points
3. **Time-stamping**: Always include timestamps with monitored data
4. **Aggregation**: For long simulations, aggregate data rather than storing every event
5. **Lazy evaluation**: Consider collecting raw data and computing statistics after simulation
6. **Memory management**: For very long simulations, periodically flush data to disk
7. **Validation**: Verify monitoring code doesn't affect simulation behavior
8. **Separation of concerns**: Keep monitoring code separate from simulation logic
9. **Reusable components**: Create generic monitoring classes that can be reused across simulations