From 12f823b4a9916c99a275992859cc8f6710b38fc8 Mon Sep 17 00:00:00 2001 From: Zhongwei Li Date: Sun, 30 Nov 2025 08:44:08 +0800 Subject: [PATCH] Initial commit --- .claude-plugin/plugin.json | 11 + README.md | 3 + plugin.lock.json | 125 ++ skills/pocketflow/SKILL.md | 818 +++++++++ skills/pocketflow/assets/COOKBOOK_GUIDE.md | 265 +++ skills/pocketflow/assets/common_patterns.py | 285 +++ skills/pocketflow/assets/examples/01_chat.py | 85 + .../pocketflow/assets/examples/02_workflow.py | 120 ++ skills/pocketflow/assets/examples/03_agent.py | 165 ++ skills/pocketflow/assets/examples/04_rag.py | 226 +++ .../assets/examples/05_structured_output.py | 175 ++ .../assets/examples/06_multi_agent.py | 153 ++ skills/pocketflow/assets/flow_template.py | 147 ++ skills/pocketflow/assets/node_template.py | 124 ++ skills/pocketflow/assets/template/README.md | 80 + skills/pocketflow/assets/template/flow.py | 37 + skills/pocketflow/assets/template/main.py | 35 + skills/pocketflow/assets/template/nodes.py | 56 + .../assets/template/requirements.txt | 20 + skills/pocketflow/assets/template/utils.py | 61 + .../pocketflow/references/core_abstraction.md | 1634 +++++++++++++++++ skills/pocketflow/references/index.md | 7 + skills/pocketflow/scripts/pocketflow_init.py | 243 +++ .../pocketflow/scripts/test_llm_connection.py | 76 + 24 files changed, 4951 insertions(+) create mode 100644 .claude-plugin/plugin.json create mode 100644 README.md create mode 100644 plugin.lock.json create mode 100644 skills/pocketflow/SKILL.md create mode 100644 skills/pocketflow/assets/COOKBOOK_GUIDE.md create mode 100644 skills/pocketflow/assets/common_patterns.py create mode 100644 skills/pocketflow/assets/examples/01_chat.py create mode 100644 skills/pocketflow/assets/examples/02_workflow.py create mode 100644 skills/pocketflow/assets/examples/03_agent.py create mode 100644 skills/pocketflow/assets/examples/04_rag.py create mode 100644 skills/pocketflow/assets/examples/05_structured_output.py create mode 100644 
skills/pocketflow/assets/examples/06_multi_agent.py create mode 100644 skills/pocketflow/assets/flow_template.py create mode 100644 skills/pocketflow/assets/node_template.py create mode 100644 skills/pocketflow/assets/template/README.md create mode 100644 skills/pocketflow/assets/template/flow.py create mode 100644 skills/pocketflow/assets/template/main.py create mode 100644 skills/pocketflow/assets/template/nodes.py create mode 100644 skills/pocketflow/assets/template/requirements.txt create mode 100644 skills/pocketflow/assets/template/utils.py create mode 100644 skills/pocketflow/references/core_abstraction.md create mode 100644 skills/pocketflow/references/index.md create mode 100644 skills/pocketflow/scripts/pocketflow_init.py create mode 100644 skills/pocketflow/scripts/test_llm_connection.py diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json new file mode 100644 index 0000000..0d23f74 --- /dev/null +++ b/.claude-plugin/plugin.json @@ -0,0 +1,11 @@ +{ + "name": "pocketflow", + "description": "PocketFlow Skill, cookbook examples, and templates for graph-based LLM workflows.", + "version": "0.1.0", + "author": { + "name": "claude_market" + }, + "skills": [ + "./skills" + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..b8f73ee --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# pocketflow + +PocketFlow Skill, cookbook examples, and templates for graph-based LLM workflows. 
diff --git a/plugin.lock.json b/plugin.lock.json new file mode 100644 index 0000000..bcc5eba --- /dev/null +++ b/plugin.lock.json @@ -0,0 +1,125 @@ +{ + "$schema": "internal://schemas/plugin.lock.v1.json", + "pluginId": "gh:nickth3man/claude_market:", + "normalized": { + "repo": null, + "ref": "refs/tags/v20251128.0", + "commit": "3dcde5a420f3512ccd94447ba4d489bb75e4da74", + "treeHash": "c706eb7932cd2b39a52a2cee7cd8773669eaf7a2e960c7baf9360b739aea0fd0", + "generatedAt": "2025-11-28T10:27:22.920130Z", + "toolVersion": "publish_plugins.py@0.2.0" + }, + "origin": { + "remote": "git@github.com:zhongweili/42plugin-data.git", + "branch": "master", + "commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390", + "repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data" + }, + "manifest": { + "name": "pocketflow", + "description": "PocketFlow Skill, cookbook examples, and templates for graph-based LLM workflows.", + "version": "0.1.0" + }, + "content": { + "files": [ + { + "path": "README.md", + "sha256": "00581303a0dc6275ccceac9b24b5bcc8e43322902d16cf9b9e565472b7f85aeb" + }, + { + "path": ".claude-plugin/plugin.json", + "sha256": "15ca0a8418cf1531fb01452160bd6e8fe2e406245be12e6e4ffe31e4c3d30ec8" + }, + { + "path": "skills/pocketflow/SKILL.md", + "sha256": "32f55f767bea1413ee9a2cb3aefc34ff42fd7255538fc67a6eda1e112b717939" + }, + { + "path": "skills/pocketflow/references/index.md", + "sha256": "7e7d5af288db4f657137e9cb1fd85390b66364dc72d58a50d0246af18b428aa0" + }, + { + "path": "skills/pocketflow/references/core_abstraction.md", + "sha256": "f4994d3656865af58e5c23775eaf310d3ff5a038d100eb6ae5174a6eab5c4956" + }, + { + "path": "skills/pocketflow/scripts/test_llm_connection.py", + "sha256": "55543b1e1e8ba45c1086303c385509dbcb4df3d0d7853884311ee8ca4c723739" + }, + { + "path": "skills/pocketflow/scripts/pocketflow_init.py", + "sha256": "623f9300acf1f85f16773cf3d0b5ef9ea7e367c1c9792e8f51537dc708504f7a" + }, + { + "path": "skills/pocketflow/assets/node_template.py", + "sha256": 
"9285b59fe7997d6afb2eb52137d534e64facfbcda6f029066478f97dc90b3693" + }, + { + "path": "skills/pocketflow/assets/COOKBOOK_GUIDE.md", + "sha256": "9dece77aaaca8333ff8a3ba08dd5a9300a17ae9f1b48e622f9b0a1de1ab6d912" + }, + { + "path": "skills/pocketflow/assets/flow_template.py", + "sha256": "9f084f2cfa7d77af9679680f816f8eba7efe5c7f87aae06f06cd301aa0269a66" + }, + { + "path": "skills/pocketflow/assets/common_patterns.py", + "sha256": "a65fd443134dc221721f4dd5a7281338086af6be1aff450fc7d52f5708c987ef" + }, + { + "path": "skills/pocketflow/assets/template/requirements.txt", + "sha256": "803925d18029065aebf602daec67f7c899c5dbb3ef119a96ac0d18977a1a02be" + }, + { + "path": "skills/pocketflow/assets/template/flow.py", + "sha256": "7bbe5d49da1185278110ecf2b3b66ff4e429188d790d59dde7b40a5f4857492f" + }, + { + "path": "skills/pocketflow/assets/template/README.md", + "sha256": "32394b494c84de8e0ff8b9e7752d271c99470124592c4c4628248cf8a2993c89" + }, + { + "path": "skills/pocketflow/assets/template/utils.py", + "sha256": "185b7a2142dc5214c03bee225ed409dc8542474f09edf88fa4eac30bfd385a36" + }, + { + "path": "skills/pocketflow/assets/template/nodes.py", + "sha256": "1383549612f16c4c86be6de34c6ecdad6c7e95b0870d8399ed4f8f390b2c8c4c" + }, + { + "path": "skills/pocketflow/assets/template/main.py", + "sha256": "403c790ed68283e180b183016eb54eef2a7da937b6f796fa2e7e1238cd69d0de" + }, + { + "path": "skills/pocketflow/assets/examples/06_multi_agent.py", + "sha256": "95722b05c2e69f46835e391ee21fc0b5a3cfb11ad91fed9ad119482dffeb3cfc" + }, + { + "path": "skills/pocketflow/assets/examples/04_rag.py", + "sha256": "5e8a9a004bd6ff5795ba95f748d08210f8372999f7847c59082c40554df25b0f" + }, + { + "path": "skills/pocketflow/assets/examples/03_agent.py", + "sha256": "6b4739979b992c8adf8e4e40ec1a6a5864b0493a4a09312b5df8f8e9fd7e02ad" + }, + { + "path": "skills/pocketflow/assets/examples/01_chat.py", + "sha256": "0c94c2fee13479f9bb5cbc1a6ea8fdc02cfafbf56312455c510d3737650225ae" + }, + { + "path": 
"skills/pocketflow/assets/examples/02_workflow.py", + "sha256": "bd6e8b4154afd3b91075dafacb231a90413886f3455ecd6a7de0c294578cba1f" + }, + { + "path": "skills/pocketflow/assets/examples/05_structured_output.py", + "sha256": "5c8b411adee588b690e93bd2a892f336c577ea876eac13d379c4b118a00b52bd" + } + ], + "dirSha256": "c706eb7932cd2b39a52a2cee7cd8773669eaf7a2e960c7baf9360b739aea0fd0" + }, + "security": { + "scannedAt": null, + "scannerVersion": null, + "flags": [] + } +} \ No newline at end of file diff --git a/skills/pocketflow/SKILL.md b/skills/pocketflow/SKILL.md new file mode 100644 index 0000000..37c5d46 --- /dev/null +++ b/skills/pocketflow/SKILL.md @@ -0,0 +1,818 @@ +--- +name: pocketflow +description: PocketFlow framework for building LLM applications with graph-based abstractions, design patterns, and agentic coding workflows +--- + +# PocketFlow Skill + +A comprehensive guide to building LLM applications using PocketFlow - a 100-line minimalist framework for Agents, Task Decomposition, RAG, and more. 
+ +## When to Use This Skill + +Activate this skill when working with: +- **Graph-based LLM workflows** - Building complex AI systems with nodes and flows +- **Agentic applications** - Creating autonomous agents with dynamic action selection +- **Task decomposition** - Breaking down complex LLM tasks into manageable steps +- **RAG systems** - Implementing Retrieval Augmented Generation pipelines +- **Batch processing** - Handling large inputs or multiple files with LLMs +- **Multi-agent systems** - Coordinating multiple AI agents +- **Async workflows** - Building I/O-bound LLM applications with concurrency + +## Core Concepts + +### Architecture Overview + +PocketFlow models LLM workflows as **Graph + Shared Store**: + +```python +# Shared Store: Central data storage +shared = { + "data": {}, + "summary": {}, + "config": {...} +} + +# Graph: Nodes connected by transitions +node_a >> node_b >> node_c +flow = Flow(start=node_a) +flow.run(shared) +``` + +### The Node: Building Block + +Every Node has 3 steps: `prep()` → `exec()` → `post()` + +```python +class SummarizeFile(Node): + def prep(self, shared): + # Get data from shared store + return shared["data"] + + def exec(self, prep_res): + # Process with LLM (retries built-in) + prompt = f"Summarize this text in 10 words: {prep_res}" + summary = call_llm(prompt) + return summary + + def post(self, shared, prep_res, exec_res): + # Write results back to shared store + shared["summary"] = exec_res + return "default" # Action for flow control +``` + +**Why 3 steps?** Separation of concerns - data storage and processing operate separately. + +### The Flow: Orchestration + +```python +# Simple sequence +load_data >> summarize >> save_result +flow = Flow(start=load_data) +flow.run(shared) + +# Branching with actions +review - "approved" >> payment +review - "needs_revision" >> revise +review - "rejected" >> finish +revise >> review # Loop back + +flow = Flow(start=review) +``` + +## Quick Reference + +### 1. 
Basic Node Pattern + +```python +class LoadData(Node): + def post(self, shared, prep_res, exec_res): + shared["data"] = "Some text content" + return None + +class Summarize(Node): + def prep(self, shared): + return shared["data"] + + def exec(self, prep_res): + return call_llm(f"Summarize: {prep_res}") + + def post(self, shared, prep_res, exec_res): + shared["summary"] = exec_res + return "default" + +# Connect and run +load_data >> summarize +flow = Flow(start=load_data) +flow.run(shared) +``` + +### 2. Batch Processing + +**BatchNode** - Process large inputs in chunks: + +```python +class MapSummaries(BatchNode): + def prep(self, shared): + # Chunk big file + content = shared["data"] + chunk_size = 10000 + return [content[i:i+chunk_size] + for i in range(0, len(content), chunk_size)] + + def exec(self, chunk): + # Process each chunk + return call_llm(f"Summarize: {chunk}") + + def post(self, shared, prep_res, exec_res_list): + # Combine all results + shared["summary"] = "\n".join(exec_res_list) + return "default" +``` + +**BatchFlow** - Run flow multiple times with different parameters: + +```python +class SummarizeAllFiles(BatchFlow): + def prep(self, shared): + filenames = list(shared["data"].keys()) + # Return list of parameter dicts + return [{"filename": fn} for fn in filenames] + +class LoadFile(Node): + def prep(self, shared): + # Access filename from params + filename = self.params["filename"] + return filename +``` + +### 3. 
Agent Pattern + +```python +class DecideAction(Node): + def exec(self, inputs): + query, context = inputs + prompt = f""" +Given input: {query} +Previous search results: {context} +Should I: 1) Search web for more info 2) Answer with current knowledge + +Output in yaml: +```yaml +action: search/answer +reason: why this action +search_term: search phrase if action is search +```""" + resp = call_llm(prompt) + yaml_str = resp.split("```yaml")[1].split("```")[0] + action_data = yaml.safe_load(yaml_str) + return action_data + +# Build agent graph +decide >> search_web +decide - "answer" >> provide_answer +search_web >> decide # Loop back for more searches + +agent_flow = Flow(start=decide) +``` + +### 4. RAG (Retrieval Augmented Generation) + +**Stage 1: Offline Indexing** + +```python +class ChunkDocs(BatchNode): + def prep(self, shared): + return shared["files"] + + def exec(self, filepath): + with open(filepath, "r") as f: + text = f.read() + # Chunk by 100 chars + size = 100 + return [text[i:i+size] for i in range(0, len(text), size)] + + def post(self, shared, prep_res, exec_res_list): + shared["all_chunks"] = [c for chunks in exec_res_list + for c in chunks] + +chunk_docs >> embed_docs >> build_index +offline_flow = Flow(start=chunk_docs) +``` + +**Stage 2: Online Query** + +```python +class RetrieveDocs(Node): + def exec(self, inputs): + q_emb, index, chunks = inputs + I, D = search_index(index, q_emb, top_k=1) + return chunks[I[0][0]] + +embed_query >> retrieve_docs >> generate_answer +online_flow = Flow(start=embed_query) +``` + +### 5. 
Async & Parallel + +**AsyncNode** for I/O-bound operations: + +```python +class SummarizeThenVerify(AsyncNode): + async def prep_async(self, shared): + doc_text = await read_file_async(shared["doc_path"]) + return doc_text + + async def exec_async(self, prep_res): + summary = await call_llm_async(f"Summarize: {prep_res}") + return summary + + async def post_async(self, shared, prep_res, exec_res): + decision = await gather_user_feedback(exec_res) + if decision == "approve": + shared["summary"] = exec_res + return "default" + +# Must wrap in AsyncFlow +node = SummarizeThenVerify() +flow = AsyncFlow(start=node) +await flow.run_async(shared) +``` + +**AsyncParallelBatchNode** - Process multiple items concurrently: + +```python +class ParallelSummaries(AsyncParallelBatchNode): + async def prep_async(self, shared): + return shared["texts"] # List of texts + + async def exec_async(self, text): + # Runs in parallel for each text + return await call_llm_async(f"Summarize: {text}") + + async def post_async(self, shared, prep_res, exec_res_list): + shared["summary"] = "\n\n".join(exec_res_list) + return "default" +``` + +### 6. Workflow (Task Decomposition) + +```python +class GenerateOutline(Node): + def prep(self, shared): + return shared["topic"] + + def exec(self, topic): + return call_llm(f"Create outline for: {topic}") + + def post(self, shared, prep_res, exec_res): + shared["outline"] = exec_res + +class WriteSection(Node): + def exec(self, outline): + return call_llm(f"Write content: {outline}") + + def post(self, shared, prep_res, exec_res): + shared["draft"] = exec_res + +class ReviewAndRefine(Node): + def exec(self, draft): + return call_llm(f"Review and improve: {draft}") + +# Chain the workflow +outline >> write >> review +workflow = Flow(start=outline) +``` + +### 7. 
Structured Output + +```python +class SummarizeNode(Node): + def exec(self, prep_res): + prompt = f""" +Summarize the following text as YAML, with exactly 3 bullet points + +{prep_res} + +Output: +```yaml +summary: + - bullet 1 + - bullet 2 + - bullet 3 +```""" + response = call_llm(prompt) + yaml_str = response.split("```yaml")[1].split("```")[0].strip() + + import yaml + structured_result = yaml.safe_load(yaml_str) + + # Validate + assert "summary" in structured_result + assert isinstance(structured_result["summary"], list) + + return structured_result +``` + +**Why YAML?** Modern LLMs handle YAML better than JSON (less escaping issues). + +--- + +## 🍳 Cookbook: Real-World Examples + +This skill includes **6 production-ready examples** from the official PocketFlow cookbook, plus a complete **Python project template**. + +**📂 Location:** `assets/examples/` and `assets/template/` + +### Example 1: Interactive Chat Bot (☆☆☆) + +**File:** `assets/examples/01_chat.py` + +A chat bot with conversation history that loops back to itself. + +```python +# Key pattern: Self-looping node +chat_node = ChatNode() +chat_node - "continue" >> chat_node # Loop for continuous chat +flow = Flow(start=chat_node) +``` + +**What it demonstrates:** +- Message history management +- Self-looping nodes +- Graceful exit handling +- User input processing + +**Run it:** `python assets/examples/01_chat.py` + +--- + +### Example 2: Article Writing Workflow (☆☆☆) + +**File:** `assets/examples/02_workflow.py` + +Multi-step content creation: outline → draft → refine. + +```python +# Sequential pipeline +outline >> draft >> refine +flow = Flow(start=outline) +``` + +**What it demonstrates:** +- Task decomposition +- Sequential workflows +- Progressive content generation + +**Run it:** `python assets/examples/02_workflow.py "AI Safety"` + +--- + +### Example 3: Research Agent (☆☆☆) + +**File:** `assets/examples/03_agent.py` + +Agent that decides whether to search or answer. 
+ +```python +# Branching based on decision +decide - "search" >> search +decide - "answer" >> answer +search - "continue" >> decide # Loop back +``` + +**What it demonstrates:** +- Dynamic action selection +- Branching logic +- Agent decision-making +- Iterative research loops + +**Run it:** `python assets/examples/03_agent.py "Nobel Prize 2024"` + +--- + +### Example 4: RAG System (☆☆☆) + +**File:** `assets/examples/04_rag.py` + +Complete two-stage RAG pipeline with offline indexing and online querying. + +```python +# Stage 1: Offline indexing +embed_docs >> build_index +offline_flow = Flow(start=embed_docs) + +# Stage 2: Online query +embed_query >> retrieve >> generate +online_flow = Flow(start=embed_query) +``` + +**What it demonstrates:** +- Document embedding and indexing +- Similarity search +- Context-based generation +- Multi-stage pipelines + +**Run it:** `python assets/examples/04_rag.py --"How to install PocketFlow?"` + +--- + +### Example 5: Structured Output Parser (☆☆☆) + +**File:** `assets/examples/05_structured_output.py` + +Resume parser extracting structured data with YAML. + +```python +# Parse YAML from LLM response +yaml_str = response.split("```yaml")[1].split("```")[0] +structured_result = yaml.safe_load(yaml_str) + +# Validate structure +assert "name" in structured_result +assert "email" in structured_result +``` + +**What it demonstrates:** +- Structured LLM responses with YAML +- Schema validation +- Retry logic for parsing +- Data extraction patterns + +**Run it:** `python assets/examples/05_structured_output.py` + +--- + +### Example 6: Multi-Agent Communication (★☆☆) + +**File:** `assets/examples/06_multi_agent.py` + +Two async agents playing Taboo word game. 
+ +```python +# Agents with message queues +shared = { + "hinter_queue": asyncio.Queue(), + "guesser_queue": asyncio.Queue() +} + +# Run concurrently +await asyncio.gather( + hinter_flow.run_async(shared), + guesser_flow.run_async(shared) +) +``` + +**What it demonstrates:** +- AsyncNode for concurrent operations +- Message queues for inter-agent communication +- Multi-agent coordination +- Game logic with termination + +**Run it:** `python assets/examples/06_multi_agent.py` + +--- + +### Python Project Template + +**Location:** `assets/template/` + +Official best-practice template with complete project structure. + +**Files:** +- `main.py` - Entry point +- `flow.py` - Flow definition +- `nodes.py` - Node implementations +- `utils.py` - LLM wrappers +- `requirements.txt` - Dependencies + +**Quick Start:** +```bash +cd assets/template/ +pip install -r requirements.txt +# Edit utils.py to add your LLM API key +python main.py +``` + +**What it demonstrates:** +- Separation of concerns +- Factory pattern for flows +- Clean data flow with shared store +- Configuration best practices + +--- + +### Full Cookbook (47 Examples) + +The complete cookbook has **47 progressively complex examples** on GitHub: + +**Dummy Level (☆☆☆):** +Chat, Workflow, Agent, RAG, Map-Reduce, Streaming, Structured Output, Guardrails + +**Beginner Level (★☆☆):** +Multi-Agent, Supervisor, Parallel (3x/8x), Thinking (CoT), Memory, MCP, Tracing + +**Plus 30+ more advanced patterns:** +FastAPI integration, Code generator, Text-to-SQL, Voice chat, PDF vision, Website chatbot, and more. + +**Browse all:** https://github.com/The-Pocket/PocketFlow/tree/main/cookbook + +**Complete guide:** See `assets/COOKBOOK_GUIDE.md` for full index and learning path. 
+ +--- + +## Design Patterns Summary + +| Pattern | Use Case | Key Component | +|---------|----------|---------------| +| **Agent** | Dynamic action selection | Action space + context management | +| **Workflow** | Multi-step task decomposition | Chained nodes | +| **RAG** | Context-aware answers | Offline indexing + online retrieval | +| **Map Reduce** | Large input processing | BatchNode with aggregation | +| **Multi-Agent** | Collaborative agents | Message queues + AsyncNode | +| **Structured Output** | Typed LLM responses | YAML prompting + validation | + +## Communication Patterns + +### Shared Store (Primary) + +```python +# Design data structure first +shared = { + "user": { + "id": "user123", + "context": { + "weather": {"temp": 72, "condition": "sunny"}, + "location": "San Francisco" + } + }, + "results": {} +} +``` + +**Best Practice:** Separate data schema from compute logic using shared store. + +### Params (For Batch Only) + +```python +class SummarizeFile(Node): + def prep(self, shared): + # Access node's params + filename = self.params["filename"] + return shared["data"].get(filename, "") + +# Set params +node = SummarizeFile() +node.set_params({"filename": "report.txt"}) +``` + +## Advanced Features + +### Fault Tolerance + +```python +# Automatic retries +my_node = SummarizeFile(max_retries=3, wait=10) + +# Graceful fallback +class ResilientNode(Node): + def exec_fallback(self, prep_res, exc): + # Return fallback instead of crashing + return "There was an error processing your request." 
+``` + +### Nested Flows + +```python +# Flows can act as nodes +node_a >> node_b +subflow = Flow(start=node_a) + +# Connect to other nodes +subflow >> node_c + +# Create parent flow +parent_flow = Flow(start=subflow) +``` + +### Multi-Agent Communication + +```python +class AgentNode(AsyncNode): + async def prep_async(self, _): + message_queue = self.params["messages"] + message = await message_queue.get() + print(f"Agent received: {message}") + return message + +# Create self-loop for continuous listening +agent = AgentNode() +agent >> agent +flow = AsyncFlow(start=agent) +``` + +## Utility Functions + +### LLM Wrappers + +```python +# OpenAI +def call_llm(prompt): + from openai import OpenAI + client = OpenAI(api_key="YOUR_API_KEY") + r = client.chat.completions.create( + model="gpt-4o", + messages=[{"role": "user", "content": prompt}] + ) + return r.choices[0].message.content + +# Anthropic Claude +def call_llm(prompt): + from anthropic import Anthropic + client = Anthropic(api_key="YOUR_API_KEY") + r = client.messages.create( + model="claude-sonnet-4-0", + max_tokens=1024, + messages=[{"role": "user", "content": prompt}] + ) + return r.content[0].text + +# Google Gemini +def call_llm(prompt): + from google import genai + client = genai.Client(api_key='GEMINI_API_KEY') + response = client.models.generate_content( + model='gemini-2.5-pro', + contents=prompt + ) + return response.text +``` + +### Embeddings + +```python +# OpenAI +from openai import OpenAI +client = OpenAI(api_key="YOUR_API_KEY") +response = client.embeddings.create( + model="text-embedding-ada-002", + input=text +) +embedding = response.data[0].embedding +``` + +### Text Chunking + +```python +# Fixed-size chunking +def fixed_size_chunk(text, chunk_size=100): + return [text[i:i+chunk_size] + for i in range(0, len(text), chunk_size)] + +# Sentence-based chunking +import nltk +def sentence_based_chunk(text, max_sentences=2): + sentences = nltk.sent_tokenize(text) + return [" ".join(sentences[i:i+max_sentences]) +
for i in range(0, len(sentences), max_sentences)] +``` + +## Agentic Coding Guidelines + +**IMPORTANT for AI Agents building LLM systems:** + +1. **Start Simple** - Begin with the smallest solution first +2. **Design First** - Create high-level design (docs/design.md) before implementation +3. **Manual Testing** - Solve example inputs manually to develop intuition +4. **Iterate Frequently** - Expect hundreds of iterations on Steps 3-6 +5. **Ask Humans** - Request feedback and clarification regularly + +### Recommended Project Structure + +``` +my_project/ +├── main.py +├── nodes.py +├── flow.py +├── utils/ +│ ├── __init__.py +│ ├── call_llm.py +│ └── search_web.py +├── requirements.txt +└── docs/ + └── design.md +``` + +### Development Workflow + +```mermaid +flowchart LR + start[Start] --> batch[Batch] + batch --> check[Check] + check -->|OK| process + check -->|Error| fix[Fix] + fix --> check + + subgraph process[Process] + step1[Step 1] --> step2[Step 2] + end + + process --> endNode[End] +``` + +## Best Practices + +### Context Management (Agents) +- **Relevant & Minimal** - Retrieve most relevant via RAG, not entire history +- **Avoid "lost in the middle"** - LLMs overlook mid-prompt content even with large windows + +### Action Space Design (Agents) +- **Unambiguous** - Avoid overlapping actions (e.g., one `read_database` instead of separate `read_databases` and `read_csvs`) +- **Incremental** - Feed 500 lines or 1 page at a time, not all at once +- **Overview-zoom-in** - Show structure first (TOC, summary), then details +- **Parameterized** - Enable flexible actions with parameters (columns, SQL queries) +- **Backtracking** - Allow undo instead of full restart + +### Error Handling +- **No try/except in utilities** - Let Node retry mechanism handle failures +- **Use exec_fallback()** - Provide graceful degradation instead of crashes + +### Performance Tips +- **Batch APIs** - Use LLM batch inference for multiple prompts +- **Rate Limiting** - Use semaphores 
to avoid API limits +- **Parallel only for I/O** - Python GIL prevents true CPU parallelism +- **Independent tasks** - Don't parallelize dependent operations + +## Reference Files + +This skill includes comprehensive documentation in `references/core_abstraction.md`: + +- Node - Basic building block with prep/exec/post +- Flow - Orchestration and graph control +- Communication - Shared store vs params +- Batch - BatchNode and BatchFlow patterns +- Async - AsyncNode for I/O-bound tasks +- Parallel - AsyncParallelBatchNode/Flow +- Agent - Dynamic action selection +- Workflow - Task decomposition chains +- RAG - Retrieval augmented generation +- Map Reduce - Large input processing +- Structured Output - YAML-based schemas +- Multi-Agents - Inter-agent communication +- LLM Wrappers - OpenAI, Anthropic, Google, Azure +- Embeddings - Text embedding APIs +- Vector Databases - FAISS, Pinecone, Qdrant, etc. +- Web Search - Google, Bing, DuckDuckGo, Brave +- Text Chunking - Fixed-size and sentence-based +- Text-to-Speech - AWS Polly, Google Cloud, Azure, IBM +- Visualization - Mermaid diagrams and call stacks +- Agentic Coding - Development workflow guidance + +## Navigation Guide + +### For Beginners +1. Start with **Node** and **Flow** basics +2. Learn **Communication** (shared store) +3. Try simple **Workflow** example +4. Read **Agentic Coding** guidelines + +### For Specific Use Cases +- **Document processing** → Batch + Map Reduce +- **Question answering** → RAG +- **Dynamic task planning** → Agent +- **Multi-step pipelines** → Workflow +- **Real-time systems** → Async + Parallel +- **Collaborative AI** → Multi-Agents + +### For Advanced Users +- Nested flows for complex pipelines +- Custom fault tolerance with exec_fallback +- Parallel processing with rate limiting +- Multi-agent communication patterns +- Custom visualization and debugging tools + +## Common Pitfalls + +❌ **Don't** use Multi-Agents unless necessary - Start simple! 
+❌ **Don't** parallelize dependent operations +❌ **Don't** add try/except in utility functions called from exec() +❌ **Don't** use node.run() in production - Always use flow.run() +❌ **Don't** modify shared store in exec() - Use prep() and post() + +✅ **Do** design data schema before implementation +✅ **Do** use shared store for data, params for identifiers +✅ **Do** leverage built-in retry mechanisms +✅ **Do** validate structured output with assertions +✅ **Do** start with simplest solution and iterate + +## Resources + +**Official Docs:** https://the-pocket.github.io/PocketFlow/ + +**Framework Philosophy:** +- Minimalist (100 lines of core code) +- No vendor lock-in (implement your own utilities) +- Separation of concerns (graph + shared store) +- Graph-based workflow modeling + +--- + +*This skill was generated from PocketFlow official documentation. For detailed examples and complete API reference, see `references/core_abstraction.md`.* diff --git a/skills/pocketflow/assets/COOKBOOK_GUIDE.md b/skills/pocketflow/assets/COOKBOOK_GUIDE.md new file mode 100644 index 0000000..38d48a8 --- /dev/null +++ b/skills/pocketflow/assets/COOKBOOK_GUIDE.md @@ -0,0 +1,265 @@ +# PocketFlow Cookbook Guide + +Complete guide to the 47 real-world examples from the official PocketFlow cookbook. + +**Source:** https://github.com/The-Pocket/PocketFlow/tree/main/cookbook + +--- + +## 📚 Included Examples (6 Complete Implementations) + +This skill includes 6 fully-functional cookbook examples in `assets/examples/`: + +### 1. Chat Bot (☆☆☆ Dummy) +**File:** `01_chat.py` + +Interactive chat with conversation history. +- Self-looping node for continuous interaction +- Message history management +- Graceful exit handling + +**Run it:** +```bash +cd assets/examples/ +python 01_chat.py +``` + +--- + +### 2. Article Writing Workflow (☆☆☆ Dummy) +**File:** `02_workflow.py` + +Multi-step content creation pipeline. 
+- Generate outline +- Write draft +- Refine and polish + +**Run it:** +```bash +python 02_workflow.py "Your Topic Here" +``` + +--- + +### 3. Research Agent (☆☆☆ Dummy) +**File:** `03_agent.py` + +Agent with web search and decision-making. +- Dynamic action selection +- Branching logic (search vs answer) +- Iterative research loop + +**Run it:** +```bash +python 03_agent.py "Who won the Nobel Prize 2024?" +``` + +--- + +### 4. RAG System (☆☆☆ Dummy) +**File:** `04_rag.py` + +Complete retrieval-augmented generation. +- Offline: Document embedding and indexing +- Online: Query processing and answer generation +- Context-based responses + +**Run it:** +```bash +python 04_rag.py --"How to install PocketFlow?" +``` + +--- + +### 5. Structured Output Parser (☆☆☆ Dummy) +**File:** `05_structured_output.py` + +Resume parser with YAML output. +- Structured LLM responses +- Schema validation +- Skill matching with indexes + +**Run it:** +```bash +python 05_structured_output.py +``` + +--- + +### 6. Multi-Agent Game (★☆☆ Beginner) +**File:** `06_multi_agent.py` + +Two async agents playing Taboo. 
+- Async message queues +- Inter-agent communication +- Game logic with termination + +**Run it:** +```bash +python 06_multi_agent.py +``` + +--- + +## 🗺️ Full Cookbook Index (47 Examples) + +### Dummy Level (☆☆☆) - Foundational Patterns + +| Example | Description | Included | +|---------|-------------|----------| +| **Chat** | Basic chat bot with history | ✅ `01_chat.py` | +| **Structured Output** | Extract data with YAML | ✅ `05_structured_output.py` | +| **Workflow** | Multi-step article writing | ✅ `02_workflow.py` | +| **Agent** | Research agent with search | ✅ `03_agent.py` | +| **RAG** | Simple retrieval-augmented generation | ✅ `04_rag.py` | +| **Map-Reduce** | Batch processing pattern | 📖 [GitHub](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-mapreduce) | +| **Streaming** | Real-time LLM streaming | 📖 [GitHub](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-streaming) | +| **Chat Guardrail** | Travel advisor with filtering | 📖 [GitHub](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-chat-guardrail) | + +### Beginner Level (★☆☆) - Intermediate Patterns + +| Example | Description | Included | +|---------|-------------|----------| +| **Multi-Agent** | Async agents (Taboo game) | ✅ `06_multi_agent.py` | +| **Supervisor** | Research supervision | 📖 [GitHub](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-supervisor) | +| **Parallel (3x)** | 3x speedup with parallel | 📖 [GitHub](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-parallel) | +| **Parallel (8x)** | 8x speedup demonstration | 📖 [GitHub](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-parallel-flow) | +| **Thinking** | Chain-of-Thought reasoning | 📖 [GitHub](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-thinking) | +| **Memory** | Short & long-term memory | 📖 
[GitHub](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-memory) | +| **MCP** | Model Context Protocol | 📖 [GitHub](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-mcp) | +| **Tracing** | Execution visualization | 📖 [GitHub](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-tracing) | + +### Additional Examples (47 total) + +Browse the complete cookbook on GitHub for all patterns including: + +**Core Patterns:** +- Node basics, Communication, Batch operations (Node, Flow, Standard) +- Async basics, Nested batches, Hello World, Majority vote + +**Integrations:** +- FastAPI (background, HITL, WebSocket) +- Gradio HITL, Streamlit, Google Calendar + +**Tools:** +- Web crawler, Database, Embeddings, PDF Vision, Search + +**Advanced:** +- Code generator, Text-to-SQL, Voice chat +- A2A (Agent-to-Agent), Website chatbot + +**Full List:** https://github.com/The-Pocket/PocketFlow/tree/main/cookbook + +--- + +## 🎓 Learning Path + +### Step 1: Start with Dummy Level +1. **01_chat.py** - Learn self-looping and state management +2. **02_workflow.py** - Understand sequential flows +3. **03_agent.py** - See branching and decision-making +4. **04_rag.py** - Multi-stage pipelines (offline + online) +5. **05_structured_output.py** - Structured LLM responses + +### Step 2: Progress to Beginner Level +6. **06_multi_agent.py** - Async communication between agents + +### Step 3: Explore GitHub Cookbook +- Browse all 47 examples for advanced patterns +- Find examples matching your use case +- Study progressively more complex implementations + +--- + +## 💡 How to Use These Examples + +### Run Locally +```bash +cd assets/examples/ + +# Make sure you have pocketflow installed +pip install pocketflow + +# Run any example +python 01_chat.py +python 02_workflow.py "My Topic" +python 03_agent.py "My Question" +``` + +### Modify for Your Needs +1. Copy example to your project +2. Implement `call_llm()` in a utils.py file +3. 
Customize prompts and logic +4. Add your business requirements + +### Learn Patterns +- Study the code structure +- See how nodes are connected +- Understand shared store usage +- Learn error handling approaches + +--- + +## 🛠️ Python Template + +Use the official Python template as your starting point: + +**Location:** `assets/template/` + +**Files:** +- `main.py` - Entry point +- `flow.py` - Flow definition +- `nodes.py` - Node implementations +- `utils.py` - LLM wrappers +- `requirements.txt` - Dependencies + +**Quick Start:** +```bash +cd assets/template/ +pip install -r requirements.txt + +# Edit utils.py to add your LLM provider +# Then run: +python main.py +``` + +--- + +## 📖 Additional Resources + +- **Official Docs:** https://the-pocket.github.io/PocketFlow/ +- **GitHub Repo:** https://github.com/The-Pocket/PocketFlow +- **Full Cookbook:** https://github.com/The-Pocket/PocketFlow/tree/main/cookbook +- **Python Template:** https://github.com/The-Pocket/PocketFlow-Template-Python + +--- + +## 🎯 Quick Reference: Which Example for What? + +| Need | Use Example | +|------|-------------| +| Interactive chat | `01_chat.py` | +| Content generation pipeline | `02_workflow.py` | +| Decision-making agent | `03_agent.py` | +| Document Q&A | `04_rag.py` | +| Parse/extract data | `05_structured_output.py` | +| Multiple agents | `06_multi_agent.py` | +| Batch processing | Map-Reduce (GitHub) | +| Real-time streaming | Streaming (GitHub) | +| Memory/context | Memory (GitHub) | +| Parallel speedup | Parallel examples (GitHub) | + +--- + +## ✅ Next Steps + +1. **Pick an example** that matches your use case +2. **Run it** to see how it works +3. **Study the code** to understand patterns +4. **Copy and modify** for your project +5. **Implement** your LLM provider +6. **Iterate** and improve! + +--- + +*This guide covers the 6 included examples plus references to all 47 cookbook patterns. 
class LoadDataNode(Node):
    """Load data from a file into the shared store.

    Reads the path from ``shared["source_path"]`` and stores the file
    contents under ``shared["raw_data"]`` for the next node in the flow.
    """

    def prep(self, shared):
        # The caller supplies the input path via the shared store.
        return shared["source_path"]

    def exec(self, path):
        # TODO: Implement your data loading (API/database/etc.)
        # Explicit encoding: the platform default is not guaranteed to be
        # UTF-8, so reading text without it is non-portable.
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()

    def post(self, shared, prep_res, exec_res):
        shared["raw_data"] = exec_res
        return "default"
class SearchNode(Node):
    """Search for information requested by the agent.

    Bug fix: ``exec`` previously read ``self.shared``, but PocketFlow nodes
    have no ``shared`` attribute — the shared store is only passed to
    ``prep`` and ``post``. The search term is now fetched in ``prep`` and
    handed to ``exec`` as its ``prep_res`` argument.
    """

    def prep(self, shared):
        # Pull the search term chosen by DecideActionNode out of the
        # shared store, so exec() stays free of shared-state access.
        return shared.get("decision", {}).get("term")

    def exec(self, term):
        # TODO: Implement search
        return f"Search results for: {term}"

    def post(self, shared, prep_res, exec_res):
        shared["context"] = exec_res
        return "continue"  # loop back to the decision node
class QueryRAG(Node):
    """Query the RAG system.

    Retrieves the chunks most relevant to ``shared["query"]`` via the
    prebuilt index and generates a context-grounded answer, stored under
    ``shared["answer"]``.
    """

    def prep(self, shared):
        return shared["query"], shared["index"], shared["chunks"]

    def exec(self, inputs):
        query, index, chunks = inputs
        # TODO: Search index and retrieve relevant chunks
        # relevant = search_index(index, query, top_k=3)
        relevant = chunks[:3]  # Placeholder

        # Generate answer with context
        context = "\n".join(relevant)
        # answer = call_llm(f"Context: {context}\n\nQuestion: {query}")
        # Plain string: the previous f-prefix had no placeholders (F541).
        answer = "Answer based on context"
        return answer

    def post(self, shared, prep_res, exec_res):
        shared["answer"] = exec_res
        return "default"
class ResilientNode(Node):
    """Demonstrates retry plus fallback error handling on a node."""

    def __init__(self):
        # Retry a failing exec() up to 3 times, waiting 5s between tries.
        super().__init__(max_retries=3, wait=5)

    def exec(self, prep_res):
        # Risky operation that might fail
        # result = call_external_api(prep_res)
        return "Success"

    def exec_fallback(self, prep_res, exc):
        """Graceful degradation once every retry has been exhausted."""
        print(f"Primary method failed: {exc}")
        # Return cached/default result
        return "Fallback result"

    def post(self, shared, prep_res, exec_res):
        # Either the real result or the fallback ends up here.
        shared["result"] = exec_res
        return "default"
+ } + # rag_indexing_flow.run(shared4) + # rag_query_flow.run(shared4) + + print("\n✅ All patterns defined!") + print("Uncomment the flow.run() calls to execute") diff --git a/skills/pocketflow/assets/examples/01_chat.py b/skills/pocketflow/assets/examples/01_chat.py new file mode 100644 index 0000000..2354cfe --- /dev/null +++ b/skills/pocketflow/assets/examples/01_chat.py @@ -0,0 +1,85 @@ +""" +PocketFlow Cookbook Example: Interactive Chat Bot + +Difficulty: ☆☆☆ Dummy Level +Source: https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-chat + +Description: +A basic chat bot with conversation history. Demonstrates: +- Self-looping nodes for continuous interaction +- Message history management +- User input handling +- Graceful exit conditions +""" + +from pocketflow import Node, Flow +# from utils import call_llm # You need to implement this + + +class ChatNode(Node): + """Interactive chat node that maintains conversation history""" + + def prep(self, shared): + """Get user input and maintain message history""" + # Initialize messages if this is the first run + if "messages" not in shared: + shared["messages"] = [] + print("Welcome to the chat! Type 'exit' to end the conversation.") + + # Get user input + user_input = input("\nYou: ") + + # Check if user wants to exit + if user_input.lower() == 'exit': + return None + + # Add user message to history + shared["messages"].append({"role": "user", "content": user_input}) + + # Return all messages for the LLM + return shared["messages"] + + def exec(self, messages): + """Call LLM with conversation history""" + if messages is None: + return None + + # Call LLM with the entire conversation history + # response = call_llm(messages) + response = "This is a placeholder response. Implement call_llm()." 
def create_chat_flow():
    """Build a single-node flow in which the chat node re-enters itself."""
    node = ChatNode()
    # The "continue" action routes straight back into the same node, so
    # the conversation keeps running until prep/post signal the end
    # by returning None.
    node - "continue" >> node
    return Flow(start=node)
class WriteDraftNode(Node):
    """Turn the generated outline into a first article draft."""

    def prep(self, shared):
        # The outline produced by GenerateOutlineNode.
        return shared["outline"]

    def exec(self, outline):
        """Generate content based on outline"""
        # Prompt kept for when call_llm() is wired in.
        prompt = f"Write content based on this outline:\n{outline}"
        # draft = call_llm(prompt)
        draft = "\n\n".join(
            ["Draft article based on outline:", outline, "[Article content here...]"]
        )
        print("\n✍️ Draft Written (%d chars)" % len(draft))
        return draft

    def post(self, shared, prep_res, exec_res):
        # Stash the draft for the refinement step.
        shared["draft"] = exec_res
        return "default"
+ draft = WriteDraftNode() + refine = RefineArticleNode() + + # Sequential pipeline + outline >> draft >> refine + + flow = Flow(start=outline) + return flow + + +# Example usage +def run_flow(topic="AI Safety"): + """Run the article writing workflow""" + shared = {"topic": topic} + + print(f"\n=== Starting Article Workflow: {topic} ===\n") + + flow = create_article_flow() + flow.run(shared) + + # Output summary + print("\n=== Workflow Statistics ===") + print(f"Topic: {shared['topic']}") + print(f"Outline: {len(shared['outline'])} characters") + print(f"Draft: {len(shared['draft'])} characters") + print(f"Final: {len(shared['final_article'])} characters") + + return shared + + +if __name__ == "__main__": + import sys + + # Get topic from command line or use default + topic = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else "AI Safety" + result = run_flow(topic) + + # Print final article + print("\n=== Final Article ===") + print(result["final_article"]) diff --git a/skills/pocketflow/assets/examples/03_agent.py b/skills/pocketflow/assets/examples/03_agent.py new file mode 100644 index 0000000..563555d --- /dev/null +++ b/skills/pocketflow/assets/examples/03_agent.py @@ -0,0 +1,165 @@ +""" +PocketFlow Cookbook Example: Research Agent + +Difficulty: ☆☆☆ Dummy Level +Source: https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-agent + +Description: +A research agent that can search the web and answer questions. 
class DecideActionNode(Node):
    """Chooses the agent's next move: search the web or answer now."""

    def prep(self, shared):
        # Context accumulates across search iterations; a sentinel string
        # marks the very first pass.
        return {
            "question": shared["question"],
            "context": shared.get("context", "No information gathered yet")
        }

    def exec(self, inputs):
        """Decide next action using LLM"""
        question, context = inputs["question"], inputs["context"]

        prompt = f"""
Given:
Question: {question}
Current Context: {context}

Should I:
1. Search web for more information
2. Answer with current knowledge

Output in format:
Action: search/answer
Reasoning: [why]
Search Query: [if action is search]
"""
        # response = call_llm(prompt)
        # Parse response to get action

        # Placeholder logic: keep searching until some context exists.
        needs_search = not context or "No information" in context
        action = "search" if needs_search else "answer"
        search_query = question if needs_search else None

        print(f"\n🤔 Agent decided: {action}")

        return {"action": action, "search_query": search_query}

    def post(self, shared, prep_res, exec_res):
        shared["decision"] = exec_res
        # The returned action name selects the outgoing edge (branching).
        return exec_res["action"]
class AnswerNode(Node):
    """Produce the final answer from whatever context was gathered."""

    def prep(self, shared):
        # Context defaults to empty when no search was ever run.
        return {
            "question": shared["question"],
            "context": shared.get("context", "")
        }

    def exec(self, inputs):
        """Generate answer from context"""
        question = inputs['question']
        prompt = f"""
Context: {inputs['context']}

Question: {question}

Provide a comprehensive answer:
"""
        # answer = call_llm(prompt)
        return f"Based on the research, here's the answer to '{question}':\n\n[Answer based on context]"

    def post(self, shared, prep_res, exec_res):
        shared["final_answer"] = exec_res
        print("\n✅ Answer generated")
        return "done"
+ + # Get question from command line if provided + import sys + if len(sys.argv) > 1: + question = " ".join(sys.argv[1:]) + + shared = {"question": question} + + print(f"\n🤔 Processing question: {question}") + print("="*50) + + flow = create_agent_flow() + flow.run(shared) + + print("\n" + "="*50) + print("\n🎯 Final Answer:") + print(shared.get("final_answer", "No answer found")) + + +if __name__ == "__main__": + main() diff --git a/skills/pocketflow/assets/examples/04_rag.py b/skills/pocketflow/assets/examples/04_rag.py new file mode 100644 index 0000000..7fcab05 --- /dev/null +++ b/skills/pocketflow/assets/examples/04_rag.py @@ -0,0 +1,226 @@ +""" +PocketFlow Cookbook Example: RAG (Retrieval Augmented Generation) + +Difficulty: ☆☆☆ Dummy Level +Source: https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-rag + +Description: +A simple RAG system with offline indexing and online querying. +Demonstrates: +- Two-stage RAG pipeline (offline + online) +- Document embedding and indexing +- Similarity search +- Context-based answer generation +""" + +from pocketflow import Node, Flow +# from utils import call_llm, get_embedding, build_index, search_index +import sys + + +# ============================================================ +# OFFLINE FLOW: Index Documents +# ============================================================ + +class EmbedDocumentsNode(Node): + """Embed all documents for indexing""" + + def prep(self, shared): + return shared["texts"] + + def exec(self, texts): + """Generate embeddings for all texts""" + print(f"\n📊 Embedding {len(texts)} documents...") + # embeddings = [get_embedding(text) for text in texts] + embeddings = [[0.1] * 128 for _ in texts] # Placeholder + return embeddings + + def post(self, shared, prep_res, exec_res): + shared["embeddings"] = exec_res + print(f"✅ Embedded {len(exec_res)} documents") + return "default" + + +class BuildIndexNode(Node): + """Build search index from embeddings""" + + def prep(self, 
class GenerateAnswerNode(Node):
    """Generate an answer grounded in the retrieved context.

    Reads the query and the retrieved document from the shared store and
    writes the answer to ``shared["generated_answer"]``.
    """

    def prep(self, shared):
        return {
            "query": shared["query"],
            "context": shared["retrieved_document"]
        }

    def exec(self, inputs):
        """Generate answer with context"""
        # Plain strings below: the previous f-prefixes had no
        # placeholders (F541).
        print("\n✍️ Generating answer...")

        prompt = f"""
Context: {inputs['context']}

Question: {inputs['query']}

Answer the question using only the information from the context:
"""
        # answer = call_llm(prompt)
        answer = "Based on the context, the answer is: [Answer would be generated here]"
        return answer

    def post(self, shared, prep_res, exec_res):
        shared["generated_answer"] = exec_res
        print("✅ Answer generated")
        return "default"
+ Consumes 95% less energy than traditional blockchain systems. + Adopted by three central banks for secure financial data transfer. + Released in February 2024 after five years of development in stealth mode.""", + ] + + # Get query from command line or use default + default_query = "How to install PocketFlow?" + query = default_query + + for arg in sys.argv[1:]: + if arg.startswith("--"): + query = arg[2:] + break + + print("=" * 60) + print("PocketFlow RAG Demo") + print("=" * 60) + + # Single shared store for both flows + shared = { + "texts": texts, + "query": query + } + + # Stage 1: Index documents (offline) + print("\n📥 STAGE 1: Indexing Documents") + print("-" * 60) + offline_flow.run(shared) + + # Stage 2: Query and answer (online) + print("\n🔍 STAGE 2: Query and Answer") + print("-" * 60) + online_flow.run(shared) + + # Display results + print("\n" + "=" * 60) + print("✅ RAG Complete") + print("=" * 60) + print(f"\nQuery: {shared['query']}") + print(f"\nRetrieved Context Preview:") + print(shared["retrieved_document"][:150] + "...") + print(f"\nGenerated Answer:") + print(shared["generated_answer"]) + + +if __name__ == "__main__": + run_rag_demo() diff --git a/skills/pocketflow/assets/examples/05_structured_output.py b/skills/pocketflow/assets/examples/05_structured_output.py new file mode 100644 index 0000000..160e171 --- /dev/null +++ b/skills/pocketflow/assets/examples/05_structured_output.py @@ -0,0 +1,175 @@ +""" +PocketFlow Cookbook Example: Structured Output (Resume Parser) + +Difficulty: ☆☆☆ Dummy Level +Source: https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-structured-output + +Description: +Extract structured data from resumes using YAML prompting. 
class ResumeParserNode(Node):
    """Parse resume text into structured YAML format.

    Prompts the LLM for a YAML document describing the resume, parses it,
    and validates the schema. Validation failures raise ``ValueError`` so
    the node's retry mechanism (``max_retries``) can re-prompt the LLM.
    Previously the checks used ``assert``, which is stripped when Python
    runs with ``-O`` and would silently skip validation; a missing
    ```` ```yaml ```` fence also raised a bare ``IndexError``.
    """

    def prep(self, shared):
        return {
            "resume_text": shared["resume_text"],
            "target_skills": shared.get("target_skills", [])
        }

    def exec(self, prep_res):
        """Extract structured data from resume"""
        resume_text = prep_res["resume_text"]
        target_skills = prep_res["target_skills"]

        # Create skill list with indexes for prompt
        skill_list_for_prompt = "\n".join(
            [f"{i}: {skill}" for i, skill in enumerate(target_skills)]
        )

        prompt = f"""
Analyze the resume below. Output ONLY the requested information in YAML format.

**Resume:**
```
{resume_text}
```

**Target Skills (use these indexes):**
```
{skill_list_for_prompt}
```

**YAML Output Requirements:**
- Extract `name` (string)
- Extract `email` (string)
- Extract `experience` (list of objects with `title` and `company`)
- Extract `skill_indexes` (list of integers found from the Target Skills list)
- **Add a YAML comment (`#`) explaining the source BEFORE each field**

Generate the YAML output now:
"""

        # Get LLM response
        # response = call_llm(prompt)

        # Placeholder response
        response = """
```yaml
# Extracted from header
name: John Smith

# Found in contact section
email: john.smith@email.com

# Work history section
experience:
  - title: Senior Developer
    company: Tech Corp
  - title: Software Engineer
    company: StartupXYZ

# Skills matching target list
skill_indexes: [0, 2, 5] # Team leadership, Project management, Python
```
"""

        # Parse YAML from response; guard the fence lookup so a malformed
        # LLM reply raises a clear, retryable error instead of IndexError.
        if "```yaml" not in response:
            raise ValueError("LLM response contains no ```yaml block")
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        structured_result = yaml.safe_load(yaml_str)

        # Validate structure with real exceptions: these survive
        # `python -O` and trigger the node's retry logic exactly like the
        # old asserts did.
        if structured_result is None:
            raise ValueError("Parsed YAML is None")
        for key in ("name", "email", "experience", "skill_indexes"):
            if key not in structured_result:
                raise ValueError(f"Missing '{key}'")
        if not isinstance(structured_result.get("experience"), list):
            raise ValueError("'experience' is not a list")

        return structured_result

    def post(self, shared, prep_res, exec_res):
        """Store and display structured data"""
        shared["structured_data"] = exec_res

        print("\n=== STRUCTURED RESUME DATA ===\n")
        print(yaml.dump(exec_res, sort_keys=False, allow_unicode=True,
                        default_flow_style=None))
        print("\n✅ Extracted resume information.\n")

        return "default"
shared["structured_data"].get("skill_indexes", []) + if found_indexes: + print("\n--- Matched Target Skills ---") + for index in found_indexes: + if 0 <= index < len(target_skills): + print(f"✓ {target_skills[index]} (Index: {index})") + + +if __name__ == "__main__": + run_parser() diff --git a/skills/pocketflow/assets/examples/06_multi_agent.py b/skills/pocketflow/assets/examples/06_multi_agent.py new file mode 100644 index 0000000..3e4817a --- /dev/null +++ b/skills/pocketflow/assets/examples/06_multi_agent.py @@ -0,0 +1,153 @@ +""" +PocketFlow Cookbook Example: Multi-Agent (Taboo Game) + +Difficulty: ★☆☆ Beginner Level +Source: https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-multi-agent + +Description: +Two agents playing Taboo word game with async communication. +Demonstrates: +- Multi-agent systems +- Async message queues for inter-agent communication +- AsyncNode and AsyncFlow +- Self-looping async nodes +- Game logic with termination conditions +""" + +import asyncio +from pocketflow import AsyncNode, AsyncFlow +# from utils import call_llm # You need to implement this + + +class AsyncHinter(AsyncNode): + """Agent that provides hints for the target word""" + + async def prep_async(self, shared): + """Wait for guess from guesser""" + guess = await shared["hinter_queue"].get() + + if guess == "GAME_OVER": + return None + + return ( + shared["target_word"], + shared["forbidden_words"], + shared.get("past_guesses", []) + ) + + async def exec_async(self, inputs): + """Generate hint avoiding forbidden words""" + if inputs is None: + return None + + target, forbidden, past_guesses = inputs + + prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}" + if past_guesses: + prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific." + prompt += "\nUse at most 5 words." 
+ + # hint = call_llm(prompt) + hint = "Thinking of childhood summer days" # Placeholder + + print(f"\nHinter: Here's your hint - {hint}") + return hint + + async def post_async(self, shared, prep_res, exec_res): + """Send hint to guesser""" + if exec_res is None: + return "end" + + # Send hint to guesser's queue + await shared["guesser_queue"].put(exec_res) + return "continue" + + +class AsyncGuesser(AsyncNode): + """Agent that guesses the target word from hints""" + + async def prep_async(self, shared): + """Wait for hint from hinter""" + hint = await shared["guesser_queue"].get() + return hint, shared.get("past_guesses", []) + + async def exec_async(self, inputs): + """Make a guess based on hint""" + hint, past_guesses = inputs + + prompt = f""" +Given hint: {hint} +Past wrong guesses: {past_guesses} +Make a new guess. Reply with a single word: +""" + # guess = call_llm(prompt) + guess = "memories" # Placeholder + + print(f"Guesser: I guess it's - {guess}") + return guess + + async def post_async(self, shared, prep_res, exec_res): + """Check guess and update game state""" + # Check if correct + if exec_res.lower() == shared["target_word"].lower(): + print("\n✅ Game Over - Correct guess!") + await shared["hinter_queue"].put("GAME_OVER") + return "end" + + # Store wrong guess + if "past_guesses" not in shared: + shared["past_guesses"] = [] + shared["past_guesses"].append(exec_res) + + # Send guess to hinter + await shared["hinter_queue"].put(exec_res) + return "continue" + + +async def main(): + """Run the Taboo game""" + + # Game setup + shared = { + "target_word": "nostalgia", + "forbidden_words": ["memory", "past", "remember", "feeling", "longing"], + "hinter_queue": asyncio.Queue(), + "guesser_queue": asyncio.Queue() + } + + print("\n" + "="*50) + print("🎮 Taboo Game Starting!") + print("="*50) + print(f"Target word: {shared['target_word']}") + print(f"Forbidden words: {shared['forbidden_words']}") + print("="*50 + "\n") + + # Initialize game with empty guess 
+ await shared["hinter_queue"].put("") + + # Create agents + hinter = AsyncHinter() + guesser = AsyncGuesser() + + # Setup self-loops + hinter - "continue" >> hinter + guesser - "continue" >> guesser + + # Create flows + hinter_flow = AsyncFlow(start=hinter) + guesser_flow = AsyncFlow(start=guesser) + + # Run both agents concurrently + await asyncio.gather( + hinter_flow.run_async(shared), + guesser_flow.run_async(shared) + ) + + print("\n" + "="*50) + print("🏁 Game Complete!") + print(f"Total guesses: {len(shared.get('past_guesses', []))}") + print("="*50 + "\n") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/skills/pocketflow/assets/flow_template.py b/skills/pocketflow/assets/flow_template.py new file mode 100644 index 0000000..21d9b8d --- /dev/null +++ b/skills/pocketflow/assets/flow_template.py @@ -0,0 +1,147 @@ +""" +PocketFlow Flow Template + +Copy this template and customize for your workflow +""" + +from pocketflow import Flow, Node +# from nodes.my_nodes import Node1, Node2, Node3 # Import your nodes + + +class TemplateFlow(Flow): + """ + Brief description of what this flow does + + Flow Architecture: + node1 >> node2 >> node3 + node2 - "special" >> node4 + + Shared Store Schema: + Input: + - input_data (str): Initial input + + Intermediate: + - step1_result (str): Result from node1 + - step2_result (str): Result from node2 + + Output: + - final_result (str): Final output + """ + + def __init__(self): + """Initialize the flow with nodes and connections""" + + # TODO: Create your nodes + node1 = Node1() + node2 = Node2() + node3 = Node3() + + # TODO: Define flow connections + + # Simple sequence + node1 >> node2 >> node3 + + # Branching (conditional) + # node2 - "error" >> error_handler + # node2 - "success" >> node3 + + # Looping + # node3 - "retry" >> node1 + + # Initialize with start node + super().__init__(start=node1) + + +# Example with actual implementation +class SimpleWorkflow(Flow): + """Example: Simple 3-step workflow""" + + 
def __init__(self): + # Step 1: Load data + load = LoadNode() + + # Step 2: Process + process = ProcessNode() + + # Step 3: Save + save = SaveNode() + + # Connect + load >> process >> save + + super().__init__(start=load) + + +class ConditionalWorkflow(Flow): + """Example: Workflow with branching""" + + def __init__(self): + # Create nodes + validate = ValidateNode() + process_valid = ProcessValidNode() + process_invalid = ProcessInvalidNode() + finalize = FinalizeNode() + + # Branching based on validation + validate - "valid" >> process_valid + validate - "invalid" >> process_invalid + + # Both paths lead to finalize + process_valid >> finalize + process_invalid >> finalize + + super().__init__(start=validate) + + +class LoopingWorkflow(Flow): + """Example: Workflow with retry loop""" + + def __init__(self): + # Create nodes + attempt = AttemptNode() + verify = VerifyNode() + finish = FinishNode() + + # Setup loop + attempt >> verify + + # Branching: success or retry + verify - "success" >> finish + verify - "retry" >> attempt # Loop back + + # Optional: max attempts check + verify - "failed" >> finish + + super().__init__(start=attempt) + + +class NestedWorkflow(Flow): + """Example: Flow containing sub-flows""" + + def __init__(self): + # Create sub-flows + preprocessing_flow = PreprocessFlow() + processing_flow = ProcessFlow() + postprocessing_flow = PostprocessFlow() + + # Connect sub-flows + preprocessing_flow >> processing_flow >> postprocessing_flow + + super().__init__(start=preprocessing_flow) + + +# Example usage +if __name__ == "__main__": + # Create flow + flow = SimpleWorkflow() + + # Prepare shared store + shared = { + "input_data": "Hello, PocketFlow!" 
+ } + + # Run flow + flow.run(shared) + + # Check results + print(f"Final result: {shared.get('final_result')}") diff --git a/skills/pocketflow/assets/node_template.py b/skills/pocketflow/assets/node_template.py new file mode 100644 index 0000000..91f9489 --- /dev/null +++ b/skills/pocketflow/assets/node_template.py @@ -0,0 +1,124 @@ +""" +PocketFlow Node Template + +Copy this template and customize for your needs +""" + +from pocketflow import Node +# from utils.call_llm import call_llm # Uncomment if using LLM + + +class TemplateNode(Node): + """ + Brief description of what this node does + + Shared Store Schema: + Input: + - key1 (type): description + - key2 (type): description + + Output: + - result_key (type): description + + Actions: + - "default": Normal flow + - "error": If something goes wrong + - "retry": If needs retry + """ + + def prep(self, shared): + """ + Prepare data from shared store + + Args: + shared (dict): Shared data store + + Returns: + Any: Data to pass to exec() + """ + # TODO: Get data from shared store + input_data = shared.get("input_key") + + # Optional: Add validation + if not input_data: + raise ValueError("Missing required input") + + return input_data + + def exec(self, prep_res): + """ + Execute the main logic (can fail and retry) + + Args: + prep_res: Data from prep() + + Returns: + Any: Result to pass to post() + """ + # TODO: Implement your logic here + + # Example: Call LLM + # result = call_llm(f"Process: {prep_res}") + + # Example: Process data + result = f"Processed: {prep_res}" + + return result + + def post(self, shared, prep_res, exec_res): + """ + Save results and return action + + Args: + shared (dict): Shared data store + prep_res: Original data from prep() + exec_res: Result from exec() + + Returns: + str: Action name for flow control + """ + # TODO: Save results to shared store + shared["result_key"] = exec_res + + # Optional: Conditional actions + # if some_condition: + # return "special_action" + + return 
"default" + + def exec_fallback(self, prep_res, exc): + """ + Optional: Handle errors gracefully + + Args: + prep_res: Data from prep() + exc: The exception that occurred + + Returns: + Any: Fallback result (passed to post as exec_res) + """ + # TODO: Implement fallback logic + print(f"Error occurred: {exc}") + + # Option 1: Re-raise the exception + # raise exc + + # Option 2: Return fallback value + return "Fallback result" + + +# Example usage +if __name__ == "__main__": + # Create node with retry settings + node = TemplateNode(max_retries=3, wait=5) + + # Create shared store + shared = { + "input_key": "test input" + } + + # Run node + action = node.run(shared) + + print(f"Action: {action}") + print(f"Result: {shared.get('result_key')}") diff --git a/skills/pocketflow/assets/template/README.md b/skills/pocketflow/assets/template/README.md new file mode 100644 index 0000000..c8f3ae1 --- /dev/null +++ b/skills/pocketflow/assets/template/README.md @@ -0,0 +1,80 @@ +# PocketFlow Project Template + +This template provides a best-practice structure for PocketFlow projects. + +Source: https://github.com/The-Pocket/PocketFlow-Template-Python + +## Project Structure + +``` +template/ +├── main.py # Entry point +├── flow.py # Flow definition +├── nodes.py # Node implementations +├── utils.py # Utility functions (LLM wrappers, etc.) +└── requirements.txt # Python dependencies +``` + +## Quick Start + +1. **Install dependencies:** + ```bash + pip install -r requirements.txt + ``` + +2. **Configure your LLM:** + Edit `utils.py` and implement `call_llm()` for your provider (OpenAI, Anthropic, or Gemini) + +3. **Set API key:** + ```bash + export OPENAI_API_KEY=sk-... + # or + export ANTHROPIC_API_KEY=sk-ant-... + # or + export GEMINI_API_KEY=... + ``` + +4. 
**Run:** + ```bash + python main.py + ``` + +## Customization + +- **Add nodes:** Create new node classes in `nodes.py` +- **Modify flow:** Update connections in `flow.py` +- **Add utilities:** Implement helpers in `utils.py` +- **Update logic:** Customize `main.py` for your use case + +## Best Practices Demonstrated + +1. **Separation of Concerns:** + - `nodes.py` - Node logic only + - `flow.py` - Flow orchestration only + - `utils.py` - Reusable utilities + - `main.py` - Application entry point + +2. **Factory Pattern:** + - `create_qa_flow()` makes flow reusable + - Easy to test and modify + +3. **Clear Data Flow:** + - Shared store pattern for data passing + - Explicit state management + +4. **Configuration:** + - Environment variables for API keys + - requirements.txt for dependencies + +## Next Steps + +1. Implement your `call_llm()` function +2. Add your business logic to nodes +3. Define your workflow in flow.py +4. Run and iterate! + +## Resources + +- **PocketFlow Docs:** https://the-pocket.github.io/PocketFlow/ +- **GitHub:** https://github.com/The-Pocket/PocketFlow +- **Examples:** See the cookbook/ directory for more patterns diff --git a/skills/pocketflow/assets/template/flow.py b/skills/pocketflow/assets/template/flow.py new file mode 100644 index 0000000..d32833d --- /dev/null +++ b/skills/pocketflow/assets/template/flow.py @@ -0,0 +1,37 @@ +""" +PocketFlow Template - Flow Definition + +Source: https://github.com/The-Pocket/PocketFlow-Template-Python + +This module defines the QA flow by connecting nodes. 
+""" + +from pocketflow import Flow +from nodes import GetQuestionNode, AnswerNode + + +def create_qa_flow(): + """ + Create a simple Question-Answer flow + + Flow structure: + GetQuestionNode >> AnswerNode + + Returns: + Flow: Configured QA flow + """ + # Create nodes + get_question_node = GetQuestionNode() + answer_node = AnswerNode() + + # Connect nodes sequentially + get_question_node >> answer_node + + # Create flow with start node + qa_flow = Flow(start=get_question_node) + + return qa_flow + + +# For direct module execution +qa_flow = create_qa_flow() diff --git a/skills/pocketflow/assets/template/main.py b/skills/pocketflow/assets/template/main.py new file mode 100644 index 0000000..729229b --- /dev/null +++ b/skills/pocketflow/assets/template/main.py @@ -0,0 +1,35 @@ +""" +PocketFlow Template - Main Entry Point + +Source: https://github.com/The-Pocket/PocketFlow-Template-Python + +This template demonstrates best practices for structuring a PocketFlow project. +""" + +from flow import create_qa_flow + + +def main(): + """Main entry point for the application""" + + # Prepare shared data store + shared = { + "question": "In one sentence, what's the end of universe?", + "answer": None + } + + # Create and run the flow + qa_flow = create_qa_flow() + qa_flow.run(shared) + + # Display results + print(f"\n{'='*60}") + print("Results:") + print(f"{'='*60}") + print(f"Question: {shared['question']}") + print(f"Answer: {shared['answer']}") + print(f"{'='*60}\n") + + +if __name__ == "__main__": + main() diff --git a/skills/pocketflow/assets/template/nodes.py b/skills/pocketflow/assets/template/nodes.py new file mode 100644 index 0000000..7ab32ae --- /dev/null +++ b/skills/pocketflow/assets/template/nodes.py @@ -0,0 +1,56 @@ +""" +PocketFlow Template - Node Definitions + +Source: https://github.com/The-Pocket/PocketFlow-Template-Python + +This module contains the node definitions for the QA flow. +Each node implements the prep/exec/post pattern. 
+""" + +from pocketflow import Node +# from utils import call_llm # Uncomment when implemented + + +class GetQuestionNode(Node): + """Node to get user input""" + + def prep(self, shared): + """Prepare: can access shared store but no data needed""" + return None + + def exec(self, prep_res): + """Execute: get user input""" + question = input("\nEnter your question: ") + return question + + def post(self, shared, prep_res, exec_res): + """Post: store question in shared store""" + shared["question"] = exec_res + print(f"✓ Question received: {exec_res}") + return "default" + + +class AnswerNode(Node): + """Node to generate answer using LLM""" + + def prep(self, shared): + """Prepare: get question from shared store""" + return shared.get("question", "") + + def exec(self, question): + """Execute: call LLM to get answer""" + if not question: + return "No question provided" + + # Call your LLM implementation + # answer = call_llm(question) + + # Placeholder + answer = f"This is a placeholder answer to: {question}\nImplement call_llm() in utils.py" + return answer + + def post(self, shared, prep_res, exec_res): + """Post: store answer in shared store""" + shared["answer"] = exec_res + print(f"✓ Answer generated ({len(exec_res)} chars)") + return "default" diff --git a/skills/pocketflow/assets/template/requirements.txt b/skills/pocketflow/assets/template/requirements.txt new file mode 100644 index 0000000..3a6f000 --- /dev/null +++ b/skills/pocketflow/assets/template/requirements.txt @@ -0,0 +1,20 @@ +# PocketFlow Template Requirements + +# Core framework +pocketflow + +# LLM Providers (uncomment what you need) +# openai>=1.0.0 +# anthropic>=0.18.0 +# google-generativeai>=0.3.0 + +# Optional utilities +# requests>=2.31.0 +# beautifulsoup4>=4.12.0 +# faiss-cpu>=1.7.4 +# numpy>=1.24.0 + +# Development tools +# pytest>=7.4.0 +# black>=23.0.0 +# flake8>=6.0.0 diff --git a/skills/pocketflow/assets/template/utils.py b/skills/pocketflow/assets/template/utils.py new file mode 
100644 index 0000000..9b44671 --- /dev/null +++ b/skills/pocketflow/assets/template/utils.py @@ -0,0 +1,61 @@ +""" +PocketFlow Template - Utility Functions + +Source: https://github.com/The-Pocket/PocketFlow-Template-Python + +This module contains utility functions like LLM wrappers. +""" + +import os + + +def call_llm(prompt): + """ + Call your LLM provider + + Args: + prompt (str): The prompt to send to the LLM + + Returns: + str: The LLM response + + TODO: Implement your LLM provider here + """ + + # Example: OpenAI + """ + from openai import OpenAI + client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) + response = client.chat.completions.create( + model="gpt-4o", + messages=[{"role": "user", "content": prompt}] + ) + return response.choices[0].message.content + """ + + # Example: Anthropic + """ + from anthropic import Anthropic + client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY")) + response = client.messages.create( + model="claude-sonnet-4-0", + messages=[{"role": "user", "content": prompt}] + ) + return response.content[0].text + """ + + # Example: Google Gemini + """ + from google import genai + client = genai.Client(api_key=os.getenv("GEMINI_API_KEY")) + response = client.models.generate_content( + model='gemini-2.0-flash-exp', + contents=prompt + ) + return response.text + """ + + raise NotImplementedError( + "Implement your LLM provider in utils.py\n" + "See examples above for OpenAI, Anthropic, or Google Gemini" + ) diff --git a/skills/pocketflow/references/core_abstraction.md b/skills/pocketflow/references/core_abstraction.md new file mode 100644 index 0000000..c18fbd5 --- /dev/null +++ b/skills/pocketflow/references/core_abstraction.md @@ -0,0 +1,1634 @@ +# Pocketflow - Core Abstraction + +**Pages:** 21 + +--- + +## (Advanced) Async + +**URL:** https://the-pocket.github.io/PocketFlow/core_abstraction/async.html + +**Contents:** +- (Advanced) Async + - Example + +Async Nodes implement prep_async(), exec_async(), exec_fallback_async(), and/or 
post_async(). This is useful for: + +Note: AsyncNode must be wrapped in AsyncFlow. AsyncFlow can also include regular (sync) nodes. + +**Examples:** + +Example 1 (python): +```python +class SummarizeThenVerify(AsyncNode): + async def prep_async(self, shared): + # Example: read a file asynchronously + doc_text = await read_file_async(shared["doc_path"]) + return doc_text + + async def exec_async(self, prep_res): + # Example: async LLM call + summary = await call_llm_async(f"Summarize: {prep_res}") + return summary + + async def post_async(self, shared, prep_res, exec_res): + # Example: wait for user feedback + decision = await gather_user_feedback(exec_res) + if decision == "approve": + shared["summary"] = e +... +``` + +--- + +## (Advanced) Multi-Agents + +**URL:** https://the-pocket.github.io/PocketFlow/design_pattern/multi_agent.html + +**Contents:** +- (Advanced) Multi-Agents + - Example Agent Communication: Message Queue + - Interactive Multi-Agent Example: Taboo Game + +Multiple Agents can work together by handling subtasks and communicating the progress. Communication between agents is typically implemented using message queues in shared storage. + +Most of time, you don’t need Multi-Agents. Start with a simple solution first. + +Here’s a simple example showing how to implement agent communication using asyncio.Queue. The agent listens for messages, processes them, and continues listening: + +Here’s a more complex example where two agents play the word-guessing game Taboo. 
One agent provides hints while avoiding forbidden words, and another agent tries to guess the target word: + +**Examples:** + +Example 1 (python): +```python +class AgentNode(AsyncNode): + async def prep_async(self, _): + message_queue = self.params["messages"] + message = await message_queue.get() + print(f"Agent received: {message}") + return message + +# Create node and flow +agent = AgentNode() +agent >> agent # connect to self +flow = AsyncFlow(start=agent) + +# Create heartbeat sender +async def send_system_messages(message_queue): + counter = 0 + messages = [ + "System status: all systems operational", + "Memory usage: normal", + "Network connectivity: stable", + "Processing load: optimal" + ] + +... +``` + +Example 2 (unknown): +```unknown +Agent received: System status: all systems operational | timestamp_0 +Agent received: Memory usage: normal | timestamp_1 +Agent received: Network connectivity: stable | timestamp_2 +Agent received: Processing load: optimal | timestamp_3 +``` + +Example 3 (python): +```python +class AsyncHinter(AsyncNode): + async def prep_async(self, shared): + guess = await shared["hinter_queue"].get() + if guess == "GAME_OVER": + return None + return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", []) + + async def exec_async(self, inputs): + if inputs is None: + return None + target, forbidden, past_guesses = inputs + prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}" + if past_guesses: + prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more +... +``` + +Example 4 (unknown): +```unknown +Game starting! 
+Target word: nostalgia +Forbidden words: ['memory', 'past', 'remember', 'feeling', 'longing'] + +Hinter: Here's your hint - Thinking of childhood summer days +Guesser: I guess it's - popsicle + +Hinter: Here's your hint - When childhood cartoons make you emotional +Guesser: I guess it's - nostalgic + +Hinter: Here's your hint - When old songs move you +Guesser: I guess it's - memories + +Hinter: Here's your hint - That warm emotion about childhood +Guesser: I guess it's - nostalgia +Game Over - Correct guess! +``` + +--- + +## (Advanced) Parallel + +**URL:** https://the-pocket.github.io/PocketFlow/core_abstraction/parallel.html + +**Contents:** +- (Advanced) Parallel +- AsyncParallelBatchNode +- AsyncParallelBatchFlow + +Parallel Nodes and Flows let you run multiple Async Nodes and Flows concurrently—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute. + +Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O. + +Ensure Tasks Are Independent: If each item depends on the output of a previous item, do not parallelize. + +Beware of Rate Limits: Parallel calls can quickly trigger rate limits on LLM services. You may need a throttling mechanism (e.g., semaphores or sleep intervals). + +Consider Single-Node Batch APIs: Some LLMs offer a batch inference API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits. + +Like AsyncBatchNode, but run exec_async() in parallel: + +Parallel version of BatchFlow. 
Each iteration of the sub-flow runs concurrently using different parameters: + +**Examples:** + +Example 1 (python): +```python +class ParallelSummaries(AsyncParallelBatchNode): + async def prep_async(self, shared): + # e.g., multiple texts + return shared["texts"] + + async def exec_async(self, text): + prompt = f"Summarize: {text}" + return await call_llm_async(prompt) + + async def post_async(self, shared, prep_res, exec_res_list): + shared["summary"] = "\n\n".join(exec_res_list) + return "default" + +node = ParallelSummaries() +flow = AsyncFlow(start=node) +``` + +Example 2 (python): +```python +class SummarizeMultipleFiles(AsyncParallelBatchFlow): + async def prep_async(self, shared): + return [{"filename": f} for f in shared["files"]] + +sub_flow = AsyncFlow(start=LoadAndSummarizeFile()) +parallel_flow = SummarizeMultipleFiles(start=sub_flow) +await parallel_flow.run_async(shared) +``` + +--- + +## Agentic Coding: Humans Design, Agents code! + +**URL:** https://the-pocket.github.io/PocketFlow/guide.html + +**Contents:** +- Agentic Coding: Humans Design, Agents code! +- Agentic Coding Steps +- Example LLM Project File Structure + +If you are an AI agent involved in building LLM Systems, read this guide VERY, VERY carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (docs/design.md) before implementation, and (3) frequently ask humans for feedback and clarification. + +Agentic Coding should be a collaboration between Human System Design and Agent Implementation: + +If Humans can’t specify the flow, AI Agents can’t automate it! Before building an LLM system, thoroughly understand the problem and potential solution by manually solving example inputs to develop intuition. 
+ +Sometimes, design Utilities before Flow: For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them. + +Avoid Exception Handling in Utilities: If a utility function is called from a Node’s exec() method, avoid using try...except blocks within the utility. Let the Node’s built-in retry mechanism handle failures. + +You’ll likely iterate a lot! Expect to repeat Steps 3–6 hundreds of times. + +def call_llm(prompt: str) -> str: client = genai.Client( api_key=os.getenv(“GEMINI_API_KEY”, “”), ) model = os.getenv(“GEMINI_MODEL”, “gemini-2.5-flash”) response = client.models.generate_content(model=model, contents=[prompt]) return response.text + +if name == “main”: test_prompt = “Hello, how are you?” + +**Examples:** + +Example 1 (mermaid): +```mermaid +flowchart LR + start[Start] --> batch[Batch] + batch --> check[Check] + check -->|OK| process + check -->|Error| fix[Fix] + fix --> check + + subgraph process[Process] + step1[Step 1] --> step2[Step 2] + end + + process --> endNode[End] +``` + +Example 2 (python): +```python +# utils/call_llm.py +from openai import OpenAI + +def call_llm(prompt): + client = OpenAI(api_key="YOUR_API_KEY_HERE") + r = client.chat.completions.create( + model="gpt-4o", + messages=[{"role": "user", "content": prompt}] + ) + return r.choices[0].message.content + +if __name__ == "__main__": + prompt = "What is the meaning of life?" 
+ print(call_llm(prompt)) +``` + +Example 3 (unknown): +```unknown +shared = { + "user": { + "id": "user123", + "context": { # Another nested dict + "weather": {"temp": 72, "condition": "sunny"}, + "location": "San Francisco" + } + }, + "results": {} # Empty dict to store outputs +} +``` + +Example 4 (unknown): +```unknown +my_project/ +├── main.py +├── nodes.py +├── flow.py +├── utils/ +│ ├── __init__.py +│ ├── call_llm.py +│ └── search_web.py +├── requirements.txt +└── docs/ + └── design.md +``` + +--- + +## Agent + +**URL:** https://the-pocket.github.io/PocketFlow/design_pattern/agent.html + +**Contents:** +- Agent +- Implement Agent with Graph +- Example Good Action Design +- Example: Search Agent + +Agent is a powerful design pattern in which nodes can take dynamic actions based on the context. + +The core of building high-performance and reliable agents boils down to: + +Context Management: Provide relevant, minimal context. For example, rather than including an entire chat history, retrieve the most relevant via RAG. Even with larger context windows, LLMs still fall victim to “lost in the middle”, overlooking mid-prompt content. + +Action Space: Provide a well-structured and unambiguous set of actions—avoiding overlap like separate read_databases or read_csvs. Instead, import CSVs into the database. + +Incremental: Feed content in manageable chunks (500 lines or 1 page) instead of all at once. + +Overview-zoom-in: First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts). + +Parameterized/Programmable: Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files. + +Backtracking: Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends. 
+ +**Examples:** + +Example 1 (unknown): +```unknown +f""" +### CONTEXT +Task: {task_description} +Previous Actions: {previous_actions} +Current State: {current_state} + +### ACTION SPACE +[1] search + Description: Use web search to get results + Parameters: + - query (str): What to search for + +[2] answer + Description: Conclude based on the results + Parameters: + - result (str): Final answer to provide + +### NEXT ACTION +Decide the next action based on the current context and available action space. +Return your response in the following format: + +```yaml +thinking: | + +action: +parameters: + > node_b This means if node_a.post() returns "default", go to node_b. (Equivalent to node_a - "default" >> node_b) + +Named action transition: node_a - "action_name" >> node_b This means if node_a.post() returns "action_name", go to node_b. + +It’s possible to create loops, branching, or multi-step flows. + +A Flow begins with a start node. You call Flow(start=some_node) to specify the entry point. When you call flow.run(shared), it executes the start node, looks at its returned Action from post(), follows the transition, and continues until there’s no next node. + +Here’s a minimal flow of two nodes in a chain: + +Here’s a simple expense approval flow that demonstrates branching and looping. The ReviewExpense node can return three possible Actions: + +We can wire them like this: + +Let’s see how it flows: + +node.run(shared) does not proceed to the successor. This is mainly for debugging or testing a single node. + +Always use flow.run(...) in production to ensure the full pipeline runs correctly. + +A Flow can act like a Node, which enables powerful composition patterns. This means you can: + +A Flow is also a Node, so it will run prep() and post(). 
However: + +Here’s how to connect a flow to another node: + +When parent_flow.run() executes: + +Here’s a practical example that breaks down order processing into nested flows: + +This creates a clean separation of concerns while maintaining a clear execution path: + +**Examples:** + +Example 1 (unknown): +```unknown +node_a >> node_b +flow = Flow(start=node_a) +flow.run(shared) +``` + +Example 2 (unknown): +```unknown +# Define the flow connections +review - "approved" >> payment # If approved, process payment +review - "needs_revision" >> revise # If needs changes, go to revision +review - "rejected" >> finish # If rejected, finish the process + +revise >> review # After revision, go back for another review +payment >> finish # After payment, finish the process + +flow = Flow(start=review) +``` + +Example 3 (mermaid): +```mermaid +flowchart TD + review[Review Expense] -->|approved| payment[Process Payment] + review -->|needs_revision| revise[Revise Report] + review -->|rejected| finish[Finish Process] + + revise --> review + payment --> finish +``` + +Example 4 (unknown): +```unknown +# Create a sub-flow +node_a >> node_b +subflow = Flow(start=node_a) + +# Connect it to another node +subflow >> node_c + +# Create the parent flow +parent_flow = Flow(start=subflow) +``` + +--- + +## LLM Wrappers + +**URL:** https://the-pocket.github.io/PocketFlow/utility_function/llm.html + +**Contents:** +- LLM Wrappers +- Improvements + +Check out libraries like litellm. Here, we provide some minimal example implementations: + +Store the API key in an environment variable like OPENAI_API_KEY for security. + +Feel free to enhance your call_llm function as needed. Here are examples: + +⚠️ Caching conflicts with Node retries, as retries yield the same result. + +To address this, you could use cached results only if not retried. 
+ +**Examples:** + +Example 1 (python): +```python +def call_llm(prompt): + from openai import OpenAI + client = OpenAI(api_key="YOUR_API_KEY_HERE") + r = client.chat.completions.create( + model="gpt-4o", + messages=[{"role": "user", "content": prompt}] + ) + return r.choices[0].message.content + + # Example usage + call_llm("How are you?") +``` + +Example 2 (python): +```python +def call_llm(prompt): + from anthropic import Anthropic + client = Anthropic(api_key="YOUR_API_KEY_HERE") + r = client.messages.create( + model="claude-sonnet-4-0", + messages=[ + {"role": "user", "content": prompt} + ] + ) + return r.content[0].text +``` + +Example 3 (python): +```python +def call_llm(prompt): + from google import genai + client = genai.Client(api_key='GEMINI_API_KEY') + response = client.models.generate_content( + model='gemini-2.5-pro', + contents=prompt + ) + return response.text +``` + +Example 4 (python): +```python +def call_llm(prompt): + from openai import AzureOpenAI + client = AzureOpenAI( + azure_endpoint="https://.openai.azure.com/", + api_key="YOUR_API_KEY_HERE", + api_version="2023-05-15" + ) + r = client.chat.completions.create( + model="", + messages=[{"role": "user", "content": prompt}] + ) + return r.choices[0].message.content +``` + +--- + +## Map Reduce + +**URL:** https://the-pocket.github.io/PocketFlow/design_pattern/mapreduce.html + +**Contents:** +- Map Reduce + - Example: Document Summarization + +MapReduce is a design pattern suitable when you have either: + +and there is a logical way to break the task into smaller, ideally independent parts. + +You first break down the task using BatchNode in the map phase, followed by aggregation in the reduce phase. + +Performance Tip: The example above works sequentially. You can speed up the map phase by running it in parallel. See (Advanced) Parallel for more details. 
+ +**Examples:** + +Example 1 (python): +```python +class SummarizeAllFiles(BatchNode): + def prep(self, shared): + files_dict = shared["files"] # e.g. 10 files + return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...] + + def exec(self, one_file): + filename, file_content = one_file + summary_text = call_llm(f"Summarize the following file:\n{file_content}") + return (filename, summary_text) + + def post(self, shared, prep_res, exec_res_list): + shared["file_summaries"] = dict(exec_res_list) + +class CombineSummaries(Node): + def prep(self, shared): + return share +... +``` + +--- + +## Node + +**URL:** https://the-pocket.github.io/PocketFlow/core_abstraction/node.html + +**Contents:** +- Node + - Fault Tolerance & Retries + - Graceful Fallback + - Example: Summarize file + +A Node is the smallest building block. Each Node has 3 steps prep->exec->post: + +Why 3 steps? To enforce the principle of separation of concerns. The data storage and data processing are operated separately. + +All steps are optional. E.g., you can only implement prep and post if you just need to process data. + +You can retry exec() if it raises an exception via two parameters when define the Node: + +When an exception occurs in exec(), the Node automatically retries until: + +You can get the current retry times (0-based) from self.cur_retry. + +To gracefully handle the exception (after all retries) rather than raising it, override: + +By default, it just re-raises exception. But you can return a fallback result instead, which becomes the exec_res passed to post(). 
+ +**Examples:** + +Example 1 (unknown): +```unknown +my_node = SummarizeFile(max_retries=3, wait=10) +``` + +Example 2 (python): +```python +class RetryNode(Node): + def exec(self, prep_res): + print(f"Retry {self.cur_retry} times") + raise Exception("Failed") +``` + +Example 3 (python): +```python +def exec_fallback(self, prep_res, exc): + raise exc +``` + +Example 4 (python): +```python +class SummarizeFile(Node): + def prep(self, shared): + return shared["data"] + + def exec(self, prep_res): + if not prep_res: + return "Empty file content" + prompt = f"Summarize this text in 10 words: {prep_res}" + summary = call_llm(prompt) # might fail + return summary + + def exec_fallback(self, prep_res, exc): + # Provide a simple fallback instead of crashing + return "There was an error processing your request." + + def post(self, shared, prep_res, exec_res): + shared["summary"] = exec_res + # Return "default" by not r +... +``` + +--- + +## Pocket Flow + +**URL:** https://the-pocket.github.io/PocketFlow/ + +**Contents:** +- Pocket Flow +- Core Abstraction +- Design Pattern +- Utility Function +- Ready to build your Apps? + +A 100-line minimalist LLM framework for Agents, Task Decomposition, RAG, etc. + +We model the LLM workflow as a Graph + Shared Store: + +From there, it’s easy to implement popular design patterns: + +We do not provide built-in utilities. Instead, we offer examples—please implement your own: + +Why not built-in?: I believe it’s a bad practice for vendor-specific APIs in a general framework: + +Check out Agentic Coding Guidance, the fastest way to develop LLM projects with Pocket Flow! + +--- + +## RAG (Retrieval Augmented Generation) + +**URL:** https://the-pocket.github.io/PocketFlow/design_pattern/rag.html + +**Contents:** +- RAG (Retrieval Augmented Generation) +- Stage 1: Offline Indexing +- Stage 2: Online Query & Answer + +For certain LLM tasks like answering questions, providing relevant context is essential. 
One common architecture is a two-stage RAG pipeline: + +We create three Nodes: + +**Examples:** + +Example 1 (python): +```python +class ChunkDocs(BatchNode): + def prep(self, shared): + # A list of file paths in shared["files"]. We process each file. + return shared["files"] + + def exec(self, filepath): + # read file content. In real usage, do error handling. + with open(filepath, "r", encoding="utf-8") as f: + text = f.read() + # chunk by 100 chars each + chunks = [] + size = 100 + for i in range(0, len(text), size): + chunks.append(text[i : i + size]) + return chunks + + def post(self, shared, prep_res, exec_res_list): + # exec_res +... +``` + +Example 2 (unknown): +```unknown +shared = { + "files": ["doc1.txt", "doc2.txt"], # any text files +} +OfflineFlow.run(shared) +``` + +Example 3 (python): +```python +class EmbedQuery(Node): + def prep(self, shared): + return shared["question"] + + def exec(self, question): + return get_embedding(question) + + def post(self, shared, prep_res, q_emb): + shared["q_emb"] = q_emb + +class RetrieveDocs(Node): + def prep(self, shared): + # We'll need the query embedding, plus the offline index/chunks + return shared["q_emb"], shared["index"], shared["all_chunks"] + + def exec(self, inputs): + q_emb, index, chunks = inputs + I, D = search_index(index, q_emb, top_k=1) + best_id = I[0][0] + relevant_chunk = +... +``` + +Example 4 (unknown): +```unknown +# Suppose we already ran OfflineFlow and have: +# shared["all_chunks"], shared["index"], etc. +shared["question"] = "Why do people like cats?" + +OnlineFlow.run(shared) +# final answer in shared["answer"] +``` + +--- + +## Structured Output + +**URL:** https://the-pocket.github.io/PocketFlow/design_pattern/structure.html + +**Contents:** +- Structured Output + - Example Use Cases +- Prompt Engineering + - Example Text Summarization + - Why YAML instead of JSON? 
+ +In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys. + +There are several approaches to achieve a structured output: + +In practice, Prompting is simple and reliable for modern LLMs. + +When prompting the LLM to produce structured output: + +Besides using assert statements, another popular way to validate schemas is Pydantic + +Current LLMs struggle with escaping. YAML is easier with strings since they don’t always need quotes. + +**Examples:** + +Example 1 (unknown): +```unknown +product: + name: Widget Pro + price: 199.99 + description: | + A high-quality widget designed for professionals. + Recommended for advanced users. +``` + +Example 2 (unknown): +```unknown +summary: + - This product is easy to use. + - It is cost-effective. + - Suitable for all skill levels. +``` + +Example 3 (unknown): +```unknown +server: + host: 127.0.0.1 + port: 8080 + ssl: true +``` + +Example 4 (javascript): +```javascript +class SummarizeNode(Node): + def exec(self, prep_res): + # Suppose `prep_res` is the text to summarize. + prompt = f""" +Please summarize the following text as YAML, with exactly 3 bullet points + +{prep_res} + +Now, output: +```yaml +summary: + - bullet 1 + - bullet 2 + - bullet 3 +```""" + response = call_llm(prompt) + yaml_str = response.split("```yaml")[1].split("```")[0].strip() + + import yaml + structured_result = yaml.safe_load(yaml_str) + + assert "summary" in structured_result + assert isinstance(structured_result["summary"], list) + + ret +... +``` + +--- + +## Text Chunking + +**URL:** https://the-pocket.github.io/PocketFlow/utility_function/chunking.html + +**Contents:** +- Text Chunking +- Example Python Code Samples + - 1. Naive (Fixed-Size) Chunking + - 2. Sentence-Based Chunking + - 3. Other Chunking + +We recommend some implementations of commonly used text chunking approaches. + +Text Chunking is more a micro optimization, compared to the Flow Design. 
+ +It’s recommended to start with the Naive Chunking and optimize later. + +Splits text by a fixed number of words, ignoring sentence or semantic boundaries. + +However, sentences are often cut awkwardly, losing coherence. + +However, might not handle very long sentences or paragraphs well. + +**Examples:** + +Example 1 (python): +```python +def fixed_size_chunk(text, chunk_size=100): + chunks = [] + for i in range(0, len(text), chunk_size): + chunks.append(text[i : i + chunk_size]) + return chunks +``` + +Example 2 (python): +```python +import nltk + +def sentence_based_chunk(text, max_sentences=2): + sentences = nltk.sent_tokenize(text) + chunks = [] + for i in range(0, len(sentences), max_sentences): + chunks.append(" ".join(sentences[i : i + max_sentences])) + return chunks +``` + +--- + +## Text-to-Speech + +**URL:** https://the-pocket.github.io/PocketFlow/utility_function/text_to_speech.html + +**Contents:** +- Text-to-Speech +- Example Python Code + - Amazon Polly + - Google Cloud TTS + - Azure TTS + - IBM Watson TTS + - ElevenLabs + +**Examples:** + +Example 1 (python): +```python +import boto3 + +polly = boto3.client("polly", region_name="us-east-1", + aws_access_key_id="YOUR_AWS_ACCESS_KEY_ID", + aws_secret_access_key="YOUR_AWS_SECRET_ACCESS_KEY") + +resp = polly.synthesize_speech( + Text="Hello from Polly!", + OutputFormat="mp3", + VoiceId="Joanna" +) + +with open("polly.mp3", "wb") as f: + f.write(resp["AudioStream"].read()) +``` + +Example 2 (python): +```python +from google.cloud import texttospeech + +client = texttospeech.TextToSpeechClient() +input_text = texttospeech.SynthesisInput(text="Hello from Google Cloud TTS!") +voice = texttospeech.VoiceSelectionParams(language_code="en-US") +audio_cfg = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.MP3) + +resp = client.synthesize_speech(input=input_text, voice=voice, audio_config=audio_cfg) + +with open("gcloud_tts.mp3", "wb") as f: + f.write(resp.audio_content) +``` + +Example 3 
(python): +```python +import azure.cognitiveservices.speech as speechsdk + +speech_config = speechsdk.SpeechConfig( + subscription="AZURE_KEY", region="AZURE_REGION") +audio_cfg = speechsdk.audio.AudioConfig(filename="azure_tts.wav") + +synthesizer = speechsdk.SpeechSynthesizer( + speech_config=speech_config, + audio_config=audio_cfg +) + +synthesizer.speak_text_async("Hello from Azure TTS!").get() +``` + +Example 4 (python): +```python +from ibm_watson import TextToSpeechV1 +from ibm_cloud_sdk_core.authenticators import IAMAuthenticator + +auth = IAMAuthenticator("IBM_API_KEY") +service = TextToSpeechV1(authenticator=auth) +service.set_service_url("IBM_SERVICE_URL") + +resp = service.synthesize( + "Hello from IBM Watson!", + voice="en-US_AllisonV3Voice", + accept="audio/mp3" +).get_result() + +with open("ibm_tts.mp3", "wb") as f: + f.write(resp.content) +``` + +--- + +## Vector Databases + +**URL:** https://the-pocket.github.io/PocketFlow/utility_function/vector.html + +**Contents:** +- Vector Databases +- Example Python Code + - FAISS + - Pinecone + - Qdrant + - Weaviate + - Milvus + - Chroma + - Redis + +Below is a table of the popular vector search solutions: + +Below are basic usage snippets for each tool. 
+ +**Examples:** + +Example 1 (unknown): +```unknown +import faiss +import numpy as np + +# Dimensionality of embeddings +d = 128 + +# Create a flat L2 index +index = faiss.IndexFlatL2(d) + +# Random vectors +data = np.random.random((1000, d)).astype('float32') +index.add(data) + +# Query +query = np.random.random((1, d)).astype('float32') +D, I = index.search(query, k=5) + +print("Distances:", D) +print("Neighbors:", I) +``` + +Example 2 (unknown): +```unknown +import pinecone + +pinecone.init(api_key="YOUR_API_KEY", environment="YOUR_ENV") + +index_name = "my-index" + +# Create the index if it doesn't exist +if index_name not in pinecone.list_indexes(): + pinecone.create_index(name=index_name, dimension=128) + +# Connect +index = pinecone.Index(index_name) + +# Upsert +vectors = [ + ("id1", [0.1]*128), + ("id2", [0.2]*128) +] +index.upsert(vectors) + +# Query +response = index.query([[0.15]*128], top_k=3) +print(response) +``` + +Example 3 (python): +```python +import qdrant_client +from qdrant_client.models import Distance, VectorParams, PointStruct + +client = qdrant_client.QdrantClient( + url="https://YOUR-QDRANT-CLOUD-ENDPOINT", + api_key="YOUR_API_KEY" +) + +collection = "my_collection" +client.recreate_collection( + collection_name=collection, + vectors_config=VectorParams(size=128, distance=Distance.COSINE) +) + +points = [ + PointStruct(id=1, vector=[0.1]*128, payload={"type": "doc1"}), + PointStruct(id=2, vector=[0.2]*128, payload={"type": "doc2"}), +] + +client.upsert(collection_name=collection, points=points) + +results = client.search( + c +... 
+``` + +Example 4 (unknown): +```unknown +import weaviate + +client = weaviate.Client("https://YOUR-WEAVIATE-CLOUD-ENDPOINT") + +schema = { + "classes": [ + { + "class": "Article", + "vectorizer": "none" + } + ] +} +client.schema.create(schema) + +obj = { + "title": "Hello World", + "content": "Weaviate vector search" +} +client.data_object.create(obj, "Article", vector=[0.1]*128) + +resp = ( + client.query + .get("Article", ["title", "content"]) + .with_near_vector({"vector": [0.15]*128}) + .with_limit(3) + .do() +) +print(resp) +``` + +--- + +## Visualization and Debugging + +**URL:** https://the-pocket.github.io/PocketFlow/utility_function/viz.html + +**Contents:** +- Visualization and Debugging +- 1. Visualization with Mermaid +- 2. Call Stack Debugging + +Similar to LLM wrappers, we don’t provide built-in visualization and debugging. Here, we recommend some minimal (and incomplete) implementations These examples can serve as a starting point for your own tooling. + +This code recursively traverses the nested graph, assigns unique IDs to each node, and treats Flow nodes as subgraphs to generate Mermaid syntax for a hierarchical visualization. + +For example, suppose we have a complex Flow for data science: + +The code generates a Mermaid diagram: + +For visualization based on d3.js, check out the cookbook. + +It would be useful to print the Node call stacks for debugging. This can be achieved by inspecting the runtime call stack: + +For example, suppose we have a complex Flow for data science: + +The output would be: Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow'] + +For a more complete implementation, check out the cookbook. 
+ +**Examples:** + +Example 1 (python): +```python +def build_mermaid(start): + ids, visited, lines = {}, set(), ["graph LR"] + ctr = 1 + def get_id(n): + nonlocal ctr + return ids[n] if n in ids else (ids.setdefault(n, f"N{ctr}"), (ctr := ctr + 1))[0] + def link(a, b): + lines.append(f" {a} --> {b}") + def walk(node, parent=None): + if node in visited: + return parent and link(parent, get_id(node)) + visited.add(node) + if isinstance(node, Flow): + node.start_node and parent and link(parent, get_id(node.start_node)) + lines.append(f"\n subgraph sub_flow_{get_id(n +... +``` + +Example 2 (python): +```python +class DataPrepBatchNode(BatchNode): + def prep(self,shared): return [] +class ValidateDataNode(Node): pass +class FeatureExtractionNode(Node): pass +class TrainModelNode(Node): pass +class EvaluateModelNode(Node): pass +class ModelFlow(Flow): pass +class DataScienceFlow(Flow):pass + +feature_node = FeatureExtractionNode() +train_node = TrainModelNode() +evaluate_node = EvaluateModelNode() +feature_node >> train_node >> evaluate_node +model_flow = ModelFlow(start=feature_node) +data_prep_node = DataPrepBatchNode() +validate_node = ValidateDataNode() +data_prep_node >> validate_node >> model_flow +data_scienc +... 
+``` + +Example 3 (mermaid): +```mermaid +graph LR + subgraph sub_flow_N1[DataScienceFlow] + N2['DataPrepBatchNode'] + N3['ValidateDataNode'] + N2 --> N3 + N3 --> N4 + + subgraph sub_flow_N5[ModelFlow] + N4['FeatureExtractionNode'] + N6['TrainModelNode'] + N4 --> N6 + N7['EvaluateModelNode'] + N6 --> N7 + end + + end +``` + +Example 4 (python): +```python +import inspect + +def get_node_call_stack(): + stack = inspect.stack() + node_names = [] + seen_ids = set() + for frame_info in stack[1:]: + local_vars = frame_info.frame.f_locals + if 'self' in local_vars: + caller_self = local_vars['self'] + if isinstance(caller_self, BaseNode) and id(caller_self) not in seen_ids: + seen_ids.add(id(caller_self)) + node_names.append(type(caller_self).__name__) + return node_names +``` + +--- + +## Web Search + +**URL:** https://the-pocket.github.io/PocketFlow/utility_function/websearch.html + +**Contents:** +- Web Search +- Example Python Code + - 1. Google Custom Search JSON API + - 2. Bing Web Search API + - 3. DuckDuckGo Instant Answer + - 4. Brave Search API + - 5. SerpApi + +We recommend some implementations of commonly used web search tools. 
+ +**Examples:** + +Example 1 (unknown): +```unknown +import requests + +API_KEY = "YOUR_API_KEY" +CX_ID = "YOUR_CX_ID" +query = "example" + +url = "https://www.googleapis.com/customsearch/v1" +params = { + "key": API_KEY, + "cx": CX_ID, + "q": query +} + +response = requests.get(url, params=params) +results = response.json() +print(results) +``` + +Example 2 (unknown): +```unknown +import requests + +SUBSCRIPTION_KEY = "YOUR_BING_API_KEY" +query = "example" + +url = "https://api.bing.microsoft.com/v7.0/search" +headers = {"Ocp-Apim-Subscription-Key": SUBSCRIPTION_KEY} +params = {"q": query} + +response = requests.get(url, headers=headers, params=params) +results = response.json() +print(results) +``` + +Example 3 (unknown): +```unknown +import requests + +query = "example" +url = "https://api.duckduckgo.com/" +params = { + "q": query, + "format": "json" +} + +response = requests.get(url, params=params) +results = response.json() +print(results) +``` + +Example 4 (unknown): +```unknown +import requests + +SUBSCRIPTION_TOKEN = "YOUR_BRAVE_API_TOKEN" +query = "example" + +url = "https://api.search.brave.com/res/v1/web/search" +headers = { + "X-Subscription-Token": SUBSCRIPTION_TOKEN +} +params = { + "q": query +} + +response = requests.get(url, headers=headers, params=params) +results = response.json() +print(results) +``` + +--- + +## Workflow + +**URL:** https://the-pocket.github.io/PocketFlow/design_pattern/workflow.html + +**Contents:** +- Workflow + - Example: Article Writing + +Many real-world tasks are too complex for one LLM call. The solution is to Task Decomposition: decompose them into a chain of multiple Nodes. + +You usually need multiple iterations to find the sweet spot. If the task has too many edge cases, consider using Agents. + +For dynamic cases, consider using Agents. 
+ +**Examples:** + +Example 1 (python): +```python +class GenerateOutline(Node): + def prep(self, shared): return shared["topic"] + def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}") + def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res + +class WriteSection(Node): + def prep(self, shared): return shared["outline"] + def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}") + def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res + +class ReviewAndRefine(Node): + def prep(self, shared): return shared["draft"] + def e +... +``` + +--- diff --git a/skills/pocketflow/references/index.md b/skills/pocketflow/references/index.md new file mode 100644 index 0000000..33d0c93 --- /dev/null +++ b/skills/pocketflow/references/index.md @@ -0,0 +1,7 @@ +# Pocketflow Documentation Index + +## Categories + +### Core Abstraction +**File:** `core_abstraction.md` +**Pages:** 21 diff --git a/skills/pocketflow/scripts/pocketflow_init.py b/skills/pocketflow/scripts/pocketflow_init.py new file mode 100644 index 0000000..442f3cc --- /dev/null +++ b/skills/pocketflow/scripts/pocketflow_init.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python3 +""" +PocketFlow Project Initializer +Creates a new PocketFlow project with best-practice structure +""" + +import os +import sys + +def create_project(project_name): + """Create a new PocketFlow project structure""" + + # Create directories + dirs = [ + f"{project_name}/nodes", + f"{project_name}/flows", + f"{project_name}/utils", + f"{project_name}/tests", + f"{project_name}/docs" + ] + + for d in dirs: + os.makedirs(d, exist_ok=True) + # Create __init__.py for Python packages + if d.endswith(('nodes', 'flows', 'utils', 'tests')): + open(f"{d}/__init__.py", 'w').close() + + # Create main.py + with open(f"{project_name}/main.py", 'w') as f: + f.write('''#!/usr/bin/env python3 +""" +Main entry point for {name} +""" + +from flows.my_flow import 
MyFlow + +def main(): + shared = {{ + "input": "Hello, PocketFlow!", + }} + + flow = MyFlow() + flow.run(shared) + + print(f"Result: {{shared.get('result')}}") + +if __name__ == "__main__": + main() +'''.format(name=project_name)) + + # Create example LLM utility + with open(f"{project_name}/utils/call_llm.py", 'w') as f: + f.write('''""" +LLM wrapper - customize for your provider +""" + +def call_llm(prompt): + """Call your LLM provider""" + # TODO: Implement your LLM call + # Example for OpenAI: + # from openai import OpenAI + # client = OpenAI(api_key="YOUR_API_KEY") + # response = client.chat.completions.create( + # model="gpt-4o", + # messages=[{"role": "user", "content": prompt}] + # ) + # return response.choices[0].message.content + + raise NotImplementedError("Implement your LLM provider") +''') + + # Create example node + with open(f"{project_name}/nodes/my_node.py", 'w') as f: + f.write('''""" +Example node implementation +""" + +from pocketflow import Node +from utils.call_llm import call_llm + +class ProcessNode(Node): + """Example processing node""" + + def prep(self, shared): + """Get input from shared store""" + return shared.get("input", "") + + def exec(self, prep_res): + """Process with LLM""" + prompt = f"Process this: {prep_res}" + result = call_llm(prompt) + return result + + def post(self, shared, prep_res, exec_res): + """Store result""" + shared["result"] = exec_res + return "default" +''') + + # Create example flow + with open(f"{project_name}/flows/my_flow.py", 'w') as f: + f.write('''""" +Example flow implementation +""" + +from pocketflow import Flow +from nodes.my_node import ProcessNode + +class MyFlow(Flow): + """Example flow""" + + def __init__(self): + # Create nodes + process = ProcessNode() + + # Define flow + # process >> next_node # Add more nodes as needed + + # Initialize flow + super().__init__(start=process) +''') + + # Create requirements.txt + with open(f"{project_name}/requirements.txt", 'w') as f: + f.write('''# 
PocketFlow dependencies +pocketflow + +# LLM providers (uncomment what you need) +# openai +# anthropic +# google-generativeai + +# Optional utilities +# beautifulsoup4 +# requests +# faiss-cpu +''') + + # Create README + with open(f"{project_name}/README.md", 'w') as f: + f.write(f'''# {project_name} + +PocketFlow project for [describe your use case] + +## Setup + +```bash +# Install dependencies +pip install -r requirements.txt + +# Configure your LLM provider +# Edit utils/call_llm.py + +# Run +python main.py +``` + +## Project Structure + +``` +{project_name}/ +├── main.py # Entry point +├── nodes/ # Node implementations +├── flows/ # Flow definitions +├── utils/ # Utilities (LLM, DB, etc.) +├── tests/ # Unit tests +└── docs/ # Documentation +``` + +## Next Steps + +1. Implement your LLM wrapper in `utils/call_llm.py` +2. Create your nodes in `nodes/` +3. Define your flow in `flows/` +4. Run and test! +''') + + # Create design doc template + with open(f"{project_name}/docs/design.md", 'w') as f: + f.write(f'''# {project_name} Design + +## Problem Statement + +What problem are you solving? + +## Solution Overview + +High-level approach using PocketFlow + +## Flow Architecture + +```mermaid +flowchart LR + start[Start] --> process[Process] + process --> end[End] +``` + +## Data Schema + +```python +shared = {{ + "input": "...", + "intermediate": "...", + "result": "..." +}} +``` + +## Nodes + +### Node 1: ProcessNode +- **Purpose:** What does it do? +- **Input:** What does it need from shared? +- **Output:** What does it produce? +- **Actions:** What actions can it return? + +## Error Handling + +How will you handle failures? + +## Testing Strategy + +How will you test this? 
+''') + + print(f"✅ Created PocketFlow project: {project_name}/") + print(f"📁 Structure:") + print(f" ├── main.py") + print(f" ├── nodes/my_node.py") + print(f" ├── flows/my_flow.py") + print(f" ├── utils/call_llm.py") + print(f" ├── requirements.txt") + print(f" └── docs/design.md") + print(f"\n🚀 Next steps:") + print(f" 1. cd {project_name}") + print(f" 2. Edit utils/call_llm.py (add your LLM API key)") + print(f" 3. python main.py") + +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: python pocketflow_init.py ") + sys.exit(1) + + create_project(sys.argv[1]) diff --git a/skills/pocketflow/scripts/test_llm_connection.py b/skills/pocketflow/scripts/test_llm_connection.py new file mode 100644 index 0000000..93c65e1 --- /dev/null +++ b/skills/pocketflow/scripts/test_llm_connection.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 +""" +Quick script to test your LLM connection +""" + +import os +import sys + +def test_openai(): + """Test OpenAI connection""" + try: + from openai import OpenAI + client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) + response = client.chat.completions.create( + model="gpt-4o-mini", + messages=[{"role": "user", "content": "Say 'hello'"}] + ) + print("✅ OpenAI: Connected") + print(f" Response: {response.choices[0].message.content}") + return True + except Exception as e: + print(f"❌ OpenAI: Failed - {e}") + return False + +def test_anthropic(): + """Test Anthropic connection""" + try: + from anthropic import Anthropic + client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY")) + response = client.messages.create( + model="claude-3-5-haiku-20241022", + max_tokens=100, + messages=[{"role": "user", "content": "Say 'hello'"}] + ) + print("✅ Anthropic: Connected") + print(f" Response: {response.content[0].text}") + return True + except Exception as e: + print(f"❌ Anthropic: Failed - {e}") + return False + +def test_google(): + """Test Google Gemini connection""" + try: + from google import genai + client = 
genai.Client(api_key=os.getenv("GEMINI_API_KEY")) + response = client.models.generate_content( + model='gemini-2.0-flash-exp', + contents="Say 'hello'" + ) + print("✅ Google Gemini: Connected") + print(f" Response: {response.text}") + return True + except Exception as e: + print(f"❌ Google Gemini: Failed - {e}") + return False + +if __name__ == "__main__": + print("🔍 Testing LLM connections...\n") + + results = { + "OpenAI": test_openai(), + "Anthropic": test_anthropic(), + "Google": test_google() + } + + print("\n" + "="*50) + working = [k for k, v in results.items() if v] + if working: + print(f"✅ Working providers: {', '.join(working)}") + else: + print("❌ No working providers found") + print("\nMake sure you've set environment variables:") + print(" export OPENAI_API_KEY=sk-...") + print(" export ANTHROPIC_API_KEY=sk-ant-...") + print(" export GEMINI_API_KEY=...")