Initial commit

This commit is contained in:
Zhongwei Li
2025-11-29 18:17:30 +08:00
commit de676930c1
14 changed files with 2478 additions and 0 deletions

View File

@@ -0,0 +1,44 @@
---
name: kalshi-markets
description: "Kalshi prediction market data (prices, odds, orderbooks, trades). Use for prediction markets, Kalshi, betting odds, election and sports betting, market forecasts. Provides real-time market data, event series information, and comprehensive trading analytics."
version: 1.0.0
brief_description: "Kalshi prediction markets and betting odds"
triggers:
keywords: [kalshi, market, markets, prediction, betting, odds, election, sports, forecast, probability, orderbook, trade, price, bet, wager]
verbs: [bet, predict, forecast, trade, check, get]
patterns:
- "prediction.*market"
- "betting.*odds"
- "election.*(?:odds|forecast|probability)"
- "sports.*(?:betting|odds)"
- "market.*(?:forecast|prediction)"
- "what.*(?:odds|probability)"
allowed-tools: Bash
---
# kalshi-markets
## 🎯 Triggers
**USE:** prediction markets, Kalshi, betting odds, forecast markets, election betting, market probabilities
**SKIP:** general knowledge (use built-in)
## Scripts (use via script_run)
**Status & Discovery:**
- `status.py` - Is Kalshi operational?
- `markets.py` - Browse all markets
- `search.py "keyword"` - Find markets by keyword
- `events.py` - List event groups
- `series_list.py` - Browse templates (~6900)
**Market Details:**
- `market.py TICKER` - Market details (positional arg)
- `orderbook.py TICKER` - Bid/ask prices (positional arg)
- `trades.py --ticker X` - Recent trades (flag)
- `event.py EVENT_TICKER` - Event details (positional arg)
- `series.py SERIES_TICKER` - Series info (positional arg)
## Usage
**Pattern:** `script_run kalshi-markets <script> --json [args]`
**Always use:** `--json` flag for structured output
**Help:** `script_help kalshi-markets <script>` shows options

View File

@@ -0,0 +1,204 @@
#!/usr/bin/env python3
# /// script
# dependencies = [
# "httpx",
# "click",
# ]
# ///
"""
Kalshi Event Details Script
Get detailed information for a specific event.
Completely self-contained with embedded HTTP client.
Usage:
uv run event.py EVENT_TICKER
uv run event.py KXELONMARS-99
uv run event.py EVENT_TICKER --with-markets
uv run event.py EVENT_TICKER --json
"""
import json
import sys
from typing import Any
import click
import httpx
# Configuration
API_BASE_URL = "https://api.elections.kalshi.com/trade-api/v2"
API_TIMEOUT = 30.0 # seconds
USER_AGENT = "Kalshi-CLI/1.0"
class KalshiClient:
"""Minimal HTTP client for Kalshi API - event details functionality"""
def __init__(self):
"""Initialize HTTP client"""
self.client = httpx.Client(
base_url=API_BASE_URL, timeout=API_TIMEOUT, headers={"User-Agent": USER_AGENT}
)
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - cleanup"""
self.client.close()
def get_event(self, event_ticker: str, with_nested_markets: bool = False) -> dict[str, Any]:
"""
Get detailed information for a specific event.
Args:
event_ticker: Event ticker identifier
with_nested_markets: Include nested markets in response
Returns:
Event details including markets if requested
Raises:
Exception if API call fails
"""
params = {}
if with_nested_markets:
params["with_nested_markets"] = "true"
try:
response = self.client.get(f"/events/{event_ticker}", params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise Exception(f"Event not found: {event_ticker}")
raise Exception(f"API error: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
raise Exception(f"Network error: {str(e)}")
except Exception as e:
raise Exception(f"Unexpected error: {str(e)}")
def format_market_in_event(market: dict[str, Any]) -> str:
"""Format a market within an event"""
ticker = market.get("ticker", "N/A")
title = market.get("title", "N/A")[:60]
yes_bid = market.get("yes_bid", 0)
yes_ask = market.get("yes_ask", 0)
status = market.get("status", "unknown")
status_icon = "🟢" if status == "active" else "🔴"
return f" {status_icon} {ticker}\n {title}... (Bid: {yes_bid}¢ Ask: {yes_ask}¢)"
def format_event_detail(data: dict[str, Any]) -> str:
"""
Format event details for human-readable output.
Args:
data: Event data from API
Returns:
Formatted string for display
"""
# Handle nested event object if present
event = data.get("event", data)
markets = data.get("markets", [])
lines = []
lines.append("\n" + "=" * 80)
lines.append(f"📁 Event: {event.get('event_ticker', 'N/A')}")
lines.append("=" * 80)
# Title and subtitle
title = event.get("title", "N/A")
subtitle = event.get("subtitle", "")
lines.append(f"\n📌 {title}")
if subtitle:
lines.append(f" {subtitle}")
# Basic info
series = event.get("series_ticker", "N/A")
category = event.get("category", "N/A")
status = event.get("status", "unknown")
mutually_exclusive = event.get("mutually_exclusive", False)
status_icon = "🟢" if status == "open" else "🔴" if status == "closed" else ""
lines.append(f"\n{status_icon} Status: {status.upper()}")
lines.append(f"📊 Series: {series}")
lines.append(f"📂 Category: {category}")
if mutually_exclusive:
lines.append("🔒 Mutually Exclusive: Yes")
else:
lines.append("🔓 Mutually Exclusive: No")
# Strike details if present
strike_details = event.get("strike_details")
if strike_details:
lines.append("\n⚡ Strike Details:")
for key, value in strike_details.items():
if value:
lines.append(f" {key}: {value}")
# Markets if included
if markets:
lines.append(f"\n📊 Markets ({len(markets)}):")
lines.append("" * 60)
for i, market in enumerate(markets[:10], 1): # Show first 10
lines.append(f" {i}. {format_market_in_event(market)}")
if len(markets) > 10:
lines.append(f" ... and {len(markets) - 10} more markets")
lines.append("\n" + "=" * 80)
return "\n".join(lines)
@click.command()
@click.argument("event_ticker")
@click.option("--with-markets", is_flag=True, help="Include nested markets in response")
@click.option(
"--json", "output_json", is_flag=True, help="Output as JSON instead of human-readable format"
)
def main(event_ticker: str, with_markets: bool, output_json: bool):
"""
Get detailed information for a specific event.
EVENT_TICKER is the event identifier (e.g., KXELONMARS-99).
Events are collections of related markets.
No authentication required.
"""
try:
# Get event details from API
with KalshiClient() as client:
data = client.get_event(event_ticker, with_nested_markets=with_markets)
# Output results
if output_json:
# JSON output for automation/MCP
click.echo(json.dumps(data, indent=2))
else:
# Human-readable output
formatted = format_event_detail(data)
click.echo(formatted)
sys.exit(0)
except Exception as e:
if output_json:
# JSON error format
error_data = {"error": str(e)}
click.echo(json.dumps(error_data, indent=2))
else:
# Human-readable error
click.echo(f"❌ Error: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,223 @@
#!/usr/bin/env python3
# /// script
# dependencies = [
# "httpx",
# "click",
# ]
# ///
"""
Kalshi Events List Script
List events (collections of related markets) from Kalshi.
Completely self-contained with embedded HTTP client.
Usage:
uv run events.py
uv run events.py --limit 5
uv run events.py --status closed
uv run events.py --series-ticker KXHIGHNY
uv run events.py --json
"""
import json
import sys
from typing import Any
import click
import httpx
# Configuration
API_BASE_URL = "https://api.elections.kalshi.com/trade-api/v2"
API_TIMEOUT = 30.0 # seconds
USER_AGENT = "Kalshi-CLI/1.0"
class KalshiClient:
"""Minimal HTTP client for Kalshi API - events functionality"""
def __init__(self):
"""Initialize HTTP client"""
self.client = httpx.Client(
base_url=API_BASE_URL, timeout=API_TIMEOUT, headers={"User-Agent": USER_AGENT}
)
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - cleanup"""
self.client.close()
def get_events(
self,
limit: int = 10,
status: str | None = None,
series_ticker: str | None = None,
with_nested_markets: bool = False,
cursor: str | None = None,
) -> dict[str, Any]:
"""
Get list of events.
Args:
limit: Number of events to return (1-200)
status: Event status filter (open, closed, settled)
series_ticker: Filter by series ticker
with_nested_markets: Include nested markets in response
cursor: Pagination cursor
Returns:
Dict with 'events' array and optional 'cursor'
Raises:
Exception if API call fails
"""
params = {"limit": str(limit)}
if status:
params["status"] = status
if series_ticker:
params["series_ticker"] = series_ticker
if with_nested_markets:
params["with_nested_markets"] = "true"
if cursor:
params["cursor"] = cursor
try:
response = self.client.get("/events", params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise Exception(f"API error: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
raise Exception(f"Network error: {str(e)}")
except Exception as e:
raise Exception(f"Unexpected error: {str(e)}")
def format_event_summary(event: dict[str, Any], index: int) -> str:
"""Format a single event for display"""
event_ticker = event.get("event_ticker", "N/A")
title = event.get("title", "N/A")
category = event.get("category", "N/A")
series_ticker = event.get("series_ticker", "N/A")
status = event.get("status", "unknown")
# Status icon
status_icon = "🟢" if status == "open" else "🔴" if status == "closed" else ""
lines = []
lines.append(f"{index}. {status_icon} {event_ticker}")
lines.append(f" 📌 {title[:70]}{'...' if len(title) > 70 else ''}")
lines.append(f" 📂 Category: {category} | Series: {series_ticker}")
# Show market count if available
markets = event.get("markets", [])
if markets:
lines.append(f" 📊 {len(markets)} markets")
# Mutually exclusive indicator
mutually_exclusive = event.get("mutually_exclusive", False)
if mutually_exclusive:
lines.append(" 🔒 Mutually Exclusive")
return "\n".join(lines)
def format_events_list(data: dict[str, Any]) -> str:
"""
Format events list for human-readable output.
Args:
data: Response data with events array
Returns:
Formatted string for display
"""
events = data.get("events", [])
cursor = data.get("cursor", "")
lines = []
lines.append("\n" + "=" * 60)
lines.append("📁 Kalshi Events")
lines.append("=" * 60)
lines.append(f"Found {len(events)} events\n")
for i, event in enumerate(events, 1):
lines.append(format_event_summary(event, i))
lines.append("") # Blank line between events
if cursor:
lines.append("" * 60)
lines.append(f"📄 More results available. Use --cursor {cursor[:20]}...")
lines.append("=" * 60)
return "\n".join(lines)
@click.command()
@click.option("--limit", default=10, type=int, help="Number of events to return (1-200)")
@click.option(
"--status", type=click.Choice(["open", "closed", "settled"]), help="Event status filter"
)
@click.option("--series-ticker", help="Filter by series ticker")
@click.option("--with-markets", is_flag=True, help="Include nested markets in response")
@click.option("--cursor", help="Pagination cursor for next page")
@click.option(
"--json", "output_json", is_flag=True, help="Output as JSON instead of human-readable format"
)
def main(
limit: int,
status: str | None,
series_ticker: str | None,
with_markets: bool,
cursor: str | None,
output_json: bool,
):
"""
List events (collections of related markets).
Events group related markets together, such as all markets
for a specific date or competition.
No authentication required.
"""
try:
# Validate limit
if limit < 1 or limit > 200:
raise ValueError("Limit must be between 1 and 200")
# Get events from API
with KalshiClient() as client:
data = client.get_events(
limit=limit,
status=status,
series_ticker=series_ticker,
with_nested_markets=with_markets,
cursor=cursor,
)
# Output results
if output_json:
# JSON output for automation/MCP
click.echo(json.dumps(data, indent=2))
else:
# Human-readable output
formatted = format_events_list(data)
click.echo(formatted)
sys.exit(0)
except Exception as e:
if output_json:
# JSON error format
error_data = {"error": str(e)}
click.echo(json.dumps(error_data, indent=2))
else:
# Human-readable error
click.echo(f"❌ Error: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,220 @@
#!/usr/bin/env python3
# /// script
# dependencies = [
# "httpx",
# "click",
# ]
# ///
"""
Kalshi Market Details Script
Get detailed information for a specific Kalshi prediction market.
Completely self-contained with embedded HTTP client.
Usage:
uv run market.py TICKER
uv run market.py KXBTCD-25NOV0612-T102499.99
uv run market.py TICKER --json
"""
import json
import sys
from datetime import datetime
from typing import Any
import click
import httpx
# Configuration
API_BASE_URL = "https://api.elections.kalshi.com/trade-api/v2"
API_TIMEOUT = 30.0 # seconds
USER_AGENT = "Kalshi-CLI/1.0"
class KalshiClient:
"""Minimal HTTP client for Kalshi API - market details functionality"""
def __init__(self):
"""Initialize HTTP client"""
self.client = httpx.Client(
base_url=API_BASE_URL, timeout=API_TIMEOUT, headers={"User-Agent": USER_AGENT}
)
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - cleanup"""
self.client.close()
def get_market(self, ticker: str) -> dict[str, Any]:
"""
Get detailed information for a specific market.
Args:
ticker: Market ticker symbol
Returns:
Complete market details including prices, volume, status
Raises:
Exception if API call fails
"""
try:
response = self.client.get(f"/markets/{ticker}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise Exception(f"Market not found: {ticker}")
raise Exception(f"API error: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
raise Exception(f"Network error: {str(e)}")
except Exception as e:
raise Exception(f"Unexpected error: {str(e)}")
def format_timestamp(timestamp_str: str) -> str:
"""Format ISO timestamp to readable string"""
if not timestamp_str:
return "N/A"
try:
dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d %H:%M:%S UTC")
except Exception:
return timestamp_str
def format_market_detail(market: dict[str, Any]) -> str:
"""
Format market details for human-readable output.
Args:
market: Market data from API
Returns:
Formatted string for display
"""
lines = []
lines.append("\n" + "=" * 80)
lines.append(f"Market: {market.get('ticker', 'N/A')}")
lines.append("=" * 80)
# Title and subtitle
title = market.get("title", "N/A")
subtitle = market.get("subtitle", "")
lines.append(f"\n📌 {title}")
if subtitle:
lines.append(f" {subtitle}")
# Status and category
status = market.get("status", "unknown")
category = market.get("category", "N/A")
status_icon = "🟢" if status == "active" else "🔴" if status == "closed" else ""
lines.append(f"\n{status_icon} Status: {status.upper()}")
lines.append(f"📂 Category: {category}")
# Current prices
lines.append("\n💰 Current Prices:")
yes_bid = market.get("yes_bid", 0)
yes_ask = market.get("yes_ask", 0)
no_bid = market.get("no_bid", 0)
no_ask = market.get("no_ask", 0)
last_price = market.get("last_price", 0)
lines.append(f" YES: Bid {yes_bid}¢ | Ask {yes_ask}¢")
lines.append(f" NO: Bid {no_bid}¢ | Ask {no_ask}¢")
lines.append(f" Last Trade: {last_price}¢")
# Trading activity
lines.append("\n📊 Trading Activity:")
volume = market.get("volume", 0)
volume_24h = market.get("volume_24h", 0)
open_interest = market.get("open_interest", 0)
liquidity = market.get("liquidity", 0)
lines.append(f" Total Volume: ${volume/100:,.2f}")
lines.append(f" 24h Volume: ${volume_24h/100:,.2f}")
lines.append(f" Open Interest: ${open_interest/100:,.2f}")
lines.append(f" Liquidity: ${liquidity/100:,.2f}")
# Schedule
lines.append("\n📅 Schedule:")
open_time = format_timestamp(market.get("open_time"))
close_time = format_timestamp(market.get("close_time"))
expiration_time = format_timestamp(market.get("expiration_time"))
lines.append(f" Opens: {open_time}")
lines.append(f" Closes: {close_time}")
lines.append(f" Expires: {expiration_time}")
# Result if settled
result = market.get("result")
if result:
lines.append(f"\n✅ Result: {result}")
# Rules if present
rules_primary = market.get("rules_primary", "").strip()
if rules_primary:
lines.append("\n📋 Rules:")
# Wrap long rules text
import textwrap
wrapped = textwrap.wrap(rules_primary, width=70)
for line in wrapped[:5]: # Limit to 5 lines
lines.append(f" {line}")
if len(wrapped) > 5:
lines.append(" ...")
lines.append("\n" + "=" * 80)
return "\n".join(lines)
@click.command()
@click.argument("ticker")
@click.option(
"--json", "output_json", is_flag=True, help="Output as JSON instead of human-readable format"
)
def main(ticker: str, output_json: bool):
"""
Get detailed information for a specific market.
TICKER is the market ticker symbol (e.g., KXBTCD-25NOV0612-T102499.99)
Returns comprehensive market details including prices, volume, and status.
No authentication required.
"""
try:
# Get market details from API
with KalshiClient() as client:
data = client.get_market(ticker)
# Handle nested market object if present
market = data.get("market", data)
# Output results
if output_json:
# JSON output for automation/MCP
click.echo(json.dumps(market, indent=2))
else:
# Human-readable output
formatted = format_market_detail(market)
click.echo(formatted)
sys.exit(0)
except Exception as e:
if output_json:
# JSON error format
error_data = {"error": str(e)}
click.echo(json.dumps(error_data, indent=2))
else:
# Human-readable error
click.echo(f"❌ Error: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,250 @@
#!/usr/bin/env python3
# /// script
# dependencies = [
# "httpx",
# "click",
# ]
# ///
"""
Kalshi Markets List Script
List Kalshi prediction markets with various filters.
Completely self-contained with embedded HTTP client.
Usage:
uv run markets.py
uv run markets.py --limit 5
uv run markets.py --status closed
uv run markets.py --json
"""
import json
import sys
from typing import Any
import click
import httpx
# Configuration
API_BASE_URL = "https://api.elections.kalshi.com/trade-api/v2"
API_TIMEOUT = 30.0 # seconds
USER_AGENT = "Kalshi-CLI/1.0"
class KalshiClient:
"""Minimal HTTP client for Kalshi API - markets functionality"""
def __init__(self):
"""Initialize HTTP client"""
self.client = httpx.Client(
base_url=API_BASE_URL, timeout=API_TIMEOUT, headers={"User-Agent": USER_AGENT}
)
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - cleanup"""
self.client.close()
def get_markets(
self,
limit: int = 10,
status: str | None = None,
cursor: str | None = None,
event_ticker: str | None = None,
series_ticker: str | None = None,
tickers: str | None = None,
mve_filter: str | None = None,
) -> dict[str, Any]:
"""
Get list of markets with filters.
Args:
limit: Number of markets to return (1-1000)
status: Market status filter (open, closed, settled)
cursor: Pagination cursor
event_ticker: Filter by event ticker
series_ticker: Filter by series ticker
tickers: Comma-separated list of market tickers
mve_filter: Multivariate events filter (only, exclude)
Returns:
Dict with 'markets' array and optional 'cursor'
Raises:
Exception if API call fails
"""
# Build query parameters
params = {"limit": str(limit)}
if status:
params["status"] = status
if cursor:
params["cursor"] = cursor
if event_ticker:
params["event_ticker"] = event_ticker
if series_ticker:
params["series_ticker"] = series_ticker
if tickers:
params["tickers"] = tickers
if mve_filter:
params["mve_filter"] = mve_filter
try:
response = self.client.get("/markets", params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise Exception(f"API error: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
raise Exception(f"Network error: {str(e)}")
except Exception as e:
raise Exception(f"Unexpected error: {str(e)}")
def format_market_summary(market: dict[str, Any]) -> str:
"""
Format a single market for display.
Args:
market: Market data dict
Returns:
Formatted string summary of the market
"""
ticker = market.get("ticker", "N/A")
title = market.get("title", "N/A")
status = market.get("status", "unknown")
# Get current prices
yes_bid = market.get("yes_bid", 0)
yes_ask = market.get("yes_ask", 0)
last_price = market.get("last_price", 0)
# Get volume
volume = market.get("volume", 0)
volume_24h = market.get("volume_24h", 0)
# Format status emoji
status_icon = "🟢" if status == "active" else "🔴" if status == "closed" else ""
# Build summary
lines = []
lines.append(f"{status_icon} {ticker}")
lines.append(f" {title[:70]}{'...' if len(title) > 70 else ''}")
if yes_bid or yes_ask:
lines.append(f" Price: {yes_bid}¢-{yes_ask}¢ (Last: {last_price}¢)")
if volume_24h > 0:
lines.append(f" 24h Vol: ${volume_24h/100:,.2f}")
elif volume > 0:
lines.append(f" Total Vol: ${volume/100:,.2f}")
return "\n".join(lines)
def format_markets_list(data: dict[str, Any]) -> str:
"""
Format markets list for human-readable output.
Args:
data: Response data with markets array
Returns:
Formatted string for display
"""
markets = data.get("markets", [])
cursor = data.get("cursor", "")
lines = []
lines.append("\nKalshi Markets")
lines.append("=" * 60)
lines.append(f"Found {len(markets)} markets")
lines.append("")
for i, market in enumerate(markets, 1):
lines.append(f"{i}. {format_market_summary(market)}")
lines.append("")
if cursor:
lines.append(f"📄 More results available. Use --cursor {cursor[:20]}...")
lines.append("")
return "\n".join(lines)
@click.command()
@click.option("--limit", default=10, type=int, help="Number of markets to return (1-1000)")
@click.option(
"--status", default="open", help="Market status (open, closed, settled, or comma-separated)"
)
@click.option("--event-ticker", help="Filter by event ticker")
@click.option("--series-ticker", help="Filter by series ticker")
@click.option("--tickers", help="Filter by market tickers (comma-separated)")
@click.option(
"--mve-filter", type=click.Choice(["only", "exclude"]), help="Multivariate events filter"
)
@click.option("--cursor", help="Pagination cursor for next page")
@click.option(
"--json", "output_json", is_flag=True, help="Output as JSON instead of human-readable format"
)
def main(
limit: int,
status: str,
event_ticker: str | None,
series_ticker: str | None,
tickers: str | None,
mve_filter: str | None,
cursor: str | None,
output_json: bool,
):
"""
List Kalshi prediction markets with filters.
Returns markets matching the specified criteria.
No authentication required.
"""
try:
# Validate limit
if limit < 1 or limit > 1000:
raise ValueError("Limit must be between 1 and 1000")
# Get markets from API
with KalshiClient() as client:
data = client.get_markets(
limit=limit,
status=status,
cursor=cursor,
event_ticker=event_ticker,
series_ticker=series_ticker,
tickers=tickers,
mve_filter=mve_filter,
)
# Output results
if output_json:
# JSON output for automation/MCP
click.echo(json.dumps(data, indent=2))
else:
# Human-readable output
formatted = format_markets_list(data)
click.echo(formatted)
sys.exit(0)
except Exception as e:
if output_json:
# JSON error format
error_data = {"error": str(e)}
click.echo(json.dumps(error_data, indent=2))
else:
# Human-readable error
click.echo(f"❌ Error: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,205 @@
#!/usr/bin/env python3
# /// script
# dependencies = [
# "httpx",
# "click",
# ]
# ///
"""
Kalshi Market Orderbook Script
Get the current orderbook for a specific Kalshi prediction market.
Completely self-contained with embedded HTTP client.
Usage:
uv run orderbook.py TICKER
uv run orderbook.py TICKER --depth 5
uv run orderbook.py TICKER --json
"""
import json
import sys
from typing import Any
import click
import httpx
# Configuration
API_BASE_URL = "https://api.elections.kalshi.com/trade-api/v2"
API_TIMEOUT = 30.0 # seconds
USER_AGENT = "Kalshi-CLI/1.0"
class KalshiClient:
"""Minimal HTTP client for Kalshi API - orderbook functionality"""
def __init__(self):
"""Initialize HTTP client"""
self.client = httpx.Client(
base_url=API_BASE_URL, timeout=API_TIMEOUT, headers={"User-Agent": USER_AGENT}
)
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - cleanup"""
self.client.close()
def get_market_orderbook(self, ticker: str, depth: int = 10) -> dict[str, Any]:
"""
Get the orderbook for a specific market.
Args:
ticker: Market ticker symbol
depth: Orderbook depth (0 = all levels, 1-100 for specific depth)
Returns:
Dict with 'orderbook' containing yes/no bid arrays
Raises:
Exception if API call fails
"""
params = {}
if depth > 0:
params["depth"] = str(depth)
try:
response = self.client.get(f"/markets/{ticker}/orderbook", params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise Exception(f"Market not found: {ticker}")
raise Exception(f"API error: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
raise Exception(f"Network error: {str(e)}")
except Exception as e:
raise Exception(f"Unexpected error: {str(e)}")
def format_orderbook_level(price: int, quantity: int) -> str:
"""Format a single orderbook level"""
return f" {price:>5}¢ │ {quantity:>10,} contracts"
def format_orderbook(ticker: str, data: dict[str, Any], depth: int) -> str:
"""
Format orderbook for human-readable output.
Args:
ticker: Market ticker
data: Response data with orderbook
depth: Display depth limit
Returns:
Formatted string for display
"""
orderbook = data.get("orderbook", {})
yes_orders = orderbook.get("yes", [])
no_orders = orderbook.get("no", [])
lines = []
lines.append("\n" + "=" * 60)
lines.append(f"📊 Orderbook for {ticker}")
lines.append("=" * 60)
# YES side
lines.append("\n✅ YES Side (Bids to buy YES):")
lines.append(" Price │ Quantity")
lines.append("" * 30)
if yes_orders:
display_count = depth if depth > 0 else len(yes_orders)
for order in yes_orders[:display_count]:
if len(order) >= 2:
lines.append(format_orderbook_level(order[0], order[1]))
if len(yes_orders) > display_count:
lines.append(f" ... {len(yes_orders) - display_count} more levels ...")
else:
lines.append(" (no orders)")
# NO side
lines.append("\n❌ NO Side (Bids to buy NO):")
lines.append(" Price │ Quantity")
lines.append("" * 30)
if no_orders:
display_count = depth if depth > 0 else len(no_orders)
for order in no_orders[:display_count]:
if len(order) >= 2:
lines.append(format_orderbook_level(order[0], order[1]))
if len(no_orders) > display_count:
lines.append(f" ... {len(no_orders) - display_count} more levels ...")
else:
lines.append(" (no orders)")
# Summary
lines.append("\n" + "" * 60)
# Calculate spread if we have orders
if yes_orders and no_orders:
yes_best = yes_orders[0][0] if yes_orders[0] else 0
implied_prob = yes_best # YES bid price is the implied probability
lines.append(f"📈 Implied Probability: {implied_prob}%")
lines.append(f"📊 Total Levels: YES={len(yes_orders)}, NO={len(no_orders)}")
lines.append("=" * 60)
return "\n".join(lines)
@click.command()
@click.argument("ticker")
@click.option(
"--depth", default=10, type=int, help="Orderbook depth (0=all levels, 1-100 for specific depth)"
)
@click.option(
"--json", "output_json", is_flag=True, help="Output as JSON instead of human-readable format"
)
def main(ticker: str, depth: int, output_json: bool):
"""
Get orderbook for a specific market.
TICKER is the market ticker symbol.
Shows bid levels for both YES and NO sides of the market.
No authentication required.
"""
try:
# Validate depth
if depth < 0 or depth > 100:
raise ValueError("Depth must be between 0 and 100")
# Get orderbook from API
with KalshiClient() as client:
data = client.get_market_orderbook(ticker, depth)
# Output results
if output_json:
# JSON output for automation/MCP
click.echo(json.dumps(data, indent=2))
else:
# Human-readable output
formatted = format_orderbook(ticker, data, depth)
click.echo(formatted)
sys.exit(0)
except Exception as e:
if output_json:
# JSON error format
error_data = {"error": str(e)}
click.echo(json.dumps(error_data, indent=2))
else:
# Human-readable error
click.echo(f"❌ Error: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,449 @@
#!/usr/bin/env python3
# /// script
# dependencies = [
# "httpx",
# "click",
# "pandas",
# ]
# ///
"""
Kalshi Market Search Script
Search for markets by keyword across titles and descriptions.
Uses a local cache for fast, comprehensive searches across all markets.
The cache is built on first run (takes 2-5 minutes) and refreshed every 6 hours.
Subsequent searches are instant.
Usage:
uv run search.py bitcoin
uv run search.py "election"
uv run search.py keyword --limit 5
uv run search.py keyword --json
uv run search.py --rebuild-cache # Force cache rebuild
"""
import json
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
import click
import httpx
import pandas as pd
# Configuration
API_BASE_URL = "https://api.elections.kalshi.com/trade-api/v2"
API_TIMEOUT = 30.0 # seconds
USER_AGENT = "Kalshi-CLI/1.0"
# Use user cache directory for portable installation
# This works regardless of where the script is installed (marketplace, manual, development)
CACHE_DIR = Path.home() / ".cache" / "kalshi-markets"
CACHE_TTL_HOURS = 6
class KalshiSearchCache:
"""Embedded search cache functionality for fast market searches"""
def __init__(self):
"""Initialize cache manager"""
self.cache_dir = CACHE_DIR
self.cache_ttl = timedelta(hours=CACHE_TTL_HOURS)
self.df_cache = None
def get_cache_file(self) -> Path | None:
"""Get the most recent cache file"""
if not self.cache_dir.exists():
return None
# Look for CSV cache files with pattern kalshi_markets_*.csv
cache_files = list(self.cache_dir.glob("kalshi_markets_*.csv"))
if not cache_files:
return None
# Return the most recent file
return max(cache_files, key=lambda f: f.stat().st_mtime)
def is_cache_valid(self) -> bool:
"""Check if cache exists and is still valid"""
cache_file = self.get_cache_file()
if not cache_file:
return False
# Check cache age
cache_age = datetime.now() - datetime.fromtimestamp(cache_file.stat().st_mtime)
return cache_age < self.cache_ttl
def load_cache(self, quiet: bool = False) -> pd.DataFrame | None:
"""Load cache from disk if valid"""
if not self.is_cache_valid():
return None
cache_file = self.get_cache_file()
if not cache_file:
return None
try:
df = pd.read_csv(cache_file)
if not quiet:
print(f"[CACHE] Loaded {len(df)} markets from cache")
return df
except Exception as e:
if not quiet:
print(f"[CACHE] Failed to load cache: {e}")
return None
def save_cache(self, df: pd.DataFrame, quiet: bool = False):
"""Save DataFrame to cache"""
try:
self.cache_dir.mkdir(parents=True, exist_ok=True)
# Use timestamp in filename like the CLI does
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
cache_file = self.cache_dir / f"kalshi_markets_{timestamp}.csv"
df.to_csv(cache_file, index=False)
if not quiet:
print(f"[CACHE] Saved {len(df)} markets to cache")
except Exception as e:
if not quiet:
print(f"[CACHE] Failed to save cache: {e}")
def build_cache(self, client: "KalshiClient", quiet: bool = False) -> pd.DataFrame:
"""Build complete market cache by fetching all series and their markets"""
if not quiet:
print("[CACHE BUILD] Starting market data collection...")
print("[CACHE BUILD] This may take 2-5 minutes on first run...")
print("[CACHE BUILD] Fetching all series and their markets...")
# Step 1: Fetch all series
if not quiet:
print("[CACHE BUILD] Step 1/2: Fetching series list...")
series_data = client.get_series_list()
all_series = series_data.get("series", [])
if not quiet:
print(f"[CACHE BUILD] Found {len(all_series)} series to process")
# Step 2: Fetch markets for each series
if not quiet:
print("[CACHE BUILD] Step 2/2: Fetching markets from each series...")
print("[CACHE BUILD] Filter: status='open' (active tradeable markets only)")
all_markets = []
series_with_markets = 0
errors = 0
for i, series in enumerate(all_series):
series_ticker = series.get("ticker")
series_title = series.get("title", "")
series_category = series.get("category", "")
if not quiet and (i + 1) % 100 == 0:
print(
f"[CACHE BUILD] Progress: {i + 1}/{len(all_series)} series ({100*(i+1)/len(all_series):.1f}%)"
)
print(f"[CACHE BUILD] Markets collected: {len(all_markets)}")
try:
# Fetch markets for this series (open markets only)
markets_data = client.get_markets(
limit=100, status="open", series_ticker=series_ticker
)
series_markets = markets_data.get("markets", [])
if series_markets:
series_with_markets += 1
# Add series info to each market
for market in series_markets:
market["series_ticker"] = series_ticker
market["series_title"] = series_title
market["series_category"] = series_category
all_markets.append(market)
except Exception:
errors += 1
if errors > 50:
if not quiet:
print(f"[CACHE BUILD] Too many errors ({errors}), stopping")
break
continue
if not quiet:
print("[CACHE BUILD] Collection complete!")
print(f"[CACHE BUILD] Total markets: {len(all_markets)}")
print(f"[CACHE BUILD] Series with markets: {series_with_markets}")
# Convert to DataFrame
df = pd.DataFrame(all_markets) if all_markets else pd.DataFrame()
# Save to cache
self.save_cache(df, quiet=quiet)
return df
def search(self, keyword: str, limit: int = 10, quiet: bool = False) -> list[dict[str, Any]]:
"""Search markets using cache"""
# Load or build cache
if self.df_cache is None:
self.df_cache = self.load_cache(quiet=quiet)
if self.df_cache is None:
# Need to build cache
if not quiet:
print("[CACHE] No valid cache found, building...")
with KalshiClient() as client:
self.df_cache = self.build_cache(client, quiet=quiet)
# Perform search
keyword_lower = keyword.lower()
# Create mask for matching rows (include series fields for better search)
mask = (
self.df_cache["title"].str.lower().str.contains(keyword_lower, na=False)
| self.df_cache["subtitle"].str.lower().str.contains(keyword_lower, na=False)
| self.df_cache["ticker"].str.lower().str.contains(keyword_lower, na=False)
)
# Add series fields if they exist in the DataFrame
if "series_title" in self.df_cache.columns:
mask = mask | self.df_cache["series_title"].str.lower().str.contains(
keyword_lower, na=False
)
if "series_ticker" in self.df_cache.columns:
mask = mask | self.df_cache["series_ticker"].str.lower().str.contains(
keyword_lower, na=False
)
# Get matching markets
matches = self.df_cache[mask]
# Sort by volume and limit results
if "volume_24h" in matches.columns:
matches = matches.sort_values("volume_24h", ascending=False)
matches = matches.head(limit)
# Convert back to list of dicts
return matches.to_dict("records")
class KalshiClient:
"""Minimal HTTP client for Kalshi API - search functionality"""
def __init__(self):
"""Initialize HTTP client"""
self.client = httpx.Client(
base_url=API_BASE_URL,
timeout=API_TIMEOUT,
headers={"User-Agent": USER_AGENT},
)
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - cleanup"""
self.client.close()
def get_series_list(self) -> dict[str, Any]:
"""
Get list of all series.
Returns:
Dict with 'series' array
Raises:
Exception if API call fails
"""
try:
response = self.client.get("/series")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise Exception(f"API error: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
raise Exception(f"Network error: {str(e)}")
except Exception as e:
raise Exception(f"Unexpected error: {str(e)}")
def get_markets(
self,
limit: int = 100,
status: str | None = "open",
cursor: str = None,
series_ticker: str | None = None,
) -> dict[str, Any]:
"""
Get markets for searching.
Args:
limit: Number of markets to fetch
status: Market status filter (None for all statuses)
cursor: Pagination cursor
series_ticker: Filter by series ticker
Returns:
Dict with markets array and cursor
Raises:
Exception if API call fails
"""
params = {"limit": str(min(limit, 1000))}
if status is not None:
params["status"] = status
if cursor:
params["cursor"] = cursor
if series_ticker:
params["series_ticker"] = series_ticker
try:
response = self.client.get("/markets", params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise Exception(f"API error: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
raise Exception(f"Network error: {str(e)}")
except Exception as e:
raise Exception(f"Unexpected error: {str(e)}")
def format_search_result(market: dict[str, Any], index: int) -> str:
"""Format a single search result"""
ticker = market.get("ticker", "N/A")
title = market.get("title", "N/A")
yes_bid = market.get("yes_bid", 0)
yes_ask = market.get("yes_ask", 0)
last_price = market.get("last_price", 0)
volume_24h = market.get("volume_24h", 0)
status = market.get("status", "unknown")
status_icon = "🟢" if status == "active" else "🔴"
lines = []
lines.append(f"{index}. {status_icon} {ticker}")
lines.append(f" {title[:70]}{'...' if len(title) > 70 else ''}")
if yes_bid or yes_ask:
lines.append(f" Price: Bid {yes_bid}¢ | Ask {yes_ask}¢ | Last {last_price}¢")
if volume_24h > 0:
lines.append(f" 24h Volume: ${volume_24h/100:,.2f}")
return "\n".join(lines)
def format_search_results(keyword: str, results: list[dict[str, Any]]) -> str:
"""
Format search results for human-readable output.
Args:
keyword: Search keyword
results: List of matching markets
Returns:
Formatted string for display
"""
lines = []
lines.append("\n" + "=" * 60)
lines.append(f"🔍 Search Results for '{keyword}'")
lines.append("=" * 60)
if not results:
lines.append("\nNo markets found matching your search.")
lines.append("\nTip: Try broader keywords.")
else:
lines.append(f"Found {len(results)} matching markets:\n")
for i, market in enumerate(results, 1):
lines.append(format_search_result(market, i))
lines.append("")
lines.append("" * 60)
lines.append("Note: Searches across all ~6900 markets using local cache.")
lines.append("Cache refreshes automatically every 6 hours.")
lines.append("=" * 60)
return "\n".join(lines)
@click.command()
@click.argument("keyword", required=False)
@click.option("--limit", default=10, type=int, help="Maximum number of results to return")
@click.option(
"--json",
"output_json",
is_flag=True,
help="Output as JSON instead of human-readable format",
)
@click.option("--rebuild-cache", is_flag=True, help="Force rebuild of the market cache")
def main(keyword: str, limit: int, output_json: bool, rebuild_cache: bool):
"""
Search for markets by keyword using cached data.
KEYWORD is the search term to look for in market titles and descriptions.
Uses a local cache for fast searches across all ~6900 markets.
Cache is built on first run (2-5 minutes) and refreshed every 6 hours.
No authentication required.
"""
try:
# Initialize cache
cache = KalshiSearchCache()
# Handle cache rebuild
if rebuild_cache:
if not output_json:
click.echo("Rebuilding market cache...")
# Delete existing cache files
if cache.cache_dir.exists():
for old_cache in cache.cache_dir.glob("kalshi_markets_*.csv"):
old_cache.unlink()
# Rebuild
with KalshiClient() as client:
cache.build_cache(client, quiet=output_json)
if not output_json:
click.echo("✅ Cache rebuilt successfully!")
if not keyword:
sys.exit(0)
# Require keyword for search
if not keyword:
raise ValueError("Keyword is required for search (or use --rebuild-cache)")
if not keyword.strip():
raise ValueError("Keyword cannot be empty")
# Search using cache
results = cache.search(keyword, limit=limit, quiet=output_json)
# Output results
if output_json:
# JSON output for automation/MCP
click.echo(json.dumps(results, indent=2))
else:
# Human-readable output
formatted = format_search_results(keyword, results)
click.echo(formatted)
sys.exit(0)
except Exception as e:
if output_json:
# JSON error format
error_data = {"error": str(e)}
click.echo(json.dumps(error_data, indent=2))
else:
# Human-readable error
click.echo(f"❌ Error: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,196 @@
#!/usr/bin/env python3
# /// script
# dependencies = [
# "httpx",
# "click",
# ]
# ///
"""
Kalshi Series Details Script
Get detailed information about a specific series.
Completely self-contained with embedded HTTP client.
Usage:
uv run series.py SERIES_TICKER
uv run series.py KXHIGHNY
uv run series.py SERIES_TICKER --json
"""
import json
import sys
import textwrap
from typing import Any
import click
import httpx
# Configuration
API_BASE_URL = "https://api.elections.kalshi.com/trade-api/v2"
API_TIMEOUT = 30.0 # seconds
USER_AGENT = "Kalshi-CLI/1.0"
class KalshiClient:
"""Minimal HTTP client for Kalshi API - series details functionality"""
def __init__(self):
"""Initialize HTTP client"""
self.client = httpx.Client(
base_url=API_BASE_URL, timeout=API_TIMEOUT, headers={"User-Agent": USER_AGENT}
)
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - cleanup"""
self.client.close()
def get_series(self, series_ticker: str) -> dict[str, Any]:
"""
Get detailed information about a specific series.
Args:
series_ticker: Series ticker identifier
Returns:
Series details including metadata and settlement sources
Raises:
Exception if API call fails
"""
try:
response = self.client.get(f"/series/{series_ticker}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise Exception(f"Series not found: {series_ticker}")
raise Exception(f"API error: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
raise Exception(f"Network error: {str(e)}")
except Exception as e:
raise Exception(f"Unexpected error: {str(e)}")
def format_series_detail(data: dict[str, Any]) -> str:
"""
Format series details for human-readable output.
Args:
data: Series data from API
Returns:
Formatted string for display
"""
# Handle nested series object if present
series = data.get("series", data)
lines = []
lines.append("\n" + "=" * 60)
lines.append(f"📚 Series: {series.get('ticker', 'N/A')}")
lines.append("=" * 60)
# Title
title = series.get("title", "N/A")
lines.append(f"\n📌 {title}")
# Basic info
category = series.get("category", "N/A")
frequency = series.get("frequency", "N/A")
lines.append(f"\n📂 Category: {category}")
lines.append(f"🔄 Frequency: {frequency}")
# Tags
tags = series.get("tags", [])
if tags:
lines.append(f"🏷️ Tags: {', '.join(tags)}")
# Contract specifications
contract_url = series.get("contract_url")
if contract_url:
lines.append("\n📄 Contract URL:")
lines.append(f" {contract_url}")
# Description
description = series.get("description", "").strip()
if description:
lines.append("\n📝 Description:")
# Wrap long description text
wrapped = textwrap.wrap(description, width=70)
for line in wrapped[:10]: # Limit to 10 lines
lines.append(f" {line}")
if len(wrapped) > 10:
lines.append(" ...")
# Settlement sources
sources = series.get("settlement_sources", [])
if sources:
lines.append("\n⚖️ Settlement Sources:")
for source in sources:
name = source.get("name", "N/A")
url = source.get("url", "N/A")
lines.append(f"{name}")
if url != "N/A":
lines.append(f" {url}")
# Rules
rules_primary = series.get("rules_primary", "").strip()
if rules_primary:
lines.append("\n📋 Rules:")
wrapped = textwrap.wrap(rules_primary, width=70)
for line in wrapped[:8]: # Limit to 8 lines
lines.append(f" {line}")
if len(wrapped) > 8:
lines.append(" ...")
lines.append("\n" + "=" * 60)
return "\n".join(lines)
@click.command()
@click.argument("series_ticker")
@click.option(
"--json", "output_json", is_flag=True, help="Output as JSON instead of human-readable format"
)
def main(series_ticker: str, output_json: bool):
"""
Get information about a specific series.
SERIES_TICKER is the series identifier (e.g., KXHIGHNY).
Series are templates used to create markets.
No authentication required.
"""
try:
# Get series details from API
with KalshiClient() as client:
data = client.get_series(series_ticker)
# Output results
if output_json:
# JSON output for automation/MCP
click.echo(json.dumps(data, indent=2))
else:
# Human-readable output
formatted = format_series_detail(data)
click.echo(formatted)
sys.exit(0)
except Exception as e:
if output_json:
# JSON error format
error_data = {"error": str(e)}
click.echo(json.dumps(error_data, indent=2))
else:
# Human-readable error
click.echo(f"❌ Error: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,206 @@
#!/usr/bin/env python3
# /// script
# dependencies = [
# "httpx",
# "click",
# ]
# ///
"""
Kalshi Series List Script
List all available series (market templates).
Completely self-contained with embedded HTTP client.
Usage:
uv run series_list.py
uv run series_list.py --category Politics
uv run series_list.py --json
"""
import json
import sys
from typing import Any
import click
import httpx
# Configuration
API_BASE_URL = "https://api.elections.kalshi.com/trade-api/v2"
API_TIMEOUT = 60.0 # Longer timeout for large series list
USER_AGENT = "Kalshi-CLI/1.0"
class KalshiClient:
"""Minimal HTTP client for Kalshi API - series list functionality"""
def __init__(self):
"""Initialize HTTP client"""
self.client = httpx.Client(
base_url=API_BASE_URL, timeout=API_TIMEOUT, headers={"User-Agent": USER_AGENT}
)
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - cleanup"""
self.client.close()
def get_series_list(
self, category: str | None = None, tags: str | None = None
) -> dict[str, Any]:
"""
Get list of all series.
Args:
category: Filter by category
tags: Filter by tags (comma-separated)
Returns:
Dict with 'series' array
Raises:
Exception if API call fails
"""
params = {}
if category:
params["category"] = category
if tags:
params["tags"] = tags
try:
response = self.client.get("/series", params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise Exception(f"API error: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
raise Exception(f"Network error: {str(e)}")
except Exception as e:
raise Exception(f"Unexpected error: {str(e)}")
def format_series_summary(series: dict[str, Any]) -> str:
"""Format a single series for display"""
ticker = series.get("ticker", "N/A")
title = series.get("title", "N/A")
category = series.get("category", "N/A")
frequency = series.get("frequency", "N/A")
tags = series.get("tags", [])
lines = []
lines.append(f"📈 {ticker}")
lines.append(f" {title[:60]}{'...' if len(title) > 60 else ''}")
lines.append(f" Category: {category} | Frequency: {frequency}")
if tags:
tags_str = ", ".join(tags[:3])
if len(tags) > 3:
tags_str += f" (+{len(tags)-3} more)"
lines.append(f" Tags: {tags_str}")
return "\n".join(lines)
def format_series_list(data: dict[str, Any], limit: int = 50) -> str:
"""
Format series list for human-readable output.
Args:
data: Response data with series array
limit: Number of series to display (rest shown as count)
Returns:
Formatted string for display
"""
series_list = data.get("series", [])
total_count = len(series_list)
lines = []
lines.append("\n" + "=" * 60)
lines.append("📚 Kalshi Series (Market Templates)")
lines.append("=" * 60)
lines.append(f"Total: {total_count} series available\n")
# Group by category
categories = {}
for series in series_list:
cat = series.get("category", "Uncategorized")
if cat not in categories:
categories[cat] = []
categories[cat].append(series)
# Show summary by category
lines.append("📂 By Category:")
lines.append("" * 40)
for cat in sorted(categories.keys()):
count = len(categories[cat])
lines.append(f" {cat}: {count} series")
lines.append("")
# Show first N series
display_count = min(limit, total_count)
lines.append(f"📋 First {display_count} Series:")
lines.append("" * 60)
for i, series in enumerate(series_list[:display_count], 1):
lines.append(f"{i}. {format_series_summary(series)}")
lines.append("")
if total_count > display_count:
lines.append(f"... and {total_count - display_count} more series")
lines.append("\nTip: Use --json to get full list for processing")
lines.append("=" * 60)
return "\n".join(lines)
@click.command()
@click.option("--category", help="Filter by category (e.g., Politics, Economics)")
@click.option("--tags", help="Filter by tags (comma-separated)")
@click.option(
"--limit", default=50, type=int, help="Number of series to display in human-readable mode"
)
@click.option(
"--json", "output_json", is_flag=True, help="Output as JSON instead of human-readable format"
)
def main(category: str | None, tags: str | None, limit: int, output_json: bool):
"""
List all available series (market templates).
Series are templates for creating markets. There are ~6900 series available.
No authentication required.
Note: This returns a large dataset. Use filters to narrow results.
"""
try:
# Get series list from API
with KalshiClient() as client:
data = client.get_series_list(category=category, tags=tags)
# Output results
if output_json:
# JSON output for automation/MCP
click.echo(json.dumps(data, indent=2))
else:
# Human-readable output (limited display)
formatted = format_series_list(data, limit=limit)
click.echo(formatted)
sys.exit(0)
except Exception as e:
if output_json:
# JSON error format
error_data = {"error": str(e)}
click.echo(json.dumps(error_data, indent=2))
else:
# Human-readable error
click.echo(f"❌ Error: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,157 @@
#!/usr/bin/env python3
# /// script
# dependencies = [
# "httpx",
# "click",
# ]
# ///
"""
Kalshi Exchange Status Script
Check if the Kalshi exchange and trading are active.
Completely self-contained with embedded HTTP client.
Usage:
uv run status.py
uv run status.py --json
"""
import json
import sys
from typing import Any
import click
import httpx
# Configuration
API_BASE_URL = "https://api.elections.kalshi.com/trade-api/v2"
API_TIMEOUT = 30.0 # seconds
USER_AGENT = "Kalshi-CLI/1.0"
class KalshiClient:
"""Minimal HTTP client for Kalshi API - just what we need for status"""
def __init__(self):
"""Initialize HTTP client"""
self.client = httpx.Client(
base_url=API_BASE_URL, timeout=API_TIMEOUT, headers={"User-Agent": USER_AGENT}
)
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - cleanup"""
self.client.close()
def get_exchange_status(self) -> dict[str, Any]:
"""
Get the current exchange status.
Returns:
Dict with exchange_active, trading_active, and estimated_resume_time
Raises:
Exception if API call fails
"""
try:
response = self.client.get("/exchange/status")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise Exception(f"API error: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
raise Exception(f"Network error: {str(e)}")
except Exception as e:
raise Exception(f"Unexpected error: {str(e)}")
def format_status(status_data: dict[str, Any]) -> str:
"""
Format status data for human-readable output.
Args:
status_data: The status response from API
Returns:
Formatted string for display
"""
lines = []
lines.append("\nKalshi Exchange Status")
lines.append("=" * 40)
exchange_active = status_data.get("exchange_active", None)
trading_active = status_data.get("trading_active", None)
resume_time = status_data.get("exchange_estimated_resume_time")
# Exchange status
if exchange_active is True:
lines.append("📈 Exchange: ACTIVE ✓")
elif exchange_active is False:
lines.append("🔴 Exchange: INACTIVE ✗")
else:
lines.append("❓ Exchange: Unknown")
# Trading status
if trading_active is True:
lines.append("💹 Trading: ACTIVE ✓")
elif trading_active is False:
lines.append("🔴 Trading: INACTIVE ✗")
else:
lines.append("❓ Trading: Unknown")
# Resume time if available
if resume_time:
lines.append(f"🕐 Resume: {resume_time}")
lines.append("=" * 40)
return "\n".join(lines)
@click.command()
@click.option(
"--json", "output_json", is_flag=True, help="Output as JSON instead of human-readable format"
)
def main(output_json: bool):
"""
Check Kalshi exchange status.
Returns current exchange and trading status.
No authentication required.
"""
try:
# Get status from API
with KalshiClient() as client:
status_data = client.get_exchange_status()
# Output results
if output_json:
# JSON output for automation/MCP
click.echo(json.dumps(status_data, indent=2))
else:
# Human-readable output
formatted = format_status(status_data)
click.echo(formatted)
# Exit code based on status
if status_data.get("trading_active"):
sys.exit(0)
else:
sys.exit(1) # Non-zero if trading inactive
except Exception as e:
if output_json:
# JSON error format
error_data = {"error": str(e)}
click.echo(json.dumps(error_data, indent=2))
else:
# Human-readable error
click.echo(f"❌ Error: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,224 @@
#!/usr/bin/env python3
# /// script
# dependencies = [
# "httpx",
# "click",
# ]
# ///
"""
Kalshi Recent Trades Script
Get recent trades across all markets or for a specific market.
Completely self-contained with embedded HTTP client.
Usage:
uv run trades.py
uv run trades.py --limit 20
uv run trades.py --ticker MARKET_TICKER
uv run trades.py --json
"""
import json
import sys
from datetime import datetime
from typing import Any
import click
import httpx
# Configuration
API_BASE_URL = "https://api.elections.kalshi.com/trade-api/v2"
API_TIMEOUT = 30.0 # seconds
USER_AGENT = "Kalshi-CLI/1.0"
class KalshiClient:
"""Minimal HTTP client for Kalshi API - trades functionality"""
def __init__(self):
"""Initialize HTTP client"""
self.client = httpx.Client(
base_url=API_BASE_URL, timeout=API_TIMEOUT, headers={"User-Agent": USER_AGENT}
)
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - cleanup"""
self.client.close()
def get_trades(
self,
limit: int = 10,
ticker: str | None = None,
min_ts: int | None = None,
max_ts: int | None = None,
cursor: str | None = None,
) -> dict[str, Any]:
"""
Get recent trades.
Args:
limit: Number of trades to return (1-1000)
ticker: Filter by market ticker
min_ts: Filter trades after this Unix timestamp
max_ts: Filter trades before this Unix timestamp
cursor: Pagination cursor
Returns:
Dict with 'trades' array and optional 'cursor'
Raises:
Exception if API call fails
"""
params = {"limit": str(limit)}
if ticker:
params["ticker"] = ticker
if min_ts is not None:
params["min_ts"] = str(min_ts)
if max_ts is not None:
params["max_ts"] = str(max_ts)
if cursor:
params["cursor"] = cursor
try:
response = self.client.get("/markets/trades", params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise Exception(f"API error: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
raise Exception(f"Network error: {str(e)}")
except Exception as e:
raise Exception(f"Unexpected error: {str(e)}")
def format_timestamp(timestamp_str: str) -> str:
"""Format ISO timestamp to readable string"""
if not timestamp_str:
return "N/A"
try:
dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d %H:%M:%S UTC")
except (ValueError, TypeError):
return timestamp_str
def format_trade(trade: dict[str, Any], index: int) -> str:
"""Format a single trade for display"""
ticker = trade.get("ticker", "N/A")
yes_price = trade.get("yes_price", 0)
no_price = trade.get("no_price", 0)
count = trade.get("count", 0)
created = format_timestamp(trade.get("created_time", ""))
# Determine trade side
if yes_price > 0:
price = yes_price
side = "YES"
emoji = ""
else:
price = no_price
side = "NO"
emoji = ""
lines = []
lines.append(f"{index}. {emoji} {ticker}")
lines.append(f" {side} @ {price}¢ | {count:,} contracts")
lines.append(f" {created}")
return "\n".join(lines)
def format_trades_list(data: dict[str, Any]) -> str:
"""
Format trades list for human-readable output.
Args:
data: Response data with trades array
Returns:
Formatted string for display
"""
trades = data.get("trades", [])
cursor = data.get("cursor", "")
lines = []
lines.append("\n" + "=" * 60)
lines.append("📊 Recent Trades")
lines.append("=" * 60)
lines.append(f"Found {len(trades)} trades\n")
for i, trade in enumerate(trades, 1):
lines.append(format_trade(trade, i))
lines.append("") # Blank line between trades
if cursor:
lines.append("" * 60)
lines.append(f"📄 More results available. Use --cursor {cursor[:20]}...")
lines.append("=" * 60)
return "\n".join(lines)
@click.command()
@click.option("--limit", default=10, type=int, help="Number of trades to return (1-1000)")
@click.option("--ticker", help="Filter trades for specific market ticker")
@click.option("--min-ts", type=int, help="Filter trades after this Unix timestamp")
@click.option("--max-ts", type=int, help="Filter trades before this Unix timestamp")
@click.option("--cursor", help="Pagination cursor for next page")
@click.option(
"--json", "output_json", is_flag=True, help="Output as JSON instead of human-readable format"
)
def main(
limit: int,
ticker: str | None,
min_ts: int | None,
max_ts: int | None,
cursor: str | None,
output_json: bool,
):
"""
Get recent trades across all markets or for a specific market.
Shows trade price, size, and timestamp for recent market activity.
No authentication required.
"""
try:
# Validate limit
if limit < 1 or limit > 1000:
raise ValueError("Limit must be between 1 and 1000")
# Get trades from API
with KalshiClient() as client:
data = client.get_trades(
limit=limit, ticker=ticker, min_ts=min_ts, max_ts=max_ts, cursor=cursor
)
# Output results
if output_json:
# JSON output for automation/MCP
click.echo(json.dumps(data, indent=2))
else:
# Human-readable output
formatted = format_trades_list(data)
click.echo(formatted)
sys.exit(0)
except Exception as e:
if output_json:
# JSON error format
error_data = {"error": str(e)}
click.echo(json.dumps(error_data, indent=2))
else:
# Human-readable error
click.echo(f"❌ Error: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()