Files
gh-nickth3man-claude-market/skills/pocketflow/assets/flow_template.py
2025-11-30 08:44:08 +08:00

148 lines
3.3 KiB
Python

"""
PocketFlow Flow Template
Copy this template and customize for your workflow
"""
from pocketflow import Flow, Node
# from nodes.my_nodes import Node1, Node2, Node3  # Uncomment and point at your own nodes; TemplateFlow below needs Node1, Node2 and Node3 in scope
class TemplateFlow(Flow):
    """
    Skeleton flow -- replace this summary with what your workflow does.

    Flow Architecture:
        node1 >> node2 >> node3
        node2 - "special" >> node4

    Shared Store Schema:
        Input:
            - input_data (str): Initial input
        Intermediate:
            - step1_result (str): Result from node1
            - step2_result (str): Result from node2
        Output:
            - final_result (str): Final output
    """

    def __init__(self):
        """Create the placeholder nodes, chain them, and start at the first."""
        # TODO: Swap these placeholders for your own node classes.
        first_step = Node1()
        second_step = Node2()
        third_step = Node3()

        # TODO: Adjust the wiring below to match your workflow.
        # Linear pipeline: first -> second -> third.
        (first_step >> second_step) >> third_step

        # A branch is selected by a string label on the source node, e.g.:
        #   (second_step - "error") >> error_handler
        #   (second_step - "success") >> third_step
        # A loop points a later node back at an earlier one, e.g.:
        #   (third_step - "retry") >> first_step

        # Hand the first node to the base Flow as its start node.
        super().__init__(start=first_step)
# Example flows (the node classes they reference must be defined or imported by you)
class SimpleWorkflow(Flow):
    """Example: Simple 3-step workflow (load, then process, then save)."""

    def __init__(self):
        """Create the three stage nodes, link them in order, start at the loader."""
        loader = LoadNode()        # stage 1: load data
        processor = ProcessNode()  # stage 2: process
        saver = SaveNode()         # stage 3: save

        # Straight-line pipeline: loader -> processor -> saver.
        (loader >> processor) >> saver

        super().__init__(start=loader)
class ConditionalWorkflow(Flow):
    """Example: Workflow with branching"""

    def __init__(self):
        """Wire a validator to two labelled branches that both end in a finalizer."""
        validator = ValidateNode()
        valid_branch = ProcessValidNode()
        invalid_branch = ProcessInvalidNode()
        finalizer = FinalizeNode()

        # Fork on the string label attached to the validator.
        (validator - "valid") >> valid_branch
        (validator - "invalid") >> invalid_branch

        # Rejoin: each branch continues to the finalizer.
        for branch in (valid_branch, invalid_branch):
            branch >> finalizer

        super().__init__(start=validator)
class LoopingWorkflow(Flow):
    """Example: Workflow with retry loop"""

    def __init__(self):
        """Wire attempt -> verify, with labelled exits from verify."""
        attempt_node = AttemptNode()
        verifier = VerifyNode()
        done = FinishNode()

        # Every attempt is followed by verification.
        attempt_node >> verifier

        # Labelled exits from the verifier:
        (verifier - "success") >> done
        (verifier - "retry") >> attempt_node  # loop back for another attempt
        # Optional exit for a max-attempts check; also ends at the finish node.
        (verifier - "failed") >> done

        super().__init__(start=attempt_node)
class NestedWorkflow(Flow):
    """Example: Flow containing sub-flows"""

    def __init__(self):
        """Chain three sub-flows in order and start at the first one."""
        pre_stage = PreprocessFlow()
        main_stage = ProcessFlow()
        post_stage = PostprocessFlow()

        # Sub-flows are chained with the same >> operator used for nodes above.
        (pre_stage >> main_stage) >> post_stage

        super().__init__(start=pre_stage)
# Example usage
if __name__ == "__main__":
    # Seed the shared store with the example input.
    shared = {"input_data": "Hello, PocketFlow!"}

    # Build the simple example flow and run it against the shared store.
    workflow = SimpleWorkflow()
    workflow.run(shared)

    # .get() yields None when no 'final_result' key was written to the store.
    print(f"Final result: {shared.get('final_result')}")