# SimPy Monitoring and Data Collection This guide covers techniques for collecting data and monitoring simulation behavior in SimPy. ## Monitoring Strategy Before implementing monitoring, define three things: 1. **What to monitor**: Processes, resources, events, or system state 2. **When to monitor**: On change, at intervals, or at specific events 3. **How to store data**: Lists, files, databases, or real-time output ## 1. Process Monitoring ### State Variable Tracking Track process state by recording variables when they change. ```python import simpy def customer(env, name, service_time, log): arrival_time = env.now log.append(('arrival', name, arrival_time)) yield env.timeout(service_time) departure_time = env.now log.append(('departure', name, departure_time)) wait_time = departure_time - arrival_time log.append(('wait_time', name, wait_time)) env = simpy.Environment() log = [] env.process(customer(env, 'Customer 1', 5, log)) env.process(customer(env, 'Customer 2', 3, log)) env.run() print('Simulation log:') for entry in log: print(entry) ``` ### Time-Series Data Collection ```python import simpy def system_monitor(env, system_state, data_log, interval): while True: data_log.append((env.now, system_state['queue_length'], system_state['utilization'])) yield env.timeout(interval) def process(env, system_state): while True: system_state['queue_length'] += 1 yield env.timeout(2) system_state['queue_length'] -= 1 system_state['utilization'] = system_state['queue_length'] / 10 yield env.timeout(3) env = simpy.Environment() system_state = {'queue_length': 0, 'utilization': 0.0} data_log = [] env.process(system_monitor(env, system_state, data_log, interval=1)) env.process(process(env, system_state)) env.run(until=20) print('Time series data:') for time, queue, util in data_log: print(f'Time {time}: Queue={queue}, Utilization={util:.2f}') ``` ### Multiple Variable Tracking ```python import simpy class SimulationData: def __init__(self): self.timestamps = [] self.queue_lengths = [] self.processing_times = [] self.utilizations = [] def record(self, timestamp, queue_length, processing_time, utilization): self.timestamps.append(timestamp) self.queue_lengths.append(queue_length) self.processing_times.append(processing_time) self.utilizations.append(utilization) def monitored_process(env, data): queue_length = 0 processing_time = 0 utilization = 0.0 for i in range(5): queue_length = i % 3 processing_time = 2 + i utilization = queue_length / 10 data.record(env.now, queue_length, processing_time, utilization) yield env.timeout(2) env = simpy.Environment() data = SimulationData() env.process(monitored_process(env, data)) env.run() print(f'Collected {len(data.timestamps)} data points') ``` ## 2. Resource Monitoring ### Monkey-Patching Resources Patch resource methods to intercept and log operations. ```python import simpy def patch_resource(resource, data_log): """Patch a resource to log all requests and releases.""" # Save original methods original_request = resource.request original_release = resource.release # Create wrapper for request def logged_request(*args, **kwargs): req = original_request(*args, **kwargs) data_log.append(('request', resource._env.now, len(resource.queue))) return req # Create wrapper for release def logged_release(*args, **kwargs): result = original_release(*args, **kwargs) data_log.append(('release', resource._env.now, len(resource.queue))) return result # Replace methods resource.request = logged_request resource.release = logged_release def user(env, name, resource): with resource.request() as req: yield req print(f'{name} using resource at {env.now}') yield env.timeout(3) print(f'{name} releasing resource at {env.now}') env = simpy.Environment() resource = simpy.Resource(env, capacity=1) log = [] patch_resource(resource, log) env.process(user(env, 'User 1', resource)) env.process(user(env, 'User 2', resource)) env.run() print('\nResource log:') for entry in log: print(entry) ``` ### Resource Subclassing Create custom resource classes with built-in monitoring. ```python import simpy class MonitoredResource(simpy.Resource): def __init__(self, env, capacity): super().__init__(env, capacity) self.data = [] self.utilization_data = [] def request(self, *args, **kwargs): req = super().request(*args, **kwargs) queue_length = len(self.queue) utilization = self.count / self.capacity self.data.append(('request', self._env.now, queue_length, utilization)) self.utilization_data.append((self._env.now, utilization)) return req def release(self, *args, **kwargs): result = super().release(*args, **kwargs) queue_length = len(self.queue) utilization = self.count / self.capacity self.data.append(('release', self._env.now, queue_length, utilization)) self.utilization_data.append((self._env.now, utilization)) return result def average_utilization(self): if not self.utilization_data: return 0.0 return sum(u for _, u in self.utilization_data) / len(self.utilization_data) def user(env, name, resource): with resource.request() as req: yield req print(f'{name} using resource at {env.now}') yield env.timeout(2) env = simpy.Environment() resource = MonitoredResource(env, capacity=2) for i in range(5): env.process(user(env, f'User {i+1}', resource)) env.run() print(f'\nAverage utilization: {resource.average_utilization():.2%}') print(f'Total operations: {len(resource.data)}') ``` ### Container Level Monitoring ```python import simpy class MonitoredContainer(simpy.Container): def __init__(self, env, capacity, init=0): super().__init__(env, capacity, init) self.level_data = [(0, init)] def put(self, amount): result = super().put(amount) self.level_data.append((self._env.now, self.level)) return result def get(self, amount): result = super().get(amount) self.level_data.append((self._env.now, self.level)) return result def producer(env, container, amount, interval): while True: yield env.timeout(interval) yield container.put(amount) print(f'Produced {amount}. Level: {container.level} at {env.now}') def consumer(env, container, amount, interval): while True: yield env.timeout(interval) yield container.get(amount) print(f'Consumed {amount}. Level: {container.level} at {env.now}') env = simpy.Environment() container = MonitoredContainer(env, capacity=100, init=50) env.process(producer(env, container, 20, 3)) env.process(consumer(env, container, 15, 4)) env.run(until=20) print('\nLevel history:') for time, level in container.level_data: print(f'Time {time}: Level={level}') ``` ## 3. Event Tracing ### Environment Step Monitoring Monitor all events by patching the environment's step function. ```python import simpy def trace(env, callback): """Trace all events processed by the environment.""" def _trace_step(): # Get next event before it's processed if env._queue: time, priority, event_id, event = env._queue[0] callback(time, priority, event_id, event) # Call original step return original_step() original_step = env.step env.step = _trace_step def event_callback(time, priority, event_id, event): print(f'Event: time={time}, priority={priority}, id={event_id}, type={type(event).__name__}') def process(env, name): print(f'{name}: Starting at {env.now}') yield env.timeout(5) print(f'{name}: Done at {env.now}') env = simpy.Environment() trace(env, event_callback) env.process(process(env, 'Process 1')) env.process(process(env, 'Process 2')) env.run() ``` ### Event Scheduling Monitor Track when events are scheduled. ```python import simpy class MonitoredEnvironment(simpy.Environment): def __init__(self): super().__init__() self.scheduled_events = [] def schedule(self, event, priority=simpy.core.NORMAL, delay=0): super().schedule(event, priority, delay) scheduled_time = self.now + delay self.scheduled_events.append((scheduled_time, priority, type(event).__name__)) def process(env, name, delay): print(f'{name}: Scheduling timeout for {delay} at {env.now}') yield env.timeout(delay) print(f'{name}: Resumed at {env.now}') env = MonitoredEnvironment() env.process(process(env, 'Process 1', 5)) env.process(process(env, 'Process 2', 3)) env.run() print('\nScheduled events:') for time, priority, event_type in env.scheduled_events: print(f'Time {time}, Priority {priority}, Type {event_type}') ``` ## 4. Statistical Monitoring ### Queue Statistics ```python import simpy class QueueStatistics: def __init__(self): self.arrival_times = [] self.departure_times = [] self.queue_lengths = [] self.wait_times = [] def record_arrival(self, time, queue_length): self.arrival_times.append(time) self.queue_lengths.append(queue_length) def record_departure(self, arrival_time, departure_time): self.departure_times.append(departure_time) self.wait_times.append(departure_time - arrival_time) def average_wait_time(self): return sum(self.wait_times) / len(self.wait_times) if self.wait_times else 0 def average_queue_length(self): return sum(self.queue_lengths) / len(self.queue_lengths) if self.queue_lengths else 0 def customer(env, resource, stats): arrival_time = env.now stats.record_arrival(arrival_time, len(resource.queue)) with resource.request() as req: yield req departure_time = env.now stats.record_departure(arrival_time, departure_time) yield env.timeout(2) env = simpy.Environment() resource = simpy.Resource(env, capacity=1) stats = QueueStatistics() for i in range(5): env.process(customer(env, resource, stats)) env.run() print(f'Average wait time: {stats.average_wait_time():.2f}') print(f'Average queue length: {stats.average_queue_length():.2f}') ``` ## 5. Data Export ### CSV Export ```python import simpy import csv def export_to_csv(data, filename): with open(filename, 'w', newline='') as f: writer = csv.writer(f) writer.writerow(['Time', 'Metric', 'Value']) writer.writerows(data) def monitored_simulation(env, data_log): for i in range(10): data_log.append((env.now, 'queue_length', i % 3)) data_log.append((env.now, 'utilization', (i % 3) / 10)) yield env.timeout(1) env = simpy.Environment() data = [] env.process(monitored_simulation(env, data)) env.run() export_to_csv(data, 'simulation_data.csv') print('Data exported to simulation_data.csv') ``` ### Real-time Plotting (requires matplotlib) ```python import simpy import matplotlib.pyplot as plt class RealTimePlotter: def __init__(self): self.times = [] self.values = [] def update(self, time, value): self.times.append(time) self.values.append(value) def plot(self, title='Simulation Results'): plt.figure(figsize=(10, 6)) plt.plot(self.times, self.values) plt.xlabel('Time') plt.ylabel('Value') plt.title(title) plt.grid(True) plt.show() def monitored_process(env, plotter): value = 0 for i in range(20): value = value * 0.9 + (i % 5) plotter.update(env.now, value) yield env.timeout(1) env = simpy.Environment() plotter = RealTimePlotter() env.process(monitored_process(env, plotter)) env.run() plotter.plot('Process Value Over Time') ``` ## Best Practices 1. **Minimize overhead**: Only monitor what's necessary; excessive logging can slow simulations 2. **Structured data**: Use classes or named tuples for complex data points 3. **Time-stamping**: Always include timestamps with monitored data 4. **Aggregation**: For long simulations, aggregate data rather than storing every event 5. **Lazy evaluation**: Consider collecting raw data and computing statistics after simulation 6. **Memory management**: For very long simulations, periodically flush data to disk 7. **Validation**: Verify monitoring code doesn't affect simulation behavior 8. **Separation of concerns**: Keep monitoring code separate from simulation logic 9. **Reusable components**: Create generic monitoring classes that can be reused across simulations