Initial commit

This commit is contained in:
Zhongwei Li
2025-11-29 17:58:08 +08:00
commit 9f0794c603
13 changed files with 4258 additions and 0 deletions

406
skill/assets/worker_demo.py Normal file
View File

@@ -0,0 +1,406 @@
"""
File Processor - Textual TUI Worker Example
Demonstrates:
- Worker patterns for background tasks
- Progress updates from workers
- Worker cancellation
- Error handling in workers
- Multiple concurrent workers
- Thread-safe UI updates
"""
from textual.app import App, ComposeResult
from textual.containers import Container, Vertical, Horizontal
from textual.widgets import Header, Footer, Button, ProgressBar, Log, Static, Label
from textual.worker import Worker, WorkerState
from pathlib import Path
import asyncio
import time
import hashlib
class FileProcessor(App):
"""File processing app demonstrating worker patterns."""
CSS = """
Screen {
background: $background;
}
Header {
background: $primary;
}
#main {
padding: 1 2;
height: 1fr;
}
#controls {
height: auto;
margin-bottom: 1;
}
Button {
margin: 0 1;
}
#progress-container {
border: solid $primary;
padding: 1;
margin-bottom: 1;
height: auto;
background: $surface;
}
#progress-label {
margin-bottom: 1;
text-style: bold;
color: $accent;
}
ProgressBar {
margin-bottom: 1;
}
#stats {
height: 5;
border: solid $accent;
padding: 1;
margin-bottom: 1;
background: $panel;
}
.stat-line {
margin: 0;
}
#log {
border: solid $primary;
height: 1fr;
background: $surface;
}
"""
BINDINGS = [
("s", "start", "Start"),
("c", "cancel", "Cancel"),
("r", "reset", "Reset"),
("q", "quit", "Quit"),
]
def __init__(self) -> None:
super().__init__()
self.main_worker: Worker | None = None
self.processed_count = 0
self.total_files = 0
self.start_time = 0.0
def compose(self) -> ComposeResult:
"""Compose the UI."""
yield Header(show_clock=True)
with Container(id="main"):
with Horizontal(id="controls"):
yield Button("Start Processing", variant="success", id="start")
yield Button("Cancel", variant="error", id="cancel", disabled=True)
yield Button("Reset", id="reset")
with Container(id="progress-container"):
yield Label("Progress", id="progress-label")
yield ProgressBar(total=100, show_percentage=True, id="progress")
with Container(id="stats"):
yield Static("Status: Idle", classes="stat-line", id="status")
yield Static("Files Processed: 0 / 0", classes="stat-line", id="files")
yield Static("Time Elapsed: 0s", classes="stat-line", id="time")
yield Static("Speed: 0 files/sec", classes="stat-line", id="speed")
yield Log(id="log", auto_scroll=True)
yield Footer()
def on_mount(self) -> None:
"""Initialize on mount."""
log = self.query_one("#log", Log)
log.write_line("[bold cyan]File Processor Started[/]")
log.write_line("Click 'Start Processing' or press 'S' to begin")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "start":
self.action_start()
elif event.button.id == "cancel":
self.action_cancel()
elif event.button.id == "reset":
self.action_reset()
def action_start(self) -> None:
"""Start file processing worker."""
if self.main_worker and not self.main_worker.is_finished:
self.log_message("[yellow]Processing already in progress[/]")
return
# Disable start button, enable cancel
self.query_one("#start", Button).disabled = True
self.query_one("#cancel", Button).disabled = False
# Reset counters
self.processed_count = 0
self.total_files = 100 # Simulated file count
self.start_time = time.time()
# Start worker
self.log_message("[bold green]Starting file processing...[/]")
self.main_worker = self.run_worker(
self.process_files(),
name="file_processor",
group="processing"
)
def action_cancel(self) -> None:
"""Cancel the running worker."""
if self.main_worker and not self.main_worker.is_finished:
self.main_worker.cancel()
self.log_message("[bold red]Cancelling worker...[/]")
def action_reset(self) -> None:
"""Reset the UI."""
if self.main_worker and not self.main_worker.is_finished:
self.log_message("[yellow]Cannot reset while processing[/]")
return
self.processed_count = 0
self.total_files = 0
progress = self.query_one("#progress", ProgressBar)
progress.update(progress=0)
self.update_stats()
self.log_message("[cyan]Reset complete[/]")
async def process_files(self) -> None:
"""
Worker task: Process files in background.
Demonstrates:
- Long-running async operations
- Progress updates via call_from_thread
- Cancellation checking
- Error handling
"""
try:
for i in range(self.total_files):
# Check if cancelled
if self.main_worker and self.main_worker.is_cancelled:
self.log_message("[red]Processing cancelled![/]")
self.finalize_processing(cancelled=True)
return
# Simulate file processing (async I/O)
await asyncio.sleep(0.05)
# Simulate some work
filename = f"file_{i:04d}.txt"
hash_val = hashlib.md5(filename.encode()).hexdigest()[:8]
# Update progress (thread-safe)
self.processed_count = i + 1
progress = (self.processed_count / self.total_files) * 100
# Update UI from worker
self.update_progress(progress, filename, hash_val)
# Log every 10th file
if (i + 1) % 10 == 0:
self.log_message(
f"[green]Processed {self.processed_count}/{self.total_files} files[/]"
)
# Completed successfully
self.log_message("[bold green]✓ All files processed successfully![/]")
self.finalize_processing(cancelled=False)
except Exception as e:
self.log_message(f"[bold red]Error: {e}[/]")
self.finalize_processing(cancelled=False)
raise
def update_progress(self, progress: float, filename: str, hash_val: str) -> None:
"""Update progress bar and stats (called from worker)."""
progress_bar = self.query_one("#progress", ProgressBar)
progress_bar.update(progress=progress)
# Update stats
self.update_stats()
# Update status
status = self.query_one("#status", Static)
status.update(f"Status: Processing {filename} [{hash_val}]")
def update_stats(self) -> None:
"""Update statistics display."""
files_stat = self.query_one("#files", Static)
files_stat.update(f"Files Processed: {self.processed_count} / {self.total_files}")
elapsed = time.time() - self.start_time if self.start_time > 0 else 0
time_stat = self.query_one("#time", Static)
time_stat.update(f"Time Elapsed: {elapsed:.1f}s")
speed = self.processed_count / elapsed if elapsed > 0 else 0
speed_stat = self.query_one("#speed", Static)
speed_stat.update(f"Speed: {speed:.1f} files/sec")
def finalize_processing(self, cancelled: bool) -> None:
"""Finalize after processing completes or is cancelled."""
# Re-enable buttons
self.query_one("#start", Button).disabled = False
self.query_one("#cancel", Button).disabled = True
# Update status
status = self.query_one("#status", Static)
if cancelled:
status.update("Status: Cancelled")
else:
status.update("Status: Complete")
def log_message(self, message: str) -> None:
"""Thread-safe log message."""
log = self.query_one("#log", Log)
log.write_line(message)
def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
"""Handle worker state changes."""
if event.worker.name != "file_processor":
return
if event.state == WorkerState.PENDING:
self.log_message("[cyan]Worker: Pending...[/]")
elif event.state == WorkerState.RUNNING:
self.log_message("[cyan]Worker: Running[/]")
elif event.state == WorkerState.CANCELLED:
self.log_message("[red]Worker: Cancelled[/]")
elif event.state == WorkerState.ERROR:
self.log_message(f"[bold red]Worker: Error - {event.worker.error}[/]")
elif event.state == WorkerState.SUCCESS:
self.log_message("[green]Worker: Completed successfully[/]")
class MultiWorkerDemo(App):
"""Demonstrates running multiple concurrent workers."""
CSS = """
Screen {
background: $background;
}
#main {
padding: 1 2;
}
#controls {
height: auto;
margin-bottom: 1;
}
Button {
margin: 0 1;
}
.worker-card {
border: solid $primary;
padding: 1;
margin-bottom: 1;
background: $surface;
}
.worker-card .title {
text-style: bold;
color: $accent;
margin-bottom: 1;
}
#log {
border: solid $primary;
height: 20;
}
"""
BINDINGS = [
("s", "start_all", "Start All"),
("c", "cancel_all", "Cancel All"),
("q", "quit", "Quit"),
]
def compose(self) -> ComposeResult:
yield Header()
with Container(id="main"):
with Horizontal(id="controls"):
yield Button("Start All Workers", variant="success", id="start")
yield Button("Cancel All", variant="error", id="cancel")
with Container(classes="worker-card"):
yield Label("Worker 1: Data Fetch", classes="title")
yield ProgressBar(total=100, id="worker1")
with Container(classes="worker-card"):
yield Label("Worker 2: Data Process", classes="title")
yield ProgressBar(total=100, id="worker2")
with Container(classes="worker-card"):
yield Label("Worker 3: Data Export", classes="title")
yield ProgressBar(total=100, id="worker3")
yield Log(id="log", auto_scroll=True)
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "start":
self.action_start_all()
elif event.button.id == "cancel":
self.action_cancel_all()
def action_start_all(self) -> None:
"""Start all workers concurrently."""
self.run_worker(self.worker_task(1, 3.0), name="worker1", group="tasks")
self.run_worker(self.worker_task(2, 5.0), name="worker2", group="tasks")
self.run_worker(self.worker_task(3, 4.0), name="worker3", group="tasks")
log = self.query_one("#log", Log)
log.write_line("[bold cyan]Started all workers concurrently[/]")
def action_cancel_all(self) -> None:
"""Cancel all workers in the group."""
for worker in self.workers:
if worker.group == "tasks":
worker.cancel()
log = self.query_one("#log", Log)
log.write_line("[bold red]Cancelled all workers[/]")
async def worker_task(self, worker_num: int, duration: float) -> None:
"""Simulated worker task."""
log = self.query_one("#log", Log)
log.write_line(f"[cyan]Worker {worker_num} started[/]")
progress_bar = self.query_one(f"#worker{worker_num}", ProgressBar)
steps = 100
for i in range(steps):
await asyncio.sleep(duration / steps)
progress_bar.update(progress=i + 1)
log.write_line(f"[green]Worker {worker_num} completed![/]")
if __name__ == "__main__":
# Run the single worker demo
app = FileProcessor()
app.run()
# Or run the multi-worker demo
# app = MultiWorkerDemo()
# app.run()