60 KiB
description, shortcut, tags, version, author
| description | shortcut | tags | version | author | ||||||
|---|---|---|---|---|---|---|---|---|---|---|
| Comprehensive blockchain metrics monitoring and on-chain analytics system | ac |
|
2.0.0 | claude-code-plugins |
Analyze On-Chain Data
Comprehensive blockchain metrics monitoring system that tracks network health, holder distribution, whale movements, exchange flows, supply metrics, and transaction velocity across multiple blockchain networks.
When to Use
Use this command when you need to:
- Monitor Network Health: Track hash rate, difficulty, active addresses, and transaction throughput
- Analyze Holder Distribution: Identify whale concentration, wealth distribution, and accumulation patterns
- Track Exchange Flows: Monitor deposit/withdrawal patterns, exchange reserves, and liquidity movements
- Analyze Supply Metrics: Track circulating supply, staked amounts, burned tokens, and inflation rates
- Measure Transaction Velocity: Calculate token circulation speed, active address growth, and network utilization
- Bitcoin UTXO Analysis: Track unspent transaction outputs, UTXO age distribution, and holder behavior
- Ethereum Gas Analytics: Monitor gas prices, network congestion, and smart contract activity
- Whale Movement Alerts: Real-time tracking of large transactions and address clustering
- Market Research: Data-driven insights for trading strategies and investment decisions
- Compliance Monitoring: Track sanctioned addresses, suspicious patterns, and regulatory requirements
DON'T Use When
- Real-time Trading: Use dedicated trading APIs for sub-second execution
- Price Prediction: On-chain metrics show trends, not future prices
- Legal Advice: Consult blockchain forensics experts for legal cases
- Single Transaction Tracking: Use block explorers for individual transactions
- Historical Data Older Than 7 Years: Most APIs have limited historical depth
Design Decisions
1. Multi-Chain Architecture vs Single Chain Focus
Chosen Approach: Multi-chain with chain-specific adapters
Why: Different blockchains require different metrics and APIs. Bitcoin uses UTXO model while Ethereum uses account-based model. A unified interface with chain-specific implementations provides flexibility while maintaining consistent output formats.
Alternatives Considered:
- Single Chain Only: Rejected due to limited market coverage
- Chain-Agnostic Generic Metrics: Rejected due to loss of blockchain-specific insights
- Separate Tools Per Chain: Rejected due to code duplication and maintenance burden
2. Data Source Strategy: APIs vs Full Node
Chosen Approach: Hybrid API-based with optional full node support
Why: Most users don't run full nodes. Using APIs (Etherscan, Blockchain.com, Glassnode) provides immediate access without infrastructure requirements. Full node support remains optional for advanced users needing maximum decentralization.
Alternatives Considered:
- Full Node Required: Rejected due to high infrastructure costs ($500+/month)
- APIs Only: Considered but added full node option for enterprise users
- Light Client Sync: Rejected due to incomplete data and sync time requirements
3. Whale Detection Threshold Method
Chosen Approach: Dynamic percentile-based thresholds with absolute minimums
Why: Static thresholds (e.g., "1000 BTC = whale") become outdated as prices change. Using percentile-based detection (top 0.1% holders) with absolute minimums (e.g., $1M USD equivalent) adapts to market conditions.
Alternatives Considered:
- Fixed Token Amounts: Rejected due to price volatility making thresholds obsolete
- USD Value Only: Rejected as it misses on-chain concentration patterns
- Machine Learning Clustering: Rejected due to complexity vs accuracy tradeoff
4. Data Storage: Time-Series Database vs Relational
Chosen Approach: Time-series database (InfluxDB) for metrics, PostgreSQL for metadata
Why: On-chain metrics are time-stamped numerical data perfect for time-series databases. InfluxDB provides efficient storage, fast queries, and built-in downsampling. PostgreSQL stores wallet metadata, labels, and relationships.
Alternatives Considered:
- PostgreSQL Only: Rejected due to poor time-series query performance
- InfluxDB Only: Rejected due to poor relational data handling
- ElasticSearch: Rejected due to higher complexity and resource requirements
5. Real-Time vs Batch Processing
Chosen Approach: Hybrid batch + real-time for whale alerts
Why: Most metrics update every 10-15 minutes (blockchain confirmation time). Batch processing handles historical data efficiently. Real-time processing monitors mempool for large transactions requiring immediate alerts.
Alternatives Considered:
- Batch Only: Rejected due to delayed whale movement detection
- Real-Time Only: Rejected due to high API costs and rate limits
- Streaming-First Architecture: Rejected as overkill for blockchain data latency
Prerequisites
1. Blockchain API Access
Ethereum:
# Etherscan API (Free tier: 5 calls/second)
export ETHERSCAN_API_KEY="your_etherscan_key"
export ETHERSCAN_ENDPOINT="https://api.etherscan.io/api"
# Infura for direct node access (100k requests/day free)
export INFURA_PROJECT_ID="your_infura_project_id"
export INFURA_ENDPOINT="https://mainnet.infura.io/v3/${INFURA_PROJECT_ID}"
Bitcoin:
# Blockchain.com API (No key required, rate limited)
export BLOCKCHAIN_API_ENDPOINT="https://blockchain.info"
# Blockchair API (10k requests/day free)
export BLOCKCHAIR_API_KEY="your_blockchair_key"
export BLOCKCHAIR_ENDPOINT="https://api.blockchair.com"
Premium Data Providers:
# Glassnode (Professional tier required, $800+/month)
export GLASSNODE_API_KEY="your_glassnode_key"
# CoinMetrics (Enterprise tier, contact for pricing)
export COINMETRICS_API_KEY="your_coinmetrics_key"
2. Python Dependencies
pip install web3==6.11.0 \
requests==2.31.0 \
pandas==2.1.3 \
influxdb-client==1.38.0 \
psycopg2-binary==2.9.9 \
python-dotenv==1.0.0 \
aiohttp==3.9.1 \
tenacity==8.2.3 \
pydantic==2.5.0 \
pytz==2023.3
3. Database Infrastructure
InfluxDB Setup:
# Docker installation
docker run -d -p 8086:8086 \
-v influxdb-data:/var/lib/influxdb2 \
-e DOCKER_INFLUXDB_INIT_MODE=setup \
-e DOCKER_INFLUXDB_INIT_USERNAME=admin \
-e DOCKER_INFLUXDB_INIT_PASSWORD=secure_password_here \
-e DOCKER_INFLUXDB_INIT_ORG=crypto-analytics \
-e DOCKER_INFLUXDB_INIT_BUCKET=onchain-metrics \
influxdb:2.7
PostgreSQL Setup:
# Docker installation
docker run -d -p 5432:5432 \
-v postgres-data:/var/lib/postgresql/data \
-e POSTGRES_DB=blockchain_metadata \
-e POSTGRES_USER=crypto_analyst \
-e POSTGRES_PASSWORD=secure_password_here \
postgres:16
4. Configuration File
Create config/chains.yaml:
ethereum:
chain_id: 1
rpc_endpoint: "${INFURA_ENDPOINT}"
api_endpoint: "${ETHERSCAN_ENDPOINT}"
api_key: "${ETHERSCAN_API_KEY}"
whale_threshold_usd: 1000000
whale_threshold_percentile: 0.1
block_time: 12
bitcoin:
rpc_endpoint: null
api_endpoint: "${BLOCKCHAIR_ENDPOINT}"
api_key: "${BLOCKCHAIR_API_KEY}"
whale_threshold_usd: 1000000
whale_threshold_percentile: 0.1
block_time: 600
5. System Requirements
- CPU: 4+ cores (8+ recommended for multi-chain)
- RAM: 8GB minimum (16GB recommended)
- Storage: 100GB SSD (time-series data grows ~1GB/day with full metrics)
- Network: Stable connection with 10+ Mbps (API rate limits are the bottleneck)
Implementation Process
Step 1: Initialize Multi-Chain Client
from web3 import Web3
import aiohttp
from typing import Dict, Optional
import logging
class ChainClient:
"""Base class for blockchain clients"""
def __init__(self, config: Dict):
self.config = config
self.logger = logging.getLogger(self.__class__.__name__)
async def get_network_metrics(self) -> Dict:
raise NotImplementedError
async def get_holder_distribution(self, token: str) -> Dict:
raise NotImplementedError
class EthereumClient(ChainClient):
"""Ethereum-specific implementation"""
def __init__(self, config: Dict):
super().__init__(config)
self.w3 = Web3(Web3.HTTPProvider(config['rpc_endpoint']))
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def get_latest_block(self) -> int:
"""Get latest block number"""
return self.w3.eth.block_number
async def get_network_metrics(self) -> Dict:
"""Fetch Ethereum network metrics"""
latest_block = await self.get_latest_block()
# Get last 100 blocks for gas analysis
gas_prices = []
block_times = []
for i in range(100):
block = self.w3.eth.get_block(latest_block - i)
gas_prices.append(Web3.from_wei(block.gasUsed, 'gwei'))
if i > 0:
prev_block = self.w3.eth.get_block(latest_block - i + 1)
block_times.append(block.timestamp - prev_block.timestamp)
return {
'chain': 'ethereum',
'latest_block': latest_block,
'avg_gas_price_gwei': sum(gas_prices) / len(gas_prices),
'median_gas_price_gwei': sorted(gas_prices)[len(gas_prices) // 2],
'avg_block_time': sum(block_times) / len(block_times),
'network_hashrate': await self._get_hashrate(),
'active_addresses_24h': await self._get_active_addresses()
}
async def _get_hashrate(self) -> Optional[float]:
"""Get network hashrate from Etherscan"""
try:
url = f"{self.config['api_endpoint']}?module=stats&action=chainsize&apikey={self.config['api_key']}"
async with self.session.get(url) as response:
data = await response.json()
return float(data['result'])
except Exception as e:
self.logger.error(f"Failed to fetch hashrate: {e}")
return None
async def _get_active_addresses(self) -> Optional[int]:
"""Get 24h active addresses from Etherscan"""
try:
url = f"{self.config['api_endpoint']}?module=stats&action=dailytx&apikey={self.config['api_key']}"
async with self.session.get(url) as response:
data = await response.json()
if data['status'] == '1':
return int(data['result'][0]['uniqueAddresses'])
except Exception as e:
self.logger.error(f"Failed to fetch active addresses: {e}")
return None
Step 2: Implement Holder Distribution Analysis
from collections import defaultdict
from decimal import Decimal
class HolderAnalytics:
"""Analyze token holder distribution and concentration"""
def __init__(self, client: ChainClient):
self.client = client
self.logger = logging.getLogger(self.__class__.__name__)
async def analyze_distribution(self, token_address: str) -> Dict:
"""Analyze holder distribution with whale detection"""
# Fetch all holders (this is a simplified version)
holders = await self._fetch_all_holders(token_address)
if not holders:
return {'error': 'No holder data available'}
# Calculate distribution metrics
balances = [h['balance'] for h in holders]
total_supply = sum(balances)
# Sort by balance descending
holders.sort(key=lambda x: x['balance'], reverse=True)
# Calculate concentration metrics
top_10_balance = sum(h['balance'] for h in holders[:10])
top_100_balance = sum(h['balance'] for h in holders[:100])
top_1000_balance = sum(h['balance'] for h in holders[:1000])
# Whale threshold (top 0.1% or $1M+ USD)
whale_threshold_tokens = self._calculate_whale_threshold(
holders,
total_supply,
self.client.config['whale_threshold_percentile']
)
whales = [h for h in holders if h['balance'] >= whale_threshold_tokens]
# Calculate Gini coefficient (wealth inequality)
gini = self._calculate_gini(balances)
# Distribution buckets
buckets = self._create_distribution_buckets(holders, total_supply)
return {
'token_address': token_address,
'total_holders': len(holders),
'total_supply': total_supply,
'concentration': {
'top_10_percent': (top_10_balance / total_supply) * 100,
'top_100_percent': (top_100_balance / total_supply) * 100,
'top_1000_percent': (top_1000_balance / total_supply) * 100,
'gini_coefficient': gini
},
'whales': {
'count': len(whales),
'total_balance': sum(w['balance'] for w in whales),
'percent_of_supply': (sum(w['balance'] for w in whales) / total_supply) * 100,
'threshold_tokens': whale_threshold_tokens
},
'distribution_buckets': buckets
}
async def _fetch_all_holders(self, token_address: str) -> list:
"""Fetch all token holders (implementation depends on API)"""
# This would use Etherscan's token holder API
holders = []
page = 1
while True:
url = f"{self.client.config['api_endpoint']}?module=token&action=tokenholderlist&contractaddress={token_address}&page={page}&offset=1000&apikey={self.client.config['api_key']}"
async with self.client.session.get(url) as response:
data = await response.json()
if data['status'] != '1' or not data['result']:
break
for holder in data['result']:
holders.append({
'address': holder['TokenHolderAddress'],
'balance': int(holder['TokenHolderQuantity'])
})
page += 1
# Rate limiting
await asyncio.sleep(0.2) # 5 requests per second
return holders
def _calculate_whale_threshold(self, holders: list, total_supply: float, percentile: float) -> float:
"""Calculate dynamic whale threshold"""
sorted_balances = sorted([h['balance'] for h in holders], reverse=True)
index = int(len(sorted_balances) * percentile / 100)
return sorted_balances[min(index, len(sorted_balances) - 1)]
def _calculate_gini(self, balances: list) -> float:
"""Calculate Gini coefficient (0=perfect equality, 1=perfect inequality)"""
sorted_balances = sorted(balances)
n = len(sorted_balances)
cumsum = 0
for i, balance in enumerate(sorted_balances):
cumsum += (2 * (i + 1) - n - 1) * balance
return cumsum / (n * sum(sorted_balances))
def _create_distribution_buckets(self, holders: list, total_supply: float) -> Dict:
"""Create balance distribution buckets"""
buckets = {
'0-0.01%': 0,
'0.01-0.1%': 0,
'0.1-1%': 0,
'1-10%': 0,
'10%+': 0
}
for holder in holders:
percent = (holder['balance'] / total_supply) * 100
if percent < 0.01:
buckets['0-0.01%'] += 1
elif percent < 0.1:
buckets['0.01-0.1%'] += 1
elif percent < 1:
buckets['0.1-1%'] += 1
elif percent < 10:
buckets['1-10%'] += 1
else:
buckets['10%+'] += 1
return buckets
Step 3: Track Whale Movements and Exchange Flows
from datetime import datetime, timedelta
import asyncio
class WhaleTracker:
"""Real-time whale movement tracking"""
def __init__(self, client: ChainClient, db_client):
self.client = client
self.db = db_client
self.logger = logging.getLogger(self.__class__.__name__)
self.known_exchanges = self._load_exchange_addresses()
def _load_exchange_addresses(self) -> Dict[str, str]:
"""Load known exchange addresses"""
return {
'0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be': 'Binance 1',
'0xd551234ae421e3bcba99a0da6d736074f22192ff': 'Binance 2',
'0x28c6c06298d514db089934071355e5743bf21d60': 'Binance 14',
'0x21a31ee1afc51d94c2efccaa2092ad1028285549': 'Binance 15',
'0x564286362092d8e7936f0549571a803b203aaced': 'Binance 16',
'0x0681d8db095565fe8a346fa0277bffde9c0edbbf': 'Binance 17',
'0x4e9ce36e442e55ecd9025b9a6e0d88485d628a67': 'Binance 18',
'0xbe0eb53f46cd790cd13851d5eff43d12404d33e8': 'Binance 19',
'0xf977814e90da44bfa03b6295a0616a897441acec': 'Binance 8',
# Coinbase
'0x71660c4005ba85c37ccec55d0c4493e66fe775d3': 'Coinbase 1',
'0x503828976d22510aad0201ac7ec88293211d23da': 'Coinbase 2',
'0xddfabcdc4d8ffc6d5beaf154f18b778f892a0740': 'Coinbase 3',
'0x3cd751e6b0078be393132286c442345e5dc49699': 'Coinbase 4',
'0xb5d85cbf7cb3ee0d56b3bb207d5fc4b82f43f511': 'Coinbase 5',
'0xeb2629a2734e272bcc07bda959863f316f4bd4cf': 'Coinbase 6',
# Kraken
'0x2910543af39aba0cd09dbb2d50200b3e800a63d2': 'Kraken 1',
'0x0a869d79a7052c7f1b55a8ebabbea3420f0d1e13': 'Kraken 2',
'0xe853c56864a2ebe4576a807d26fdc4a0ada51919': 'Kraken 3',
'0x267be1c1d684f78cb4f6a176c4911b741e4ffdc0': 'Kraken 4',
}
async def track_movements(self, min_value_usd: float = 100000) -> list:
"""Track large transactions in real-time"""
movements = []
latest_block = await self.client.get_latest_block()
# Check last 10 blocks
for block_num in range(latest_block - 10, latest_block + 1):
block = self.client.w3.eth.get_block(block_num, full_transactions=True)
for tx in block.transactions:
value_eth = Web3.from_wei(tx.value, 'ether')
# Approximate USD value (would need price feed in production)
value_usd = value_eth * await self._get_eth_price()
if value_usd >= min_value_usd:
movement = await self._analyze_transaction(tx, value_usd)
movements.append(movement)
# Store in database
await self.db.store_whale_movement(movement)
# Send alert if significant
if value_usd >= 1000000: # $1M+
await self._send_alert(movement)
return movements
async def _analyze_transaction(self, tx, value_usd: float) -> Dict:
"""Analyze transaction and classify movement type"""
from_address = tx['from'].lower()
to_address = tx['to'].lower() if tx['to'] else None
from_label = self.known_exchanges.get(from_address, 'Unknown')
to_label = self.known_exchanges.get(to_address, 'Unknown') if to_address else 'Contract Creation'
# Classify movement type
if from_label != 'Unknown' and to_label != 'Unknown':
movement_type = 'Exchange-to-Exchange'
elif from_label != 'Unknown':
movement_type = 'Exchange-Outflow'
elif to_label != 'Unknown':
movement_type = 'Exchange-Inflow'
else:
movement_type = 'Whale-to-Whale'
return {
'timestamp': datetime.utcnow().isoformat(),
'tx_hash': tx['hash'].hex(),
'from_address': from_address,
'to_address': to_address,
'from_label': from_label,
'to_label': to_label,
'value_eth': float(Web3.from_wei(tx.value, 'ether')),
'value_usd': value_usd,
'movement_type': movement_type,
'gas_price_gwei': Web3.from_wei(tx.gasPrice, 'gwei'),
'block_number': tx.blockNumber
}
async def _get_eth_price(self) -> float:
"""Get current ETH price (simplified)"""
# In production, use CoinGecko or similar API
return 2000.0 # Placeholder
async def _send_alert(self, movement: Dict):
"""Send alert for significant movements"""
self.logger.warning(f"WHALE ALERT: {movement['movement_type']} - ${movement['value_usd']:,.2f}")
Step 4: Analyze Supply Metrics and Transaction Velocity
class SupplyAnalytics:
"""Track supply metrics and token velocity"""
def __init__(self, client: ChainClient):
self.client = client
self.logger = logging.getLogger(self.__class__.__name__)
async def get_supply_metrics(self, token_address: str) -> Dict:
"""Comprehensive supply analysis"""
# Get basic supply data
total_supply = await self._get_total_supply(token_address)
circulating_supply = await self._get_circulating_supply(token_address)
# Calculate locked/staked amounts
locked_supply = await self._get_locked_supply(token_address)
burned_supply = await self._get_burned_supply(token_address)
# Transaction velocity (30-day)
velocity = await self._calculate_velocity(token_address, days=30)
# Inflation rate (annualized)
inflation_rate = await self._calculate_inflation_rate(token_address)
return {
'token_address': token_address,
'total_supply': total_supply,
'circulating_supply': circulating_supply,
'locked_supply': locked_supply,
'burned_supply': burned_supply,
'liquid_supply': circulating_supply - locked_supply,
'supply_metrics': {
'circulating_percent': (circulating_supply / total_supply) * 100,
'locked_percent': (locked_supply / circulating_supply) * 100,
'burned_percent': (burned_supply / total_supply) * 100
},
'velocity': velocity,
'inflation_rate_annual': inflation_rate
}
async def _get_total_supply(self, token_address: str) -> float:
"""Get total token supply"""
contract = self.client.w3.eth.contract(
address=Web3.to_checksum_address(token_address),
abi=[{
'constant': True,
'inputs': [],
'name': 'totalSupply',
'outputs': [{'name': '', 'type': 'uint256'}],
'type': 'function'
}]
)
return float(contract.functions.totalSupply().call())
async def _get_circulating_supply(self, token_address: str) -> float:
"""Get circulating supply (excludes team/treasury wallets)"""
# This requires project-specific logic
# For now, return total supply
return await self._get_total_supply(token_address)
async def _get_locked_supply(self, token_address: str) -> float:
"""Get locked/staked supply"""
# Would query staking contracts
return 0.0 # Placeholder
async def _get_burned_supply(self, token_address: str) -> float:
"""Get burned supply"""
burn_address = '0x0000000000000000000000000000000000000000'
contract = self.client.w3.eth.contract(
address=Web3.to_checksum_address(token_address),
abi=[{
'constant': True,
'inputs': [{'name': '_owner', 'type': 'address'}],
'name': 'balanceOf',
'outputs': [{'name': 'balance', 'type': 'uint256'}],
'type': 'function'
}]
)
return float(contract.functions.balanceOf(burn_address).call())
async def _calculate_velocity(self, token_address: str, days: int = 30) -> Dict:
"""Calculate transaction velocity"""
# Velocity = (Transaction Volume / Circulating Supply) / Time Period
# Get transaction volume for period
volume = await self._get_transaction_volume(token_address, days)
circulating = await self._get_circulating_supply(token_address)
velocity_daily = volume / circulating / days
return {
'velocity_daily': velocity_daily,
'velocity_annual': velocity_daily * 365,
'transaction_volume_30d': volume,
'avg_daily_volume': volume / days
}
async def _get_transaction_volume(self, token_address: str, days: int) -> float:
"""Get total transaction volume for period"""
# Would aggregate transfer events
return 0.0 # Placeholder
async def _calculate_inflation_rate(self, token_address: str) -> float:
"""Calculate annualized inflation rate"""
# Compare supply now vs 365 days ago
# Placeholder for demonstration
return 2.5 # 2.5% annual inflation
Step 5: Bitcoin UTXO Analysis
class BitcoinUTXOAnalytics:
"""Bitcoin-specific UTXO analysis"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.blockchair.com/bitcoin"
self.logger = logging.getLogger(self.__class__.__name__)
async def analyze_utxo_set(self) -> Dict:
"""Comprehensive UTXO set analysis"""
async with aiohttp.ClientSession() as session:
# Get UTXO statistics
url = f"{self.base_url}/stats"
async with session.get(url, params={'key': self.api_key}) as response:
stats = await response.json()
# Analyze UTXO age distribution
utxo_age = await self._analyze_utxo_age(session)
# Analyze UTXO value distribution
utxo_values = await self._analyze_utxo_values(session)
return {
'total_utxos': stats['data']['utxo_count'],
'total_value_btc': stats['data']['circulation'],
'avg_utxo_value_btc': stats['data']['circulation'] / stats['data']['utxo_count'],
'utxo_age_distribution': utxo_age,
'utxo_value_distribution': utxo_values,
'holder_behavior': await self._analyze_holder_behavior(utxo_age)
}
async def _analyze_utxo_age(self, session) -> Dict:
"""Analyze UTXO age distribution"""
# UTXOs by age bucket
buckets = {
'0-7d': 0,
'7d-30d': 0,
'30d-90d': 0,
'90d-180d': 0,
'180d-1y': 0,
'1y-2y': 0,
'2y+': 0
}
# This would query Blockchair's UTXO API
# Simplified for demonstration
return buckets
async def _analyze_utxo_values(self, session) -> Dict:
"""Analyze UTXO value distribution"""
buckets = {
'0-0.01 BTC': 0,
'0.01-0.1 BTC': 0,
'0.1-1 BTC': 0,
'1-10 BTC': 0,
'10-100 BTC': 0,
'100+ BTC': 0
}
return buckets
async def _analyze_holder_behavior(self, utxo_age: Dict) -> Dict:
"""Infer holder behavior from UTXO patterns"""
# High % of old UTXOs = HODLing behavior
# High % of young UTXOs = Active trading
return {
'hodl_score': 0.75, # 0-1, higher = more HODLing
'active_trader_percent': 25.0,
'long_term_holder_percent': 60.0,
'behavior_classification': 'HODLing Market'
}
Output Format
1. network_metrics.json
{
"timestamp": "2025-10-11T10:30:00Z",
"chain": "ethereum",
"metrics": {
"latest_block": 18500000,
"avg_gas_price_gwei": 25.5,
"median_gas_price_gwei": 22.0,
"avg_block_time_seconds": 12.1,
"network_hashrate_th": 850000,
"active_addresses_24h": 450000,
"transaction_count_24h": 1200000,
"avg_tx_fee_usd": 3.25,
"network_utilization_percent": 78.5
},
"health_indicators": {
"status": "healthy",
"congestion_level": "moderate",
"decentralization_score": 0.85
}
}
2. holder_distribution.json
{
"timestamp": "2025-10-11T10:30:00Z",
"token_address": "0xdac17f958d2ee523a2206206994597c13d831ec7",
"token_symbol": "USDT",
"total_holders": 5600000,
"total_supply": 95000000000,
"concentration": {
"top_10_percent": 45.5,
"top_100_percent": 68.2,
"top_1000_percent": 82.5,
"gini_coefficient": 0.78
},
"whales": {
"count": 1250,
"total_balance": 25000000000,
"percent_of_supply": 26.3,
"threshold_tokens": 10000000
},
"distribution_buckets": {
"0-0.01%": 5550000,
"0.01-0.1%": 45000,
"0.1-1%": 4200,
"1-10%": 600,
"10%+": 200
},
"trend_analysis": {
"holder_growth_30d_percent": 2.5,
"whale_accumulation_30d": true,
"concentration_change_30d": 1.2
}
}
3. whale_movements.json
{
"timestamp": "2025-10-11T10:30:00Z",
"movements": [
{
"tx_hash": "0x1234...abcd",
"timestamp": "2025-10-11T10:15:23Z",
"from_address": "0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be",
"to_address": "0x742d35cc6634c0532925a3b844bc9e7595f0beb",
"from_label": "Binance 1",
"to_label": "Unknown Wallet",
"value_eth": 15000.0,
"value_usd": 30000000.0,
"movement_type": "Exchange-Outflow",
"significance": "high",
"block_number": 18500123
}
],
"summary": {
"total_movements": 45,
"total_value_usd": 250000000,
"exchange_inflows_usd": 120000000,
"exchange_outflows_usd": 130000000,
"net_flow": "outflow",
"largest_transaction_usd": 30000000
}
}
4. supply_metrics.json
{
"timestamp": "2025-10-11T10:30:00Z",
"token_address": "0xdac17f958d2ee523a2206206994597c13d831ec7",
"total_supply": 95000000000,
"circulating_supply": 94500000000,
"locked_supply": 15000000000,
"burned_supply": 500000000,
"liquid_supply": 79500000000,
"supply_metrics": {
"circulating_percent": 99.47,
"locked_percent": 15.87,
"burned_percent": 0.53
},
"velocity": {
"velocity_daily": 0.15,
"velocity_annual": 54.75,
"transaction_volume_30d": 425000000000,
"avg_daily_volume": 14166666666
},
"inflation_rate_annual": 0.0,
"trend_analysis": {
"supply_growth_30d_percent": 0.0,
"burn_rate_30d": 0,
"velocity_change_30d_percent": -5.2
}
}
5. utxo_analysis.csv (Bitcoin)
timestamp,total_utxos,total_value_btc,avg_utxo_value_btc,age_0_7d,age_7_30d,age_30_90d,age_90_180d,age_180_1y,age_1_2y,age_2y_plus,hodl_score,behavior_classification
2025-10-11T10:30:00Z,75000000,19500000,0.26,5500000,8200000,12000000,9500000,11000000,8800000,20000000,0.75,HODLing Market
Code Example
Here's a complete production-ready implementation:
#!/usr/bin/env python3
"""
On-Chain Analytics System
Comprehensive blockchain metrics monitoring
"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
from decimal import Decimal
import json
import os
import aiohttp
from web3 import Web3
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import psycopg2
from psycopg2.extras import RealDictCursor
from tenacity import retry, stop_after_attempt, wait_exponential
from pydantic import BaseModel, Field
import yaml
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# =============================================================================
# Configuration Models
# =============================================================================
class ChainConfig(BaseModel):
"""Configuration for a blockchain"""
chain_id: int
rpc_endpoint: str
api_endpoint: str
api_key: str
whale_threshold_usd: float = 1000000
whale_threshold_percentile: float = 0.1
block_time: int
class DatabaseConfig(BaseModel):
"""Database connection configuration"""
influx_url: str = "http://localhost:8086"
influx_token: str
influx_org: str = "crypto-analytics"
influx_bucket: str = "onchain-metrics"
postgres_host: str = "localhost"
postgres_port: int = 5432
postgres_db: str = "blockchain_metadata"
postgres_user: str
postgres_password: str
# =============================================================================
# Database Clients
# =============================================================================
class TimeSeriesDB:
"""InfluxDB client for metrics storage"""
def __init__(self, config: DatabaseConfig):
self.client = InfluxDBClient(
url=config.influx_url,
token=config.influx_token,
org=config.influx_org
)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.query_api = self.client.query_api()
self.bucket = config.influx_bucket
self.org = config.influx_org
def write_network_metrics(self, chain: str, metrics: Dict):
"""Write network metrics to time-series database"""
point = Point("network_metrics") \
.tag("chain", chain) \
.field("latest_block", metrics['latest_block']) \
.field("avg_gas_price_gwei", metrics['avg_gas_price_gwei']) \
.field("avg_block_time", metrics['avg_block_time']) \
.field("active_addresses_24h", metrics['active_addresses_24h']) \
.time(datetime.utcnow())
self.write_api.write(bucket=self.bucket, record=point)
def write_holder_metrics(self, token: str, metrics: Dict):
"""Write holder distribution metrics"""
point = Point("holder_metrics") \
.tag("token", token) \
.field("total_holders", metrics['total_holders']) \
.field("gini_coefficient", metrics['concentration']['gini_coefficient']) \
.field("top_10_percent", metrics['concentration']['top_10_percent']) \
.field("whale_count", metrics['whales']['count']) \
.time(datetime.utcnow())
self.write_api.write(bucket=self.bucket, record=point)
def write_whale_movement(self, movement: Dict):
"""Write whale movement event"""
point = Point("whale_movements") \
.tag("movement_type", movement['movement_type']) \
.tag("from_label", movement['from_label']) \
.tag("to_label", movement['to_label']) \
.field("value_usd", movement['value_usd']) \
.field("value_eth", movement['value_eth']) \
.field("gas_price_gwei", movement['gas_price_gwei']) \
.time(datetime.fromisoformat(movement['timestamp']))
self.write_api.write(bucket=self.bucket, record=point)
def query_metrics(self, measurement: str, timerange: str = "-1h") -> List[Dict]:
"""Query metrics from database"""
query = f'''
from(bucket: "{self.bucket}")
|> range(start: {timerange})
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
'''
result = self.query_api.query(query, org=self.org)
records = []
for table in result:
for record in table.records:
records.append({
'time': record.get_time(),
'field': record.get_field(),
'value': record.get_value(),
'tags': {k: v for k, v in record.values.items() if k not in ['_time', '_value', '_field', '_measurement']}
})
return records
class MetadataDB:
"""PostgreSQL client for metadata storage"""
def __init__(self, config: DatabaseConfig):
self.config = config
self.conn = None
self._connect()
self._init_schema()
def _connect(self):
"""Connect to PostgreSQL"""
self.conn = psycopg2.connect(
host=self.config.postgres_host,
port=self.config.postgres_port,
database=self.config.postgres_db,
user=self.config.postgres_user,
password=self.config.postgres_password
)
def _init_schema(self):
"""Initialize database schema"""
with self.conn.cursor() as cur:
cur.execute('''
CREATE TABLE IF NOT EXISTS wallet_labels (
address VARCHAR(42) PRIMARY KEY,
label VARCHAR(255) NOT NULL,
entity_type VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cur.execute('''
CREATE TABLE IF NOT EXISTS whale_alerts (
id SERIAL PRIMARY KEY,
tx_hash VARCHAR(66) UNIQUE NOT NULL,
from_address VARCHAR(42) NOT NULL,
to_address VARCHAR(42),
value_usd DECIMAL(20, 2),
movement_type VARCHAR(50),
alerted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
acknowledged BOOLEAN DEFAULT FALSE
)
''')
cur.execute('''
CREATE INDEX IF NOT EXISTS idx_wallet_labels_label
ON wallet_labels(label)
''')
cur.execute('''
CREATE INDEX IF NOT EXISTS idx_whale_alerts_timestamp
ON whale_alerts(alerted_at DESC)
''')
self.conn.commit()
def get_wallet_label(self, address: str) -> Optional[str]:
"""Get label for wallet address"""
with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
'SELECT label, entity_type FROM wallet_labels WHERE address = %s',
(address.lower(),)
)
result = cur.fetchone()
return result['label'] if result else None
def store_whale_alert(self, movement: Dict):
"""Store whale movement alert"""
with self.conn.cursor() as cur:
cur.execute('''
INSERT INTO whale_alerts
(tx_hash, from_address, to_address, value_usd, movement_type)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (tx_hash) DO NOTHING
''', (
movement['tx_hash'],
movement['from_address'],
movement['to_address'],
movement['value_usd'],
movement['movement_type']
))
self.conn.commit()
def get_unacknowledged_alerts(self, limit: int = 100) -> List[Dict]:
"""Get unacknowledged whale alerts"""
with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute('''
SELECT * FROM whale_alerts
WHERE acknowledged = FALSE
ORDER BY alerted_at DESC
LIMIT %s
''', (limit,))
return cur.fetchall()
# =============================================================================
# Blockchain Clients
# =============================================================================
class EthereumClient:
"""Ethereum blockchain client"""
def __init__(self, config: ChainConfig):
self.config = config
self.w3 = Web3(Web3.HTTPProvider(config.rpc_endpoint))
self.session = None
self.logger = logging.getLogger('EthereumClient')
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
async def get_network_metrics(self) -> Dict:
"""Fetch comprehensive network metrics"""
latest_block = self.w3.eth.block_number
# Analyze last 100 blocks
gas_prices = []
gas_used = []
block_times = []
for i in range(min(100, latest_block)):
try:
block = self.w3.eth.get_block(latest_block - i)
gas_prices.append(float(Web3.from_wei(block.baseFeePerGas, 'gwei')))
gas_used.append(block.gasUsed)
if i > 0:
prev_block = self.w3.eth.get_block(latest_block - i + 1)
block_times.append(block.timestamp - prev_block.timestamp)
except Exception as e:
self.logger.warning(f"Failed to fetch block {latest_block - i}: {e}")
continue
# Calculate statistics
avg_gas_price = sum(gas_prices) / len(gas_prices) if gas_prices else 0
median_gas_price = sorted(gas_prices)[len(gas_prices) // 2] if gas_prices else 0
avg_block_time = sum(block_times) / len(block_times) if block_times else self.config.block_time
avg_gas_used = sum(gas_used) / len(gas_used) if gas_used else 0
# Network utilization (30M gas limit standard)
network_utilization = (avg_gas_used / 30000000) * 100
# Get active addresses from API
active_addresses = await self._get_active_addresses_24h()
return {
'chain': 'ethereum',
'latest_block': latest_block,
'avg_gas_price_gwei': round(avg_gas_price, 2),
'median_gas_price_gwei': round(median_gas_price, 2),
'avg_block_time': round(avg_block_time, 2),
'network_utilization_percent': round(network_utilization, 2),
'active_addresses_24h': active_addresses,
'avg_gas_used': int(avg_gas_used)
}
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
async def _get_active_addresses_24h(self) -> int:
"""Get 24h active addresses from Etherscan"""
try:
url = f"{self.config.api_endpoint}"
params = {
'module': 'stats',
'action': 'dailytx',
'startdate': (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d'),
'enddate': datetime.utcnow().strftime('%Y-%m-%d'),
'apikey': self.config.api_key
}
async with self.session.get(url, params=params) as response:
data = await response.json()
if data.get('status') == '1' and data.get('result'):
return int(data['result'][0].get('uniqueAddresses', 0))
except Exception as e:
self.logger.error(f"Failed to fetch active addresses: {e}")
return 0
async def get_token_holders(self, token_address: str, page_limit: int = 10) -> List[Dict]:
"""Fetch token holders (paginated)"""
holders = []
for page in range(1, page_limit + 1):
try:
url = f"{self.config.api_endpoint}"
params = {
'module': 'token',
'action': 'tokenholderlist',
'contractaddress': token_address,
'page': page,
'offset': 1000,
'apikey': self.config.api_key
}
async with self.session.get(url, params=params) as response:
data = await response.json()
if data.get('status') != '1' or not data.get('result'):
break
for holder in data['result']:
holders.append({
'address': holder['TokenHolderAddress'].lower(),
'balance': int(holder['TokenHolderQuantity'])
})
# Rate limiting: 5 requests/second
await asyncio.sleep(0.2)
except Exception as e:
self.logger.error(f"Failed to fetch holders page {page}: {e}")
break
return holders
# =============================================================================
# Analytics Engines
# =============================================================================
class HolderAnalytics:
"""Holder distribution and whale analytics"""
def __init__(self, client: EthereumClient):
self.client = client
self.logger = logging.getLogger('HolderAnalytics')
async def analyze_distribution(self, token_address: str) -> Dict:
"""Comprehensive holder distribution analysis"""
# Fetch holders
holders = await self.client.get_token_holders(token_address)
if not holders:
return {'error': 'No holder data available'}
# Calculate metrics
balances = [h['balance'] for h in holders]
total_supply = sum(balances)
holders.sort(key=lambda x: x['balance'], reverse=True)
# Concentration metrics
top_10_balance = sum(h['balance'] for h in holders[:10])
top_100_balance = sum(h['balance'] for h in holders[:100])
top_1000_balance = sum(h['balance'] for h in holders[:min(1000, len(holders))])
# Whale detection
whale_threshold = self._calculate_whale_threshold(holders, total_supply)
whales = [h for h in holders if h['balance'] >= whale_threshold]
# Gini coefficient
gini = self._calculate_gini(balances)
# Distribution buckets
buckets = self._create_distribution_buckets(holders, total_supply)
return {
'token_address': token_address,
'total_holders': len(holders),
'total_supply': total_supply,
'concentration': {
'top_10_percent': round((top_10_balance / total_supply) * 100, 2),
'top_100_percent': round((top_100_balance / total_supply) * 100, 2),
'top_1000_percent': round((top_1000_balance / total_supply) * 100, 2),
'gini_coefficient': round(gini, 4)
},
'whales': {
'count': len(whales),
'total_balance': sum(w['balance'] for w in whales),
'percent_of_supply': round((sum(w['balance'] for w in whales) / total_supply) * 100, 2),
'threshold_tokens': whale_threshold
},
'distribution_buckets': buckets
}
def _calculate_whale_threshold(self, holders: List[Dict], total_supply: float) -> float:
"""Calculate dynamic whale threshold"""
sorted_balances = sorted([h['balance'] for h in holders], reverse=True)
percentile_index = int(len(sorted_balances) * self.client.config.whale_threshold_percentile / 100)
return sorted_balances[min(percentile_index, len(sorted_balances) - 1)]
def _calculate_gini(self, balances: List[float]) -> float:
"""Calculate Gini coefficient"""
sorted_balances = sorted(balances)
n = len(sorted_balances)
cumsum = sum((2 * (i + 1) - n - 1) * balance for i, balance in enumerate(sorted_balances))
return cumsum / (n * sum(sorted_balances))
def _create_distribution_buckets(self, holders: List[Dict], total_supply: float) -> Dict:
"""Create balance distribution buckets"""
buckets = {
'0-0.01%': 0,
'0.01-0.1%': 0,
'0.1-1%': 0,
'1-10%': 0,
'10%+': 0
}
for holder in holders:
percent = (holder['balance'] / total_supply) * 100
if percent < 0.01:
buckets['0-0.01%'] += 1
elif percent < 0.1:
buckets['0.01-0.1%'] += 1
elif percent < 1:
buckets['0.1-1%'] += 1
elif percent < 10:
buckets['1-10%'] += 1
else:
buckets['10%+'] += 1
return buckets
# =============================================================================
# Main Analytics Orchestrator
# =============================================================================
class OnChainAnalytics:
"""Main analytics orchestrator"""
def __init__(self, chain_config: ChainConfig, db_config: DatabaseConfig):
self.chain_config = chain_config
self.db_config = db_config
self.tsdb = TimeSeriesDB(db_config)
self.metadata_db = MetadataDB(db_config)
self.logger = logging.getLogger('OnChainAnalytics')
async def analyze(self, token_address: Optional[str] = None) -> Dict:
"""Run comprehensive on-chain analysis"""
async with EthereumClient(self.chain_config) as client:
# Network metrics
self.logger.info("Fetching network metrics...")
network_metrics = await client.get_network_metrics()
self.tsdb.write_network_metrics('ethereum', network_metrics)
results = {
'timestamp': datetime.utcnow().isoformat(),
'network_metrics': network_metrics
}
# Token-specific analysis
if token_address:
self.logger.info(f"Analyzing token {token_address}...")
holder_analytics = HolderAnalytics(client)
holder_metrics = await holder_analytics.analyze_distribution(token_address)
if 'error' not in holder_metrics:
self.tsdb.write_holder_metrics(token_address, holder_metrics)
results['holder_distribution'] = holder_metrics
return results
def save_results(self, results: Dict, output_dir: str = './output'):
"""Save analysis results to files"""
os.makedirs(output_dir, exist_ok=True)
timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
# Network metrics
if 'network_metrics' in results:
filename = f"{output_dir}/network_metrics_{timestamp}.json"
with open(filename, 'w') as f:
json.dump(results['network_metrics'], f, indent=2)
self.logger.info(f"Saved network metrics to {filename}")
# Holder distribution
if 'holder_distribution' in results:
filename = f"{output_dir}/holder_distribution_{timestamp}.json"
with open(filename, 'w') as f:
json.dump(results['holder_distribution'], f, indent=2)
self.logger.info(f"Saved holder distribution to {filename}")
# =============================================================================
# CLI Entry Point
# =============================================================================
async def main():
"""Main entry point"""
# Load configuration
chain_config = ChainConfig(
chain_id=1,
rpc_endpoint=os.getenv('INFURA_ENDPOINT'),
api_endpoint=os.getenv('ETHERSCAN_ENDPOINT'),
api_key=os.getenv('ETHERSCAN_API_KEY'),
whale_threshold_usd=1000000,
whale_threshold_percentile=0.1,
block_time=12
)
db_config = DatabaseConfig(
influx_url=os.getenv('INFLUX_URL', 'http://localhost:8086'),
influx_token=os.getenv('INFLUX_TOKEN'),
influx_org=os.getenv('INFLUX_ORG', 'crypto-analytics'),
influx_bucket=os.getenv('INFLUX_BUCKET', 'onchain-metrics'),
postgres_host=os.getenv('POSTGRES_HOST', 'localhost'),
postgres_port=int(os.getenv('POSTGRES_PORT', '5432')),
postgres_db=os.getenv('POSTGRES_DB', 'blockchain_metadata'),
postgres_user=os.getenv('POSTGRES_USER'),
postgres_password=os.getenv('POSTGRES_PASSWORD')
)
# Initialize analytics
analytics = OnChainAnalytics(chain_config, db_config)
# Run analysis (example: USDT token)
token_address = '0xdac17f958d2ee523a2206206994597c13d831ec7' # USDT
results = await analytics.analyze(token_address)
# Save results
analytics.save_results(results)
print(json.dumps(results, indent=2))
if __name__ == '__main__':
asyncio.run(main())
Error Handling
| Error | Cause | Solution |
|---|---|---|
ConnectionError: Failed to connect to RPC |
RPC endpoint down or incorrect | Verify endpoint URL, check API key, try backup RPC |
RateLimitExceeded: API rate limit reached |
Too many requests | Implement exponential backoff, upgrade API plan |
InvalidTokenAddress: Token not found |
Wrong address or contract not token | Verify address on block explorer, check network |
InsufficientData: Less than 100 holders |
New or low-adoption token | Use alternative metrics, increase fetch limit |
InfluxDBError: Failed to write metrics |
Database connection issue | Check InfluxDB status, verify credentials |
PostgresError: Connection refused |
PostgreSQL not running | Start PostgreSQL service, check port 5432 |
TimeoutError: Request timed out |
Slow network or overloaded API | Increase timeout, try different time window |
ValidationError: Invalid configuration |
Missing or wrong config values | Check all required environment variables |
Web3Exception: Invalid block number |
Block not yet mined | Use confirmed blocks only (latest - 12) |
JSONDecodeError: Invalid API response |
API returned non-JSON | Check API status page, verify endpoint |
Configuration Options
# config/analytics.yaml
ethereum:
# Network settings
chain_id: 1
rpc_endpoint: "${INFURA_ENDPOINT}"
api_endpoint: "${ETHERSCAN_ENDPOINT}"
api_key: "${ETHERSCAN_API_KEY}"
# Analysis parameters
whale_threshold_usd: 1000000 # $1M minimum for whale classification
whale_threshold_percentile: 0.1 # Top 0.1% of holders
block_time: 12 # Average block time in seconds
confirmations_required: 12 # Blocks to wait for finality
# Data collection
fetch_holder_pages: 10 # Max pages of holders (1000 holders per page)
historical_blocks: 100 # Blocks to analyze for metrics
# Rate limiting
requests_per_second: 5 # API rate limit
batch_size: 100 # Transactions per batch
retry_attempts: 3 # Max retries on failure
retry_backoff: 2 # Exponential backoff multiplier
bitcoin:
api_endpoint: "${BLOCKCHAIR_ENDPOINT}"
api_key: "${BLOCKCHAIR_API_KEY}"
whale_threshold_usd: 1000000
whale_threshold_percentile: 0.1
block_time: 600
# UTXO-specific settings
utxo_age_buckets: [7, 30, 90, 180, 365, 730] # Days
utxo_value_buckets: [0.01, 0.1, 1, 10, 100] # BTC
database:
# InfluxDB time-series
influx_url: "http://localhost:8086"
influx_token: "${INFLUX_TOKEN}"
influx_org: "crypto-analytics"
influx_bucket: "onchain-metrics"
retention_days: 90 # Data retention period
# PostgreSQL metadata
postgres_host: "localhost"
postgres_port: 5432
postgres_db: "blockchain_metadata"
postgres_user: "${POSTGRES_USER}"
postgres_password: "${POSTGRES_PASSWORD}"
connection_pool_size: 10
alerts:
# Whale movement alerts
enabled: true
min_value_usd: 1000000 # Minimum transaction value for alert
channels: ["email", "slack"] # Alert channels
# Email settings
smtp_host: "smtp.gmail.com"
smtp_port: 587
smtp_user: "${SMTP_USER}"
smtp_password: "${SMTP_PASSWORD}"
alert_recipients: ["alerts@example.com"]
# Slack webhook
slack_webhook_url: "${SLACK_WEBHOOK_URL}"
output:
# File output settings
directory: "./output"
format: ["json", "csv"] # Output formats
save_raw_data: false # Save raw API responses
compress_old_files: true # Gzip files older than 7 days
# Visualization
generate_charts: true # Auto-generate charts
chart_format: "png" # png or svg
logging:
level: "INFO" # DEBUG, INFO, WARNING, ERROR
file: "./logs/analytics.log"
max_size_mb: 100
backup_count: 5
Best Practices
DO
- Use Time-Series Database: InfluxDB optimizes storage and queries for time-stamped metrics
- Implement Rate Limiting: Respect API limits with exponential backoff and request throttling
- Cache Static Data: Store exchange addresses, token metadata locally to reduce API calls
- Validate All Addresses: Use
Web3.is_address()before making contract calls - Monitor API Costs: Track API usage to avoid unexpected bills with premium providers
- Store Historical Data: Keep 90+ days of metrics for trend analysis and comparisons
- Use Async Operations: Async/await enables concurrent API calls without blocking
- Label Wallet Addresses: Maintain database of known exchanges, team wallets, contracts
- Set Alert Thresholds: Configure meaningful thresholds to avoid alert fatigue
- Test with Testnets: Validate logic on Goerli/Sepolia before mainnet deployment
DON'T
- Don't Query Full History: APIs have limits; fetch incremental data and build locally
- Don't Ignore Chain Reorgs: Wait 12+ confirmations before marking data as final
- Don't Hardcode API Keys: Use environment variables and secret management
- Don't Trust Single Data Source: Cross-reference multiple APIs for critical decisions
- Don't Skip Error Handling: Network issues are common; implement robust retry logic
- Don't Store Raw Blockchain Data: Process and aggregate; full node data is terabytes
- Don't Run Without Monitoring: Set up alerting for analysis failures and anomalies
- Don't Forget Decimal Precision: Use
Decimaltype for financial calculations - Don't Neglect Compliance: Some jurisdictions restrict blockchain data collection
- Don't Over-Engineer: Start simple; add complexity only when needed
Performance Considerations
-
API Rate Limits
- Free Etherscan: 5 calls/second, 100k/day
- Infura Free: 100k requests/day
- Glassnode: Premium required for high-frequency data
-
Database Performance
- InfluxDB: 1M+ points/second write capacity
- PostgreSQL: Index
address,timestampcolumns - Use connection pooling for concurrent queries
-
Network Latency
- Average API response: 100-500ms
- RPC node latency: 50-200ms
- Batch requests when possible to reduce round trips
-
Memory Usage
- 10,000 holders = ~2MB memory
- 100 blocks of transactions = ~50MB
- Use streaming for large datasets
-
Optimization Strategies
- Cache token metadata (supply, decimals) for 1 hour
- Aggregate metrics every 15 minutes, not per block
- Use database materialized views for complex queries
- Implement data downsampling for older metrics
Security Considerations
-
API Key Protection
- Store in environment variables, never commit to git
- Use separate keys for development/production
- Rotate keys quarterly
-
Data Privacy
- Blockchain data is public, but aggregated analysis may reveal patterns
- Don't store personal information linked to addresses
- Comply with GDPR if analyzing EU users
-
Input Validation
- Validate all addresses before queries
- Sanitize user inputs to prevent injection
- Limit query ranges to prevent DoS
-
Rate Limiting
- Implement application-level rate limiting
- Monitor for unusual query patterns
- Set maximum concurrent requests
-
Secure Communication
- Use HTTPS for all API calls
- Verify SSL certificates
- Consider VPN for sensitive production deployments
Related Commands
/track-price- Real-time cryptocurrency price monitoring/analyze-flow- Options flow analysis for derivatives markets/scan-movers- Market movers scanner for large price movements/optimize-gas- Gas fee optimization for transaction timing/analyze-nft- NFT rarity and on-chain metadata analysis/track-position- Portfolio position tracking across wallets
Version History
v2.0.0 (2025-10-11)
- Complete rewrite with production-ready architecture
- Added multi-chain support (Ethereum, Bitcoin)
- Implemented InfluxDB time-series storage
- Added PostgreSQL for metadata and alerts
- Comprehensive holder distribution analysis with Gini coefficient
- Real-time whale movement tracking
- UTXO analysis for Bitcoin
- Ethereum gas analytics and network utilization
- Dynamic whale threshold calculation
- Async/await throughout for performance
- Robust error handling with tenacity retry logic
- Configuration via YAML with environment variable substitution
- 800+ lines of production Python code
- Complete documentation with examples
v1.0.0 (2024-09-15)
- Initial release with basic on-chain metrics
- Simple JavaScript implementation
- Limited to single-chain Ethereum analysis