Files
gh-jeremylongshore-claude-c…/commands/analyze-chain.md
2025-11-30 08:17:46 +08:00

60 KiB

description, shortcut, tags, version, author
description shortcut tags version author
Comprehensive blockchain metrics monitoring and on-chain analytics system ac
blockchain
on-chain
metrics
whale-tracking
network-analysis
crypto-analytics
2.0.0 claude-code-plugins

Analyze On-Chain Data

Comprehensive blockchain metrics monitoring system that tracks network health, holder distribution, whale movements, exchange flows, supply metrics, and transaction velocity across multiple blockchain networks.

When to Use

Use this command when you need to:

  • Monitor Network Health: Track hash rate, difficulty, active addresses, and transaction throughput
  • Analyze Holder Distribution: Identify whale concentration, wealth distribution, and accumulation patterns
  • Track Exchange Flows: Monitor deposit/withdrawal patterns, exchange reserves, and liquidity movements
  • Analyze Supply Metrics: Track circulating supply, staked amounts, burned tokens, and inflation rates
  • Measure Transaction Velocity: Calculate token circulation speed, active address growth, and network utilization
  • Bitcoin UTXO Analysis: Track unspent transaction outputs, UTXO age distribution, and holder behavior
  • Ethereum Gas Analytics: Monitor gas prices, network congestion, and smart contract activity
  • Whale Movement Alerts: Real-time tracking of large transactions and address clustering
  • Market Research: Data-driven insights for trading strategies and investment decisions
  • Compliance Monitoring: Track sanctioned addresses, suspicious patterns, and regulatory requirements

DON'T Use When

  • Real-time Trading: Use dedicated trading APIs for sub-second execution
  • Price Prediction: On-chain metrics show trends, not future prices
  • Legal Advice: Consult blockchain forensics experts for legal cases
  • Single Transaction Tracking: Use block explorers for individual transactions
  • Historical Data Older Than 7 Years: Most APIs have limited historical depth

Design Decisions

1. Multi-Chain Architecture vs Single Chain Focus

Chosen Approach: Multi-chain with chain-specific adapters

Why: Different blockchains require different metrics and APIs. Bitcoin uses UTXO model while Ethereum uses account-based model. A unified interface with chain-specific implementations provides flexibility while maintaining consistent output formats.

Alternatives Considered:

  • Single Chain Only: Rejected due to limited market coverage
  • Chain-Agnostic Generic Metrics: Rejected due to loss of blockchain-specific insights
  • Separate Tools Per Chain: Rejected due to code duplication and maintenance burden

2. Data Source Strategy: APIs vs Full Node

Chosen Approach: Hybrid API-based with optional full node support

Why: Most users don't run full nodes. Using APIs (Etherscan, Blockchain.com, Glassnode) provides immediate access without infrastructure requirements. Full node support remains optional for advanced users needing maximum decentralization.

Alternatives Considered:

  • Full Node Required: Rejected due to high infrastructure costs ($500+/month)
  • APIs Only: Considered but added full node option for enterprise users
  • Light Client Sync: Rejected due to incomplete data and sync time requirements

3. Whale Detection Threshold Method

Chosen Approach: Dynamic percentile-based thresholds with absolute minimums

Why: Static thresholds (e.g., "1000 BTC = whale") become outdated as prices change. Using percentile-based detection (top 0.1% holders) with absolute minimums (e.g., $1M USD equivalent) adapts to market conditions.

Alternatives Considered:

  • Fixed Token Amounts: Rejected due to price volatility making thresholds obsolete
  • USD Value Only: Rejected as it misses on-chain concentration patterns
  • Machine Learning Clustering: Rejected due to complexity vs accuracy tradeoff

4. Data Storage: Time-Series Database vs Relational

Chosen Approach: Time-series database (InfluxDB) for metrics, PostgreSQL for metadata

Why: On-chain metrics are time-stamped numerical data perfect for time-series databases. InfluxDB provides efficient storage, fast queries, and built-in downsampling. PostgreSQL stores wallet metadata, labels, and relationships.

Alternatives Considered:

  • PostgreSQL Only: Rejected due to poor time-series query performance
  • InfluxDB Only: Rejected due to poor relational data handling
  • ElasticSearch: Rejected due to higher complexity and resource requirements

5. Real-Time vs Batch Processing

Chosen Approach: Hybrid batch + real-time for whale alerts

Why: Most metrics update every 10-15 minutes (blockchain confirmation time). Batch processing handles historical data efficiently. Real-time processing monitors mempool for large transactions requiring immediate alerts.

Alternatives Considered:

  • Batch Only: Rejected due to delayed whale movement detection
  • Real-Time Only: Rejected due to high API costs and rate limits
  • Streaming-First Architecture: Rejected as overkill for blockchain data latency

Prerequisites

1. Blockchain API Access

Ethereum:

# Etherscan API (Free tier: 5 calls/second)
export ETHERSCAN_API_KEY="your_etherscan_key"
export ETHERSCAN_ENDPOINT="https://api.etherscan.io/api"

# Infura for direct node access (100k requests/day free)
export INFURA_PROJECT_ID="your_infura_project_id"
export INFURA_ENDPOINT="https://mainnet.infura.io/v3/${INFURA_PROJECT_ID}"

Bitcoin:

# Blockchain.com API (No key required, rate limited)
export BLOCKCHAIN_API_ENDPOINT="https://blockchain.info"

# Blockchair API (10k requests/day free)
export BLOCKCHAIR_API_KEY="your_blockchair_key"
export BLOCKCHAIR_ENDPOINT="https://api.blockchair.com"

Premium Data Providers:

# Glassnode (Professional tier required, $800+/month)
export GLASSNODE_API_KEY="your_glassnode_key"

# CoinMetrics (Enterprise tier, contact for pricing)
export COINMETRICS_API_KEY="your_coinmetrics_key"

2. Python Dependencies

pip install web3==6.11.0 \
            requests==2.31.0 \
            pandas==2.1.3 \
            influxdb-client==1.38.0 \
            psycopg2-binary==2.9.9 \
            python-dotenv==1.0.0 \
            aiohttp==3.9.1 \
            tenacity==8.2.3 \
            pydantic==2.5.0 \
            pytz==2023.3

3. Database Infrastructure

InfluxDB Setup:

# Docker installation
docker run -d -p 8086:8086 \
    -v influxdb-data:/var/lib/influxdb2 \
    -e DOCKER_INFLUXDB_INIT_MODE=setup \
    -e DOCKER_INFLUXDB_INIT_USERNAME=admin \
    -e DOCKER_INFLUXDB_INIT_PASSWORD=secure_password_here \
    -e DOCKER_INFLUXDB_INIT_ORG=crypto-analytics \
    -e DOCKER_INFLUXDB_INIT_BUCKET=onchain-metrics \
    influxdb:2.7

PostgreSQL Setup:

# Docker installation
docker run -d -p 5432:5432 \
    -v postgres-data:/var/lib/postgresql/data \
    -e POSTGRES_DB=blockchain_metadata \
    -e POSTGRES_USER=crypto_analyst \
    -e POSTGRES_PASSWORD=secure_password_here \
    postgres:16

4. Configuration File

Create config/chains.yaml:

ethereum:
  chain_id: 1
  rpc_endpoint: "${INFURA_ENDPOINT}"
  api_endpoint: "${ETHERSCAN_ENDPOINT}"
  api_key: "${ETHERSCAN_API_KEY}"
  whale_threshold_usd: 1000000
  whale_threshold_percentile: 0.1
  block_time: 12

bitcoin:
  rpc_endpoint: null
  api_endpoint: "${BLOCKCHAIR_ENDPOINT}"
  api_key: "${BLOCKCHAIR_API_KEY}"
  whale_threshold_usd: 1000000
  whale_threshold_percentile: 0.1
  block_time: 600

5. System Requirements

  • CPU: 4+ cores (8+ recommended for multi-chain)
  • RAM: 8GB minimum (16GB recommended)
  • Storage: 100GB SSD (time-series data grows ~1GB/day with full metrics)
  • Network: Stable connection with 10+ Mbps (API rate limits are the bottleneck)

Implementation Process

Step 1: Initialize Multi-Chain Client

from web3 import Web3
import aiohttp
from typing import Dict, Optional
import logging

class ChainClient:
    """Base class for blockchain clients"""

    def __init__(self, config: Dict):
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)

    async def get_network_metrics(self) -> Dict:
        raise NotImplementedError

    async def get_holder_distribution(self, token: str) -> Dict:
        raise NotImplementedError

class EthereumClient(ChainClient):
    """Ethereum-specific implementation"""

    def __init__(self, config: Dict):
        super().__init__(config)
        self.w3 = Web3(Web3.HTTPProvider(config['rpc_endpoint']))
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def get_latest_block(self) -> int:
        """Get latest block number"""
        return self.w3.eth.block_number

    async def get_network_metrics(self) -> Dict:
        """Fetch Ethereum network metrics"""
        latest_block = await self.get_latest_block()

        # Get last 100 blocks for gas analysis
        gas_prices = []
        block_times = []

        for i in range(100):
            block = self.w3.eth.get_block(latest_block - i)
            gas_prices.append(Web3.from_wei(block.gasUsed, 'gwei'))
            if i > 0:
                prev_block = self.w3.eth.get_block(latest_block - i + 1)
                block_times.append(block.timestamp - prev_block.timestamp)

        return {
            'chain': 'ethereum',
            'latest_block': latest_block,
            'avg_gas_price_gwei': sum(gas_prices) / len(gas_prices),
            'median_gas_price_gwei': sorted(gas_prices)[len(gas_prices) // 2],
            'avg_block_time': sum(block_times) / len(block_times),
            'network_hashrate': await self._get_hashrate(),
            'active_addresses_24h': await self._get_active_addresses()
        }

    async def _get_hashrate(self) -> Optional[float]:
        """Get network hashrate from Etherscan"""
        try:
            url = f"{self.config['api_endpoint']}?module=stats&action=chainsize&apikey={self.config['api_key']}"
            async with self.session.get(url) as response:
                data = await response.json()
                return float(data['result'])
        except Exception as e:
            self.logger.error(f"Failed to fetch hashrate: {e}")
            return None

    async def _get_active_addresses(self) -> Optional[int]:
        """Get 24h active addresses from Etherscan"""
        try:
            url = f"{self.config['api_endpoint']}?module=stats&action=dailytx&apikey={self.config['api_key']}"
            async with self.session.get(url) as response:
                data = await response.json()
                if data['status'] == '1':
                    return int(data['result'][0]['uniqueAddresses'])
        except Exception as e:
            self.logger.error(f"Failed to fetch active addresses: {e}")
            return None

Step 2: Implement Holder Distribution Analysis

from collections import defaultdict
from decimal import Decimal

class HolderAnalytics:
    """Analyze token holder distribution and concentration"""

    def __init__(self, client: ChainClient):
        self.client = client
        self.logger = logging.getLogger(self.__class__.__name__)

    async def analyze_distribution(self, token_address: str) -> Dict:
        """Analyze holder distribution with whale detection"""

        # Fetch all holders (this is a simplified version)
        holders = await self._fetch_all_holders(token_address)

        if not holders:
            return {'error': 'No holder data available'}

        # Calculate distribution metrics
        balances = [h['balance'] for h in holders]
        total_supply = sum(balances)

        # Sort by balance descending
        holders.sort(key=lambda x: x['balance'], reverse=True)

        # Calculate concentration metrics
        top_10_balance = sum(h['balance'] for h in holders[:10])
        top_100_balance = sum(h['balance'] for h in holders[:100])
        top_1000_balance = sum(h['balance'] for h in holders[:1000])

        # Whale threshold (top 0.1% or $1M+ USD)
        whale_threshold_tokens = self._calculate_whale_threshold(
            holders,
            total_supply,
            self.client.config['whale_threshold_percentile']
        )

        whales = [h for h in holders if h['balance'] >= whale_threshold_tokens]

        # Calculate Gini coefficient (wealth inequality)
        gini = self._calculate_gini(balances)

        # Distribution buckets
        buckets = self._create_distribution_buckets(holders, total_supply)

        return {
            'token_address': token_address,
            'total_holders': len(holders),
            'total_supply': total_supply,
            'concentration': {
                'top_10_percent': (top_10_balance / total_supply) * 100,
                'top_100_percent': (top_100_balance / total_supply) * 100,
                'top_1000_percent': (top_1000_balance / total_supply) * 100,
                'gini_coefficient': gini
            },
            'whales': {
                'count': len(whales),
                'total_balance': sum(w['balance'] for w in whales),
                'percent_of_supply': (sum(w['balance'] for w in whales) / total_supply) * 100,
                'threshold_tokens': whale_threshold_tokens
            },
            'distribution_buckets': buckets
        }

    async def _fetch_all_holders(self, token_address: str) -> list:
        """Fetch all token holders (implementation depends on API)"""
        # This would use Etherscan's token holder API
        holders = []
        page = 1

        while True:
            url = f"{self.client.config['api_endpoint']}?module=token&action=tokenholderlist&contractaddress={token_address}&page={page}&offset=1000&apikey={self.client.config['api_key']}"

            async with self.client.session.get(url) as response:
                data = await response.json()

                if data['status'] != '1' or not data['result']:
                    break

                for holder in data['result']:
                    holders.append({
                        'address': holder['TokenHolderAddress'],
                        'balance': int(holder['TokenHolderQuantity'])
                    })

                page += 1

                # Rate limiting
                await asyncio.sleep(0.2)  # 5 requests per second

        return holders

    def _calculate_whale_threshold(self, holders: list, total_supply: float, percentile: float) -> float:
        """Calculate dynamic whale threshold"""
        sorted_balances = sorted([h['balance'] for h in holders], reverse=True)
        index = int(len(sorted_balances) * percentile / 100)
        return sorted_balances[min(index, len(sorted_balances) - 1)]

    def _calculate_gini(self, balances: list) -> float:
        """Calculate Gini coefficient (0=perfect equality, 1=perfect inequality)"""
        sorted_balances = sorted(balances)
        n = len(sorted_balances)
        cumsum = 0

        for i, balance in enumerate(sorted_balances):
            cumsum += (2 * (i + 1) - n - 1) * balance

        return cumsum / (n * sum(sorted_balances))

    def _create_distribution_buckets(self, holders: list, total_supply: float) -> Dict:
        """Create balance distribution buckets"""
        buckets = {
            '0-0.01%': 0,
            '0.01-0.1%': 0,
            '0.1-1%': 0,
            '1-10%': 0,
            '10%+': 0
        }

        for holder in holders:
            percent = (holder['balance'] / total_supply) * 100

            if percent < 0.01:
                buckets['0-0.01%'] += 1
            elif percent < 0.1:
                buckets['0.01-0.1%'] += 1
            elif percent < 1:
                buckets['0.1-1%'] += 1
            elif percent < 10:
                buckets['1-10%'] += 1
            else:
                buckets['10%+'] += 1

        return buckets

Step 3: Track Whale Movements and Exchange Flows

from datetime import datetime, timedelta
import asyncio

class WhaleTracker:
    """Real-time whale movement tracking"""

    def __init__(self, client: ChainClient, db_client):
        self.client = client
        self.db = db_client
        self.logger = logging.getLogger(self.__class__.__name__)
        self.known_exchanges = self._load_exchange_addresses()

    def _load_exchange_addresses(self) -> Dict[str, str]:
        """Load known exchange addresses"""
        return {
            '0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be': 'Binance 1',
            '0xd551234ae421e3bcba99a0da6d736074f22192ff': 'Binance 2',
            '0x28c6c06298d514db089934071355e5743bf21d60': 'Binance 14',
            '0x21a31ee1afc51d94c2efccaa2092ad1028285549': 'Binance 15',
            '0x564286362092d8e7936f0549571a803b203aaced': 'Binance 16',
            '0x0681d8db095565fe8a346fa0277bffde9c0edbbf': 'Binance 17',
            '0x4e9ce36e442e55ecd9025b9a6e0d88485d628a67': 'Binance 18',
            '0xbe0eb53f46cd790cd13851d5eff43d12404d33e8': 'Binance 19',
            '0xf977814e90da44bfa03b6295a0616a897441acec': 'Binance 8',
            # Coinbase
            '0x71660c4005ba85c37ccec55d0c4493e66fe775d3': 'Coinbase 1',
            '0x503828976d22510aad0201ac7ec88293211d23da': 'Coinbase 2',
            '0xddfabcdc4d8ffc6d5beaf154f18b778f892a0740': 'Coinbase 3',
            '0x3cd751e6b0078be393132286c442345e5dc49699': 'Coinbase 4',
            '0xb5d85cbf7cb3ee0d56b3bb207d5fc4b82f43f511': 'Coinbase 5',
            '0xeb2629a2734e272bcc07bda959863f316f4bd4cf': 'Coinbase 6',
            # Kraken
            '0x2910543af39aba0cd09dbb2d50200b3e800a63d2': 'Kraken 1',
            '0x0a869d79a7052c7f1b55a8ebabbea3420f0d1e13': 'Kraken 2',
            '0xe853c56864a2ebe4576a807d26fdc4a0ada51919': 'Kraken 3',
            '0x267be1c1d684f78cb4f6a176c4911b741e4ffdc0': 'Kraken 4',
        }

    async def track_movements(self, min_value_usd: float = 100000) -> list:
        """Track large transactions in real-time"""
        movements = []
        latest_block = await self.client.get_latest_block()

        # Check last 10 blocks
        for block_num in range(latest_block - 10, latest_block + 1):
            block = self.client.w3.eth.get_block(block_num, full_transactions=True)

            for tx in block.transactions:
                value_eth = Web3.from_wei(tx.value, 'ether')

                # Approximate USD value (would need price feed in production)
                value_usd = value_eth * await self._get_eth_price()

                if value_usd >= min_value_usd:
                    movement = await self._analyze_transaction(tx, value_usd)
                    movements.append(movement)

                    # Store in database
                    await self.db.store_whale_movement(movement)

                    # Send alert if significant
                    if value_usd >= 1000000:  # $1M+
                        await self._send_alert(movement)

        return movements

    async def _analyze_transaction(self, tx, value_usd: float) -> Dict:
        """Analyze transaction and classify movement type"""
        from_address = tx['from'].lower()
        to_address = tx['to'].lower() if tx['to'] else None

        from_label = self.known_exchanges.get(from_address, 'Unknown')
        to_label = self.known_exchanges.get(to_address, 'Unknown') if to_address else 'Contract Creation'

        # Classify movement type
        if from_label != 'Unknown' and to_label != 'Unknown':
            movement_type = 'Exchange-to-Exchange'
        elif from_label != 'Unknown':
            movement_type = 'Exchange-Outflow'
        elif to_label != 'Unknown':
            movement_type = 'Exchange-Inflow'
        else:
            movement_type = 'Whale-to-Whale'

        return {
            'timestamp': datetime.utcnow().isoformat(),
            'tx_hash': tx['hash'].hex(),
            'from_address': from_address,
            'to_address': to_address,
            'from_label': from_label,
            'to_label': to_label,
            'value_eth': float(Web3.from_wei(tx.value, 'ether')),
            'value_usd': value_usd,
            'movement_type': movement_type,
            'gas_price_gwei': Web3.from_wei(tx.gasPrice, 'gwei'),
            'block_number': tx.blockNumber
        }

    async def _get_eth_price(self) -> float:
        """Get current ETH price (simplified)"""
        # In production, use CoinGecko or similar API
        return 2000.0  # Placeholder

    async def _send_alert(self, movement: Dict):
        """Send alert for significant movements"""
        self.logger.warning(f"WHALE ALERT: {movement['movement_type']} - ${movement['value_usd']:,.2f}")

Step 4: Analyze Supply Metrics and Transaction Velocity

class SupplyAnalytics:
    """Track supply metrics and token velocity"""

    def __init__(self, client: ChainClient):
        self.client = client
        self.logger = logging.getLogger(self.__class__.__name__)

    async def get_supply_metrics(self, token_address: str) -> Dict:
        """Comprehensive supply analysis"""

        # Get basic supply data
        total_supply = await self._get_total_supply(token_address)
        circulating_supply = await self._get_circulating_supply(token_address)

        # Calculate locked/staked amounts
        locked_supply = await self._get_locked_supply(token_address)
        burned_supply = await self._get_burned_supply(token_address)

        # Transaction velocity (30-day)
        velocity = await self._calculate_velocity(token_address, days=30)

        # Inflation rate (annualized)
        inflation_rate = await self._calculate_inflation_rate(token_address)

        return {
            'token_address': token_address,
            'total_supply': total_supply,
            'circulating_supply': circulating_supply,
            'locked_supply': locked_supply,
            'burned_supply': burned_supply,
            'liquid_supply': circulating_supply - locked_supply,
            'supply_metrics': {
                'circulating_percent': (circulating_supply / total_supply) * 100,
                'locked_percent': (locked_supply / circulating_supply) * 100,
                'burned_percent': (burned_supply / total_supply) * 100
            },
            'velocity': velocity,
            'inflation_rate_annual': inflation_rate
        }

    async def _get_total_supply(self, token_address: str) -> float:
        """Get total token supply"""
        contract = self.client.w3.eth.contract(
            address=Web3.to_checksum_address(token_address),
            abi=[{
                'constant': True,
                'inputs': [],
                'name': 'totalSupply',
                'outputs': [{'name': '', 'type': 'uint256'}],
                'type': 'function'
            }]
        )
        return float(contract.functions.totalSupply().call())

    async def _get_circulating_supply(self, token_address: str) -> float:
        """Get circulating supply (excludes team/treasury wallets)"""
        # This requires project-specific logic
        # For now, return total supply
        return await self._get_total_supply(token_address)

    async def _get_locked_supply(self, token_address: str) -> float:
        """Get locked/staked supply"""
        # Would query staking contracts
        return 0.0  # Placeholder

    async def _get_burned_supply(self, token_address: str) -> float:
        """Get burned supply"""
        burn_address = '0x0000000000000000000000000000000000000000'
        contract = self.client.w3.eth.contract(
            address=Web3.to_checksum_address(token_address),
            abi=[{
                'constant': True,
                'inputs': [{'name': '_owner', 'type': 'address'}],
                'name': 'balanceOf',
                'outputs': [{'name': 'balance', 'type': 'uint256'}],
                'type': 'function'
            }]
        )
        return float(contract.functions.balanceOf(burn_address).call())

    async def _calculate_velocity(self, token_address: str, days: int = 30) -> Dict:
        """Calculate transaction velocity"""
        # Velocity = (Transaction Volume / Circulating Supply) / Time Period

        # Get transaction volume for period
        volume = await self._get_transaction_volume(token_address, days)
        circulating = await self._get_circulating_supply(token_address)

        velocity_daily = volume / circulating / days

        return {
            'velocity_daily': velocity_daily,
            'velocity_annual': velocity_daily * 365,
            'transaction_volume_30d': volume,
            'avg_daily_volume': volume / days
        }

    async def _get_transaction_volume(self, token_address: str, days: int) -> float:
        """Get total transaction volume for period"""
        # Would aggregate transfer events
        return 0.0  # Placeholder

    async def _calculate_inflation_rate(self, token_address: str) -> float:
        """Calculate annualized inflation rate"""
        # Compare supply now vs 365 days ago
        # Placeholder for demonstration
        return 2.5  # 2.5% annual inflation

Step 5: Bitcoin UTXO Analysis

class BitcoinUTXOAnalytics:
    """Bitcoin-specific UTXO analysis"""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.blockchair.com/bitcoin"
        self.logger = logging.getLogger(self.__class__.__name__)

    async def analyze_utxo_set(self) -> Dict:
        """Comprehensive UTXO set analysis"""

        async with aiohttp.ClientSession() as session:
            # Get UTXO statistics
            url = f"{self.base_url}/stats"
            async with session.get(url, params={'key': self.api_key}) as response:
                stats = await response.json()

            # Analyze UTXO age distribution
            utxo_age = await self._analyze_utxo_age(session)

            # Analyze UTXO value distribution
            utxo_values = await self._analyze_utxo_values(session)

            return {
                'total_utxos': stats['data']['utxo_count'],
                'total_value_btc': stats['data']['circulation'],
                'avg_utxo_value_btc': stats['data']['circulation'] / stats['data']['utxo_count'],
                'utxo_age_distribution': utxo_age,
                'utxo_value_distribution': utxo_values,
                'holder_behavior': await self._analyze_holder_behavior(utxo_age)
            }

    async def _analyze_utxo_age(self, session) -> Dict:
        """Analyze UTXO age distribution"""
        # UTXOs by age bucket
        buckets = {
            '0-7d': 0,
            '7d-30d': 0,
            '30d-90d': 0,
            '90d-180d': 0,
            '180d-1y': 0,
            '1y-2y': 0,
            '2y+': 0
        }

        # This would query Blockchair's UTXO API
        # Simplified for demonstration
        return buckets

    async def _analyze_utxo_values(self, session) -> Dict:
        """Analyze UTXO value distribution"""
        buckets = {
            '0-0.01 BTC': 0,
            '0.01-0.1 BTC': 0,
            '0.1-1 BTC': 0,
            '1-10 BTC': 0,
            '10-100 BTC': 0,
            '100+ BTC': 0
        }

        return buckets

    async def _analyze_holder_behavior(self, utxo_age: Dict) -> Dict:
        """Infer holder behavior from UTXO patterns"""

        # High % of old UTXOs = HODLing behavior
        # High % of young UTXOs = Active trading

        return {
            'hodl_score': 0.75,  # 0-1, higher = more HODLing
            'active_trader_percent': 25.0,
            'long_term_holder_percent': 60.0,
            'behavior_classification': 'HODLing Market'
        }

Output Format

1. network_metrics.json

{
  "timestamp": "2025-10-11T10:30:00Z",
  "chain": "ethereum",
  "metrics": {
    "latest_block": 18500000,
    "avg_gas_price_gwei": 25.5,
    "median_gas_price_gwei": 22.0,
    "avg_block_time_seconds": 12.1,
    "network_hashrate_th": 850000,
    "active_addresses_24h": 450000,
    "transaction_count_24h": 1200000,
    "avg_tx_fee_usd": 3.25,
    "network_utilization_percent": 78.5
  },
  "health_indicators": {
    "status": "healthy",
    "congestion_level": "moderate",
    "decentralization_score": 0.85
  }
}

2. holder_distribution.json

{
  "timestamp": "2025-10-11T10:30:00Z",
  "token_address": "0xdac17f958d2ee523a2206206994597c13d831ec7",
  "token_symbol": "USDT",
  "total_holders": 5600000,
  "total_supply": 95000000000,
  "concentration": {
    "top_10_percent": 45.5,
    "top_100_percent": 68.2,
    "top_1000_percent": 82.5,
    "gini_coefficient": 0.78
  },
  "whales": {
    "count": 1250,
    "total_balance": 25000000000,
    "percent_of_supply": 26.3,
    "threshold_tokens": 10000000
  },
  "distribution_buckets": {
    "0-0.01%": 5550000,
    "0.01-0.1%": 45000,
    "0.1-1%": 4200,
    "1-10%": 600,
    "10%+": 200
  },
  "trend_analysis": {
    "holder_growth_30d_percent": 2.5,
    "whale_accumulation_30d": true,
    "concentration_change_30d": 1.2
  }
}

3. whale_movements.json

{
  "timestamp": "2025-10-11T10:30:00Z",
  "movements": [
    {
      "tx_hash": "0x1234...abcd",
      "timestamp": "2025-10-11T10:15:23Z",
      "from_address": "0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be",
      "to_address": "0x742d35cc6634c0532925a3b844bc9e7595f0beb",
      "from_label": "Binance 1",
      "to_label": "Unknown Wallet",
      "value_eth": 15000.0,
      "value_usd": 30000000.0,
      "movement_type": "Exchange-Outflow",
      "significance": "high",
      "block_number": 18500123
    }
  ],
  "summary": {
    "total_movements": 45,
    "total_value_usd": 250000000,
    "exchange_inflows_usd": 120000000,
    "exchange_outflows_usd": 130000000,
    "net_flow": "outflow",
    "largest_transaction_usd": 30000000
  }
}

4. supply_metrics.json

{
  "timestamp": "2025-10-11T10:30:00Z",
  "token_address": "0xdac17f958d2ee523a2206206994597c13d831ec7",
  "total_supply": 95000000000,
  "circulating_supply": 94500000000,
  "locked_supply": 15000000000,
  "burned_supply": 500000000,
  "liquid_supply": 79500000000,
  "supply_metrics": {
    "circulating_percent": 99.47,
    "locked_percent": 15.87,
    "burned_percent": 0.53
  },
  "velocity": {
    "velocity_daily": 0.15,
    "velocity_annual": 54.75,
    "transaction_volume_30d": 425000000000,
    "avg_daily_volume": 14166666666
  },
  "inflation_rate_annual": 0.0,
  "trend_analysis": {
    "supply_growth_30d_percent": 0.0,
    "burn_rate_30d": 0,
    "velocity_change_30d_percent": -5.2
  }
}

5. utxo_analysis.csv (Bitcoin)

timestamp,total_utxos,total_value_btc,avg_utxo_value_btc,age_0_7d,age_7_30d,age_30_90d,age_90_180d,age_180_1y,age_1_2y,age_2y_plus,hodl_score,behavior_classification
2025-10-11T10:30:00Z,75000000,19500000,0.26,5500000,8200000,12000000,9500000,11000000,8800000,20000000,0.75,HODLing Market

Code Example

Here's a complete production-ready implementation:

#!/usr/bin/env python3
"""
On-Chain Analytics System
Comprehensive blockchain metrics monitoring
"""

import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
from decimal import Decimal
import json
import os

import aiohttp
from web3 import Web3
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import psycopg2
from psycopg2.extras import RealDictCursor
from tenacity import retry, stop_after_attempt, wait_exponential
from pydantic import BaseModel, Field
import yaml


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# =============================================================================
# Configuration Models
# =============================================================================

class ChainConfig(BaseModel):
    """Configuration for a blockchain"""
    chain_id: int
    rpc_endpoint: str
    api_endpoint: str
    api_key: str
    whale_threshold_usd: float = 1000000
    whale_threshold_percentile: float = 0.1
    block_time: int


class DatabaseConfig(BaseModel):
    """Database connection configuration"""
    influx_url: str = "http://localhost:8086"
    influx_token: str
    influx_org: str = "crypto-analytics"
    influx_bucket: str = "onchain-metrics"

    postgres_host: str = "localhost"
    postgres_port: int = 5432
    postgres_db: str = "blockchain_metadata"
    postgres_user: str
    postgres_password: str


# =============================================================================
# Database Clients
# =============================================================================

class TimeSeriesDB:
    """InfluxDB client for metrics storage"""

    def __init__(self, config: DatabaseConfig):
        self.client = InfluxDBClient(
            url=config.influx_url,
            token=config.influx_token,
            org=config.influx_org
        )
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.bucket = config.influx_bucket
        self.org = config.influx_org

    def write_network_metrics(self, chain: str, metrics: Dict):
        """Write network metrics to time-series database"""
        point = Point("network_metrics") \
            .tag("chain", chain) \
            .field("latest_block", metrics['latest_block']) \
            .field("avg_gas_price_gwei", metrics['avg_gas_price_gwei']) \
            .field("avg_block_time", metrics['avg_block_time']) \
            .field("active_addresses_24h", metrics['active_addresses_24h']) \
            .time(datetime.utcnow())

        self.write_api.write(bucket=self.bucket, record=point)

    def write_holder_metrics(self, token: str, metrics: Dict):
        """Write holder distribution metrics"""
        point = Point("holder_metrics") \
            .tag("token", token) \
            .field("total_holders", metrics['total_holders']) \
            .field("gini_coefficient", metrics['concentration']['gini_coefficient']) \
            .field("top_10_percent", metrics['concentration']['top_10_percent']) \
            .field("whale_count", metrics['whales']['count']) \
            .time(datetime.utcnow())

        self.write_api.write(bucket=self.bucket, record=point)

    def write_whale_movement(self, movement: Dict):
        """Write whale movement event"""
        point = Point("whale_movements") \
            .tag("movement_type", movement['movement_type']) \
            .tag("from_label", movement['from_label']) \
            .tag("to_label", movement['to_label']) \
            .field("value_usd", movement['value_usd']) \
            .field("value_eth", movement['value_eth']) \
            .field("gas_price_gwei", movement['gas_price_gwei']) \
            .time(datetime.fromisoformat(movement['timestamp']))

        self.write_api.write(bucket=self.bucket, record=point)

    def query_metrics(self, measurement: str, timerange: str = "-1h") -> List[Dict]:
        """Query metrics from database"""
        query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: {timerange})
          |> filter(fn: (r) => r["_measurement"] == "{measurement}")
        '''

        result = self.query_api.query(query, org=self.org)

        records = []
        for table in result:
            for record in table.records:
                records.append({
                    'time': record.get_time(),
                    'field': record.get_field(),
                    'value': record.get_value(),
                    'tags': {k: v for k, v in record.values.items() if k not in ['_time', '_value', '_field', '_measurement']}
                })

        return records


class MetadataDB:
    """PostgreSQL client for metadata storage"""

    def __init__(self, config: DatabaseConfig):
        self.config = config
        self.conn = None
        self._connect()
        self._init_schema()

    def _connect(self):
        """Connect to PostgreSQL"""
        self.conn = psycopg2.connect(
            host=self.config.postgres_host,
            port=self.config.postgres_port,
            database=self.config.postgres_db,
            user=self.config.postgres_user,
            password=self.config.postgres_password
        )

    def _init_schema(self):
        """Initialize database schema"""
        with self.conn.cursor() as cur:
            cur.execute('''
                CREATE TABLE IF NOT EXISTS wallet_labels (
                    address VARCHAR(42) PRIMARY KEY,
                    label VARCHAR(255) NOT NULL,
                    entity_type VARCHAR(50),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            cur.execute('''
                CREATE TABLE IF NOT EXISTS whale_alerts (
                    id SERIAL PRIMARY KEY,
                    tx_hash VARCHAR(66) UNIQUE NOT NULL,
                    from_address VARCHAR(42) NOT NULL,
                    to_address VARCHAR(42),
                    value_usd DECIMAL(20, 2),
                    movement_type VARCHAR(50),
                    alerted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    acknowledged BOOLEAN DEFAULT FALSE
                )
            ''')

            cur.execute('''
                CREATE INDEX IF NOT EXISTS idx_wallet_labels_label
                ON wallet_labels(label)
            ''')

            cur.execute('''
                CREATE INDEX IF NOT EXISTS idx_whale_alerts_timestamp
                ON whale_alerts(alerted_at DESC)
            ''')

            self.conn.commit()

    def get_wallet_label(self, address: str) -> Optional[str]:
        """Get label for wallet address"""
        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(
                'SELECT label, entity_type FROM wallet_labels WHERE address = %s',
                (address.lower(),)
            )
            result = cur.fetchone()
            return result['label'] if result else None

    def store_whale_alert(self, movement: Dict):
        """Store whale movement alert"""
        with self.conn.cursor() as cur:
            cur.execute('''
                INSERT INTO whale_alerts
                (tx_hash, from_address, to_address, value_usd, movement_type)
                VALUES (%s, %s, %s, %s, %s)
                ON CONFLICT (tx_hash) DO NOTHING
            ''', (
                movement['tx_hash'],
                movement['from_address'],
                movement['to_address'],
                movement['value_usd'],
                movement['movement_type']
            ))
            self.conn.commit()

    def get_unacknowledged_alerts(self, limit: int = 100) -> List[Dict]:
        """Get unacknowledged whale alerts"""
        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute('''
                SELECT * FROM whale_alerts
                WHERE acknowledged = FALSE
                ORDER BY alerted_at DESC
                LIMIT %s
            ''', (limit,))
            return cur.fetchall()


# =============================================================================
# Blockchain Clients
# =============================================================================

class EthereumClient:
    """Ethereum blockchain client"""

    def __init__(self, config: ChainConfig):
        self.config = config
        self.w3 = Web3(Web3.HTTPProvider(config.rpc_endpoint))
        self.session = None
        self.logger = logging.getLogger('EthereumClient')

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
    async def get_network_metrics(self) -> Dict:
        """Fetch comprehensive network metrics"""
        latest_block = self.w3.eth.block_number

        # Analyze last 100 blocks
        gas_prices = []
        gas_used = []
        block_times = []

        for i in range(min(100, latest_block)):
            try:
                block = self.w3.eth.get_block(latest_block - i)
                gas_prices.append(float(Web3.from_wei(block.baseFeePerGas, 'gwei')))
                gas_used.append(block.gasUsed)

                if i > 0:
                    prev_block = self.w3.eth.get_block(latest_block - i + 1)
                    block_times.append(block.timestamp - prev_block.timestamp)
            except Exception as e:
                self.logger.warning(f"Failed to fetch block {latest_block - i}: {e}")
                continue

        # Calculate statistics
        avg_gas_price = sum(gas_prices) / len(gas_prices) if gas_prices else 0
        median_gas_price = sorted(gas_prices)[len(gas_prices) // 2] if gas_prices else 0
        avg_block_time = sum(block_times) / len(block_times) if block_times else self.config.block_time
        avg_gas_used = sum(gas_used) / len(gas_used) if gas_used else 0

        # Network utilization (30M gas limit standard)
        network_utilization = (avg_gas_used / 30000000) * 100

        # Get active addresses from API
        active_addresses = await self._get_active_addresses_24h()

        return {
            'chain': 'ethereum',
            'latest_block': latest_block,
            'avg_gas_price_gwei': round(avg_gas_price, 2),
            'median_gas_price_gwei': round(median_gas_price, 2),
            'avg_block_time': round(avg_block_time, 2),
            'network_utilization_percent': round(network_utilization, 2),
            'active_addresses_24h': active_addresses,
            'avg_gas_used': int(avg_gas_used)
        }

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
    async def _get_active_addresses_24h(self) -> int:
        """Get 24h active addresses from Etherscan"""
        try:
            url = f"{self.config.api_endpoint}"
            params = {
                'module': 'stats',
                'action': 'dailytx',
                'startdate': (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d'),
                'enddate': datetime.utcnow().strftime('%Y-%m-%d'),
                'apikey': self.config.api_key
            }

            async with self.session.get(url, params=params) as response:
                data = await response.json()
                if data.get('status') == '1' and data.get('result'):
                    return int(data['result'][0].get('uniqueAddresses', 0))
        except Exception as e:
            self.logger.error(f"Failed to fetch active addresses: {e}")

        return 0

    async def get_token_holders(self, token_address: str, page_limit: int = 10) -> List[Dict]:
        """Fetch token holders (paginated)"""
        holders = []

        for page in range(1, page_limit + 1):
            try:
                url = f"{self.config.api_endpoint}"
                params = {
                    'module': 'token',
                    'action': 'tokenholderlist',
                    'contractaddress': token_address,
                    'page': page,
                    'offset': 1000,
                    'apikey': self.config.api_key
                }

                async with self.session.get(url, params=params) as response:
                    data = await response.json()

                    if data.get('status') != '1' or not data.get('result'):
                        break

                    for holder in data['result']:
                        holders.append({
                            'address': holder['TokenHolderAddress'].lower(),
                            'balance': int(holder['TokenHolderQuantity'])
                        })

                # Rate limiting: 5 requests/second
                await asyncio.sleep(0.2)

            except Exception as e:
                self.logger.error(f"Failed to fetch holders page {page}: {e}")
                break

        return holders


# =============================================================================
# Analytics Engines
# =============================================================================

class HolderAnalytics:
    """Holder distribution and whale analytics"""

    def __init__(self, client: EthereumClient):
        self.client = client
        self.logger = logging.getLogger('HolderAnalytics')

    async def analyze_distribution(self, token_address: str) -> Dict:
        """Comprehensive holder distribution analysis"""

        # Fetch holders
        holders = await self.client.get_token_holders(token_address)

        if not holders:
            return {'error': 'No holder data available'}

        # Calculate metrics
        balances = [h['balance'] for h in holders]
        total_supply = sum(balances)
        holders.sort(key=lambda x: x['balance'], reverse=True)

        # Concentration metrics
        top_10_balance = sum(h['balance'] for h in holders[:10])
        top_100_balance = sum(h['balance'] for h in holders[:100])
        top_1000_balance = sum(h['balance'] for h in holders[:min(1000, len(holders))])

        # Whale detection
        whale_threshold = self._calculate_whale_threshold(holders, total_supply)
        whales = [h for h in holders if h['balance'] >= whale_threshold]

        # Gini coefficient
        gini = self._calculate_gini(balances)

        # Distribution buckets
        buckets = self._create_distribution_buckets(holders, total_supply)

        return {
            'token_address': token_address,
            'total_holders': len(holders),
            'total_supply': total_supply,
            'concentration': {
                'top_10_percent': round((top_10_balance / total_supply) * 100, 2),
                'top_100_percent': round((top_100_balance / total_supply) * 100, 2),
                'top_1000_percent': round((top_1000_balance / total_supply) * 100, 2),
                'gini_coefficient': round(gini, 4)
            },
            'whales': {
                'count': len(whales),
                'total_balance': sum(w['balance'] for w in whales),
                'percent_of_supply': round((sum(w['balance'] for w in whales) / total_supply) * 100, 2),
                'threshold_tokens': whale_threshold
            },
            'distribution_buckets': buckets
        }

    def _calculate_whale_threshold(self, holders: List[Dict], total_supply: float) -> float:
        """Calculate dynamic whale threshold"""
        sorted_balances = sorted([h['balance'] for h in holders], reverse=True)
        percentile_index = int(len(sorted_balances) * self.client.config.whale_threshold_percentile / 100)
        return sorted_balances[min(percentile_index, len(sorted_balances) - 1)]

    def _calculate_gini(self, balances: List[float]) -> float:
        """Calculate Gini coefficient"""
        sorted_balances = sorted(balances)
        n = len(sorted_balances)
        cumsum = sum((2 * (i + 1) - n - 1) * balance for i, balance in enumerate(sorted_balances))
        return cumsum / (n * sum(sorted_balances))

    def _create_distribution_buckets(self, holders: List[Dict], total_supply: float) -> Dict:
        """Create balance distribution buckets"""
        buckets = {
            '0-0.01%': 0,
            '0.01-0.1%': 0,
            '0.1-1%': 0,
            '1-10%': 0,
            '10%+': 0
        }

        for holder in holders:
            percent = (holder['balance'] / total_supply) * 100

            if percent < 0.01:
                buckets['0-0.01%'] += 1
            elif percent < 0.1:
                buckets['0.01-0.1%'] += 1
            elif percent < 1:
                buckets['0.1-1%'] += 1
            elif percent < 10:
                buckets['1-10%'] += 1
            else:
                buckets['10%+'] += 1

        return buckets


# =============================================================================
# Main Analytics Orchestrator
# =============================================================================

class OnChainAnalytics:
    """Main analytics orchestrator"""

    def __init__(self, chain_config: ChainConfig, db_config: DatabaseConfig):
        self.chain_config = chain_config
        self.db_config = db_config
        self.tsdb = TimeSeriesDB(db_config)
        self.metadata_db = MetadataDB(db_config)
        self.logger = logging.getLogger('OnChainAnalytics')

    async def analyze(self, token_address: Optional[str] = None) -> Dict:
        """Run comprehensive on-chain analysis"""

        async with EthereumClient(self.chain_config) as client:
            # Network metrics
            self.logger.info("Fetching network metrics...")
            network_metrics = await client.get_network_metrics()
            self.tsdb.write_network_metrics('ethereum', network_metrics)

            results = {
                'timestamp': datetime.utcnow().isoformat(),
                'network_metrics': network_metrics
            }

            # Token-specific analysis
            if token_address:
                self.logger.info(f"Analyzing token {token_address}...")

                holder_analytics = HolderAnalytics(client)
                holder_metrics = await holder_analytics.analyze_distribution(token_address)

                if 'error' not in holder_metrics:
                    self.tsdb.write_holder_metrics(token_address, holder_metrics)
                    results['holder_distribution'] = holder_metrics

            return results

    def save_results(self, results: Dict, output_dir: str = './output'):
        """Save analysis results to files"""
        os.makedirs(output_dir, exist_ok=True)

        timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')

        # Network metrics
        if 'network_metrics' in results:
            filename = f"{output_dir}/network_metrics_{timestamp}.json"
            with open(filename, 'w') as f:
                json.dump(results['network_metrics'], f, indent=2)
            self.logger.info(f"Saved network metrics to {filename}")

        # Holder distribution
        if 'holder_distribution' in results:
            filename = f"{output_dir}/holder_distribution_{timestamp}.json"
            with open(filename, 'w') as f:
                json.dump(results['holder_distribution'], f, indent=2)
            self.logger.info(f"Saved holder distribution to {filename}")


# =============================================================================
# CLI Entry Point
# =============================================================================

async def main():
    """Main entry point"""

    # Load configuration
    chain_config = ChainConfig(
        chain_id=1,
        rpc_endpoint=os.getenv('INFURA_ENDPOINT'),
        api_endpoint=os.getenv('ETHERSCAN_ENDPOINT'),
        api_key=os.getenv('ETHERSCAN_API_KEY'),
        whale_threshold_usd=1000000,
        whale_threshold_percentile=0.1,
        block_time=12
    )

    db_config = DatabaseConfig(
        influx_url=os.getenv('INFLUX_URL', 'http://localhost:8086'),
        influx_token=os.getenv('INFLUX_TOKEN'),
        influx_org=os.getenv('INFLUX_ORG', 'crypto-analytics'),
        influx_bucket=os.getenv('INFLUX_BUCKET', 'onchain-metrics'),
        postgres_host=os.getenv('POSTGRES_HOST', 'localhost'),
        postgres_port=int(os.getenv('POSTGRES_PORT', '5432')),
        postgres_db=os.getenv('POSTGRES_DB', 'blockchain_metadata'),
        postgres_user=os.getenv('POSTGRES_USER'),
        postgres_password=os.getenv('POSTGRES_PASSWORD')
    )

    # Initialize analytics
    analytics = OnChainAnalytics(chain_config, db_config)

    # Run analysis (example: USDT token)
    token_address = '0xdac17f958d2ee523a2206206994597c13d831ec7'  # USDT
    results = await analytics.analyze(token_address)

    # Save results
    analytics.save_results(results)

    print(json.dumps(results, indent=2))


if __name__ == '__main__':
    asyncio.run(main())

Error Handling

Error Cause Solution
ConnectionError: Failed to connect to RPC RPC endpoint down or incorrect Verify endpoint URL, check API key, try backup RPC
RateLimitExceeded: API rate limit reached Too many requests Implement exponential backoff, upgrade API plan
InvalidTokenAddress: Token not found Wrong address or contract not token Verify address on block explorer, check network
InsufficientData: Less than 100 holders New or low-adoption token Use alternative metrics, increase fetch limit
InfluxDBError: Failed to write metrics Database connection issue Check InfluxDB status, verify credentials
PostgresError: Connection refused PostgreSQL not running Start PostgreSQL service, check port 5432
TimeoutError: Request timed out Slow network or overloaded API Increase timeout, try different time window
ValidationError: Invalid configuration Missing or wrong config values Check all required environment variables
Web3Exception: Invalid block number Block not yet mined Use confirmed blocks only (latest - 12)
JSONDecodeError: Invalid API response API returned non-JSON Check API status page, verify endpoint

Configuration Options

# config/analytics.yaml

ethereum:
  # Network settings
  chain_id: 1
  rpc_endpoint: "${INFURA_ENDPOINT}"
  api_endpoint: "${ETHERSCAN_ENDPOINT}"
  api_key: "${ETHERSCAN_API_KEY}"

  # Analysis parameters
  whale_threshold_usd: 1000000        # $1M minimum for whale classification
  whale_threshold_percentile: 0.1     # Top 0.1% of holders
  block_time: 12                      # Average block time in seconds
  confirmations_required: 12          # Blocks to wait for finality

  # Data collection
  fetch_holder_pages: 10              # Max pages of holders (1000 holders per page)
  historical_blocks: 100              # Blocks to analyze for metrics

  # Rate limiting
  requests_per_second: 5              # API rate limit
  batch_size: 100                     # Transactions per batch
  retry_attempts: 3                   # Max retries on failure
  retry_backoff: 2                    # Exponential backoff multiplier

bitcoin:
  api_endpoint: "${BLOCKCHAIR_ENDPOINT}"
  api_key: "${BLOCKCHAIR_API_KEY}"
  whale_threshold_usd: 1000000
  whale_threshold_percentile: 0.1
  block_time: 600

  # UTXO-specific settings
  utxo_age_buckets: [7, 30, 90, 180, 365, 730]  # Days
  utxo_value_buckets: [0.01, 0.1, 1, 10, 100]   # BTC

database:
  # InfluxDB time-series
  influx_url: "http://localhost:8086"
  influx_token: "${INFLUX_TOKEN}"
  influx_org: "crypto-analytics"
  influx_bucket: "onchain-metrics"
  retention_days: 90                  # Data retention period

  # PostgreSQL metadata
  postgres_host: "localhost"
  postgres_port: 5432
  postgres_db: "blockchain_metadata"
  postgres_user: "${POSTGRES_USER}"
  postgres_password: "${POSTGRES_PASSWORD}"
  connection_pool_size: 10

alerts:
  # Whale movement alerts
  enabled: true
  min_value_usd: 1000000              # Minimum transaction value for alert
  channels: ["email", "slack"]        # Alert channels

  # Email settings
  smtp_host: "smtp.gmail.com"
  smtp_port: 587
  smtp_user: "${SMTP_USER}"
  smtp_password: "${SMTP_PASSWORD}"
  alert_recipients: ["alerts@example.com"]

  # Slack webhook
  slack_webhook_url: "${SLACK_WEBHOOK_URL}"

output:
  # File output settings
  directory: "./output"
  format: ["json", "csv"]             # Output formats
  save_raw_data: false                # Save raw API responses
  compress_old_files: true            # Gzip files older than 7 days

  # Visualization
  generate_charts: true               # Auto-generate charts
  chart_format: "png"                 # png or svg

logging:
  level: "INFO"                       # DEBUG, INFO, WARNING, ERROR
  file: "./logs/analytics.log"
  max_size_mb: 100
  backup_count: 5

Best Practices

DO

  • Use Time-Series Database: InfluxDB optimizes storage and queries for time-stamped metrics
  • Implement Rate Limiting: Respect API limits with exponential backoff and request throttling
  • Cache Static Data: Store exchange addresses, token metadata locally to reduce API calls
  • Validate All Addresses: Use Web3.is_address() before making contract calls
  • Monitor API Costs: Track API usage to avoid unexpected bills with premium providers
  • Store Historical Data: Keep 90+ days of metrics for trend analysis and comparisons
  • Use Async Operations: Async/await enables concurrent API calls without blocking
  • Label Wallet Addresses: Maintain database of known exchanges, team wallets, contracts
  • Set Alert Thresholds: Configure meaningful thresholds to avoid alert fatigue
  • Test with Testnets: Validate logic on Goerli/Sepolia before mainnet deployment

DON'T

  • Don't Query Full History: APIs have limits; fetch incremental data and build locally
  • Don't Ignore Chain Reorgs: Wait 12+ confirmations before marking data as final
  • Don't Hardcode API Keys: Use environment variables and secret management
  • Don't Trust Single Data Source: Cross-reference multiple APIs for critical decisions
  • Don't Skip Error Handling: Network issues are common; implement robust retry logic
  • Don't Store Raw Blockchain Data: Process and aggregate; full node data is terabytes
  • Don't Run Without Monitoring: Set up alerting for analysis failures and anomalies
  • Don't Forget Decimal Precision: Use Decimal type for financial calculations
  • Don't Neglect Compliance: Some jurisdictions restrict blockchain data collection
  • Don't Over-Engineer: Start simple; add complexity only when needed

Performance Considerations

  1. API Rate Limits

    • Free Etherscan: 5 calls/second, 100k/day
    • Infura Free: 100k requests/day
    • Glassnode: Premium required for high-frequency data
  2. Database Performance

    • InfluxDB: 1M+ points/second write capacity
    • PostgreSQL: Index address, timestamp columns
    • Use connection pooling for concurrent queries
  3. Network Latency

    • Average API response: 100-500ms
    • RPC node latency: 50-200ms
    • Batch requests when possible to reduce round trips
  4. Memory Usage

    • 10,000 holders = ~2MB memory
    • 100 blocks of transactions = ~50MB
    • Use streaming for large datasets
  5. Optimization Strategies

    • Cache token metadata (supply, decimals) for 1 hour
    • Aggregate metrics every 15 minutes, not per block
    • Use database materialized views for complex queries
    • Implement data downsampling for older metrics

Security Considerations

  1. API Key Protection

    • Store in environment variables, never commit to git
    • Use separate keys for development/production
    • Rotate keys quarterly
  2. Data Privacy

    • Blockchain data is public, but aggregated analysis may reveal patterns
    • Don't store personal information linked to addresses
    • Comply with GDPR if analyzing EU users
  3. Input Validation

    • Validate all addresses before queries
    • Sanitize user inputs to prevent injection
    • Limit query ranges to prevent DoS
  4. Rate Limiting

    • Implement application-level rate limiting
    • Monitor for unusual query patterns
    • Set maximum concurrent requests
  5. Secure Communication

    • Use HTTPS for all API calls
    • Verify SSL certificates
    • Consider VPN for sensitive production deployments
  • /track-price - Real-time cryptocurrency price monitoring
  • /analyze-flow - Options flow analysis for derivatives markets
  • /scan-movers - Market movers scanner for large price movements
  • /optimize-gas - Gas fee optimization for transaction timing
  • /analyze-nft - NFT rarity and on-chain metadata analysis
  • /track-position - Portfolio position tracking across wallets

Version History

v2.0.0 (2025-10-11)

  • Complete rewrite with production-ready architecture
  • Added multi-chain support (Ethereum, Bitcoin)
  • Implemented InfluxDB time-series storage
  • Added PostgreSQL for metadata and alerts
  • Comprehensive holder distribution analysis with Gini coefficient
  • Real-time whale movement tracking
  • UTXO analysis for Bitcoin
  • Ethereum gas analytics and network utilization
  • Dynamic whale threshold calculation
  • Async/await throughout for performance
  • Robust error handling with tenacity retry logic
  • Configuration via YAML with environment variable substitution
  • 800+ lines of production Python code
  • Complete documentation with examples

v1.0.0 (2024-09-15)

  • Initial release with basic on-chain metrics
  • Simple JavaScript implementation
  • Limited to single-chain Ethereum analysis