Initial commit

This commit is contained in:
Zhongwei Li
2025-11-29 18:17:27 +08:00
commit 9b5e9af436
6 changed files with 583 additions and 0 deletions

245
skills/web/scripts/fetch.py Executable file
View File

@@ -0,0 +1,245 @@
#!/usr/bin/env python3
# /// script
# dependencies = [
# "httpx",
# "beautifulsoup4",
# "markdownify",
# "click",
# ]
# ///
"""
Web Fetch Script
Fetch and parse web page content, converting HTML to clean markdown.
Supports timeout configuration and size limits.
Usage:
uv run fetch.py --url "https://example.com"
uv run fetch.py --url "https://example.com" --json
uv run fetch.py --url "https://example.com" --timeout 60
"""
import json
import os
import sys
from urllib.parse import urlparse
import click
import httpx
from bs4 import BeautifulSoup # type: ignore
from markdownify import markdownify as md # type: ignore
# Fallbacks applied when neither a CLI option nor an environment variable supplies a value
DEFAULT_TIMEOUT = 30  # request timeout, in seconds
DEFAULT_MAX_SIZE = 1024 * 1024  # response size cap in bytes (1 MiB)
def is_valid_url(url: str) -> bool:
    """Report whether *url* is an absolute http(s) URL.

    Args:
        url: Candidate URL string.

    Returns:
        True when the parsed scheme is ``http`` or ``https`` and a network
        location (host) is present; False otherwise, including when parsing
        itself raises.
    """
    try:
        parts = urlparse(url)
        if parts.scheme not in {"http", "https"}:
            return False
        # A scheme alone ("https://") is not fetchable; a host is required too.
        return bool(parts.netloc)
    except Exception:
        return False
def html_to_markdown(html: str) -> str:
    """Turn an HTML document into whitespace-normalized markdown.

    ``script``, ``style``, ``nav``, ``footer`` and ``header`` elements are
    removed from the parsed tree before conversion, and images are stripped
    by the markdown converter.

    Args:
        html: Raw HTML text.

    Returns:
        Markdown in which every non-empty line is trimmed and separated from
        the next by exactly one blank line.
    """
    noise_tags = ["script", "style", "nav", "footer", "header"]

    document = BeautifulSoup(html, "html.parser")
    for node in document(noise_tags):
        node.decompose()

    converted = md(str(document), heading_style="ATX", strip=["img"])

    # Trim each line, discard the ones left empty, and re-join the survivors
    # with a blank line between them.
    return "\n\n".join(
        trimmed
        for trimmed in (raw.strip() for raw in converted.split("\n"))
        if trimmed
    )
def fetch_web_page(url: str, timeout: int, max_size: int) -> dict:
    """Fetch a web page and convert its HTML body to markdown.

    The body is streamed rather than loaded in one go, so the size limit is
    enforced while downloading: the request is abandoned as soon as the
    declared ``Content-Length`` or the number of bytes actually received
    exceeds ``max_size``. Redirects are followed.

    Args:
        url: URL to fetch content from (must be http or https).
        timeout: Request timeout in seconds.
        max_size: Maximum response body size in bytes.

    Returns:
        On success: ``{"success": True, "result": <markdown>, "message": ...}``.
        On failure: ``{"success": False, "error": <code>, "message": ...}``
        where ``<code>`` is one of ``invalid_url``, ``content_too_large``,
        ``timeout``, ``http_error``, ``request_error`` or ``unknown_error``.
        No exception is propagated to the caller.
    """
    if not is_valid_url(url):
        return {
            "success": False,
            "error": "invalid_url",
            "message": f"URL must start with http:// or https://. Got: {url}",
        }

    def too_large(size_note: str) -> dict:
        # Single place that builds the oversize error so both checks agree.
        return {
            "success": False,
            "error": "content_too_large",
            "message": f"Response size ({size_note}) exceeds maximum ({max_size} bytes)",
        }

    try:
        with httpx.Client(timeout=timeout, follow_redirects=True) as client:
            with client.stream("GET", url) as response:
                response.raise_for_status()

                # Cheap early exit: trust a declared length that is already
                # over the limit instead of downloading anything.
                try:
                    declared_size = int(response.headers.get("content-length", ""))
                except ValueError:
                    declared_size = None
                if declared_size is not None and declared_size > max_size:
                    return too_large(f"{declared_size} bytes")

                # The header may be absent or understate the decoded size, so
                # count the bytes actually received and stop once over the cap.
                body = bytearray()
                for chunk in response.iter_bytes():
                    body.extend(chunk)
                    if len(body) > max_size:
                        return too_large(f"more than {max_size} bytes")

                encoding = response.encoding or "utf-8"

        # A streamed response has no ``.text``; decode manually, replacing
        # undecodable bytes rather than failing the whole fetch.
        try:
            html = bytes(body).decode(encoding, errors="replace")
        except LookupError:
            # Server advertised a charset Python does not know.
            html = bytes(body).decode("utf-8", errors="replace")

        markdown_content = html_to_markdown(html)
        return {
            "success": True,
            "result": markdown_content,
            "message": f"Successfully fetched {len(markdown_content)} characters from {url}",
        }
    except httpx.TimeoutException:
        return {
            "success": False,
            "error": "timeout",
            "message": f"Request timed out after {timeout} seconds",
        }
    except httpx.HTTPStatusError as e:
        return {
            "success": False,
            "error": "http_error",
            "message": f"HTTP {e.response.status_code}: {e.response.reason_phrase}",
        }
    except httpx.RequestError as e:
        return {
            "success": False,
            "error": "request_error",
            "message": f"Request failed: {str(e)}",
        }
    except Exception as e:
        # Last-resort boundary: this function reports failures as data.
        return {
            "success": False,
            "error": "unknown_error",
            "message": f"Unexpected error: {str(e)}",
        }
def format_markdown_output(url: str, markdown: str) -> str:
"""Format markdown for human-readable output.
Args:
url: Source URL
markdown: Markdown content
Returns:
Formatted string for display
"""
lines = []
lines.append("\n" + "=" * 60)
lines.append(f"📄 Content from {url}")
lines.append("=" * 60)
lines.append("")
lines.append(markdown)
lines.append("")
lines.append("=" * 60)
return "\n".join(lines)
@click.command()
@click.option("--url", required=True, help="URL to fetch content from")
@click.option(
    "--timeout",
    default=None,
    type=int,
    help=f"Request timeout in seconds (default: {DEFAULT_TIMEOUT})",
)
@click.option(
    "--max-size",
    default=None,
    type=int,
    help=f"Maximum response size in bytes (default: {DEFAULT_MAX_SIZE})",
)
@click.option(
    "--json",
    "output_json",
    is_flag=True,
    help="Output as JSON instead of human-readable format",
)
def main(url: str, timeout: int | None, max_size: int | None, output_json: bool) -> None:
    """
    Fetch and parse web page content.
    Retrieves HTML content from the specified URL and converts it to clean
    markdown format. Handles timeouts, HTTP errors, and invalid URLs gracefully.
    Environment variables:
    WEB_FETCH_TIMEOUT - Default timeout in seconds
    WEB_FETCH_MAX_SIZE - Default max response size in bytes
    Examples:
    uv run fetch.py --url "https://example.com"
    uv run fetch.py --url "https://example.com" --json
    uv run fetch.py --url "https://example.com" --timeout 60
    """
    try:
        # Precedence for both settings: CLI option, then environment
        # variable, then the module default.
        effective_timeout = (
            timeout
            if timeout is not None
            else int(os.getenv("WEB_FETCH_TIMEOUT", str(DEFAULT_TIMEOUT)))
        )
        effective_max_size = (
            max_size
            if max_size is not None
            else int(os.getenv("WEB_FETCH_MAX_SIZE", str(DEFAULT_MAX_SIZE)))
        )

        outcome = fetch_web_page(url, effective_timeout, effective_max_size)
        succeeded = outcome["success"]

        if output_json:
            # JSON mode prints the raw result dict for successes and failures alike.
            click.echo(json.dumps(outcome, indent=2))
        elif succeeded:
            click.echo(format_markdown_output(url, outcome["result"]))
        else:
            click.echo(f"❌ Error ({outcome['error']}): {outcome['message']}", err=True)

        # SystemExit is not an Exception subclass, so the handler below
        # does not intercept this exit.
        sys.exit(0 if succeeded else 1)
    except Exception as exc:
        if output_json:
            failure = {"success": False, "error": "script_error", "message": str(exc)}
            click.echo(json.dumps(failure, indent=2))
        else:
            click.echo(f"❌ Script Error: {exc}", err=True)
        sys.exit(1)


if __name__ == "__main__":
    main()

217
skills/web/scripts/search.py Executable file
View File

@@ -0,0 +1,217 @@
#!/usr/bin/env python3
# /// script
# dependencies = [
# "httpx",
# "click",
# ]
# ///
"""
Web Search Script using Brave Search API
Search the internet using Brave Search API.
Requires BRAVE_API_KEY environment variable.
Usage:
uv run search.py --query "anthropic claude"
uv run search.py --query "python" --count 10
uv run search.py --query "keyword" --json
"""
import json
import os
import sys
import click
import httpx
# Configuration
API_BASE_URL = "https://api.search.brave.com/res/v1"  # base URL that request paths such as "/web/search" are resolved against
API_TIMEOUT = 30.0 # seconds
USER_AGENT = "agent-base-web-access/1.0"  # sent as the User-Agent header on every API request
class BraveSearchClient:
    """HTTP client for the Brave Search API.

    Wraps an ``httpx.Client`` preconfigured with the API base URL, timeout
    and authentication headers. Usable as a context manager; leaving the
    ``with`` block closes the underlying HTTP client.
    """

    def __init__(self, api_key: str):
        """Initialize the Brave Search client.

        Args:
            api_key: Brave Search API key, sent as ``X-Subscription-Token``.
        """
        self.api_key = api_key
        self.client = httpx.Client(
            base_url=API_BASE_URL,
            timeout=API_TIMEOUT,
            headers={
                "User-Agent": USER_AGENT,
                "X-Subscription-Token": api_key,
                "Accept": "application/json",
            },
        )

    def __enter__(self) -> "BraveSearchClient":
        """Enter the context manager and return this client."""
        return self

    def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
        """Exit the context manager, closing the underlying HTTP client."""
        self.client.close()

    def search(self, query: str, count: int = 10) -> list[dict]:
        """Run a web search and return simplified results.

        Args:
            query: Search query.
            count: Number of results to request. Clamped into the range
                1..20, so out-of-range values (including zero and negatives)
                are never sent to the API.

        Returns:
            A list of dicts with ``title``, ``url`` and ``description`` keys
            (each defaulting to ``""`` when missing). Empty when the response
            carries no web results.

        Raises:
            Exception: If the API returns an error status, the request fails
                at the network level, or the response cannot be processed.
                The original exception is attached as ``__cause__``.
        """
        # Clamp on both ends: 20 is the cap used by this client, and a
        # non-positive count is never a meaningful request.
        bounded_count = max(1, min(count, 20))
        params = {"q": query, "count": str(bounded_count)}

        try:
            response = self.client.get("/web/search", params=params)
            response.raise_for_status()
            data = response.json()

            # "web" or "results" may be missing or explicitly null; treat
            # both as "no results" instead of failing.
            web_results = (data.get("web") or {}).get("results") or []

            return [
                {
                    "title": result.get("title", ""),
                    "url": result.get("url", ""),
                    "description": result.get("description", ""),
                }
                for result in web_results
            ]
        except httpx.HTTPStatusError as e:
            raise Exception(f"API error: {e.response.status_code} - {e.response.text}") from e
        except httpx.RequestError as e:
            raise Exception(f"Network error: {str(e)}") from e
        except Exception as e:
            raise Exception(f"Unexpected error: {str(e)}") from e
def format_search_result(result: dict, index: int) -> str:
    """Render one search result as a short numbered text entry.

    Args:
        result: Result dict; ``title`` and ``url`` fall back to ``"N/A"``
            when absent, ``description`` is optional.
        index: Number shown in front of the title.

    Returns:
        The title line, the URL line and, when a description is present, a
        description line cut to 200 characters plus ``...`` if longer.
    """
    entry = [
        f"{index}. {result.get('title', 'N/A')}",
        f" {result.get('url', 'N/A')}",
    ]

    summary = result.get("description", "")
    if summary:
        # Keep long descriptions from dominating the listing.
        if len(summary) > 200:
            summary = summary[:200] + "..."
        entry.append(f" {summary}")

    return "\n".join(entry)
def format_search_results(query: str, results: list[dict]) -> str:
    """Format search results for human-readable output.

    Args:
        query: Search query, echoed in the banner title.
        results: List of search results as accepted by
            ``format_search_result``.

    Returns:
        A banner framed by 60-character ``=`` rules containing either the
        numbered results (each followed by a blank line) or a "no results"
        hint, then a ``-`` divider and the attribution footer.
    """
    rule = "=" * 60
    lines = ["\n" + rule, f"🔍 Web Search Results for '{query}'", rule]

    if not results:
        lines.append("\nNo results found for your search.")
        lines.append("\nTip: Try different keywords.")
    else:
        lines.append(f"Found {len(results)} results:\n")
        for position, result in enumerate(results, 1):
            lines.append(format_search_result(result, position))
            lines.append("")  # blank line between entries

    # Divider between the result list and the footer. Repeating an empty
    # string yields nothing, so a visible character is required here.
    lines.append("-" * 60)
    lines.append("Powered by Brave Search API")
    lines.append(rule)
    return "\n".join(lines)
@click.command()
@click.option("--query", required=True, help="Search query")
@click.option("--count", default=10, type=int, help="Number of results (max 20)")
@click.option(
    "--json",
    "output_json",
    is_flag=True,
    help="Output as JSON instead of human-readable format",
)
def main(query: str, count: int, output_json: bool) -> None:
    """
    Search the web using Brave Search API.
    Requires BRAVE_API_KEY environment variable.
    Examples:
    uv run search.py --query "anthropic claude"
    uv run search.py --query "python" --count 5 --json
    """
    try:
        # Fail fast on missing credentials or a blank query before any
        # network activity takes place.
        api_key = os.getenv("BRAVE_API_KEY")
        if not api_key:
            raise ValueError(
                "BRAVE_API_KEY environment variable is required. "
                "Get a free API key at https://brave.com/search/api/"
            )
        if not query.strip():
            raise ValueError("Query cannot be empty")

        with BraveSearchClient(api_key) as client:
            results = client.search(query, count)

        # Successful JSON output is the bare list of results.
        rendered = (
            json.dumps(results, indent=2)
            if output_json
            else format_search_results(query, results)
        )
        click.echo(rendered)

        # SystemExit is not an Exception subclass, so the handler below
        # does not intercept this exit.
        sys.exit(0)
    except Exception as exc:
        if output_json:
            failure = {
                "success": False,
                "error": "runtime_error",
                "message": str(exc),
            }
            click.echo(json.dumps(failure, indent=2))
        else:
            click.echo(f"❌ Error: {exc}", err=True)
        sys.exit(1)


if __name__ == "__main__":
    main()