---
description: Comprehensive blockchain metrics monitoring and on-chain analytics system
shortcut: ac
tags: [blockchain, on-chain, metrics, whale-tracking, network-analysis, crypto-analytics]
version: 2.0.0
author: claude-code-plugins
---

# Analyze On-Chain Data

Comprehensive blockchain metrics monitoring system that tracks network health, holder distribution, whale movements, exchange flows, supply metrics, and transaction velocity across multiple blockchain networks.

## When to Use

Use this command when you need to:

- **Monitor Network Health**: Track hash rate, difficulty, active addresses, and transaction throughput
- **Analyze Holder Distribution**: Identify whale concentration, wealth distribution, and accumulation patterns
- **Track Exchange Flows**: Monitor deposit/withdrawal patterns, exchange reserves, and liquidity movements
- **Analyze Supply Metrics**: Track circulating supply, staked amounts, burned tokens, and inflation rates
- **Measure Transaction Velocity**: Calculate token circulation speed, active address growth, and network utilization
- **Analyze Bitcoin UTXOs**: Track unspent transaction outputs, UTXO age distribution, and holder behavior
- **Monitor Ethereum Gas**: Track gas prices, network congestion, and smart contract activity
- **Receive Whale Movement Alerts**: Real-time tracking of large transactions and address clustering
- **Support Market Research**: Data-driven insights for trading strategies and investment decisions
- **Monitor Compliance**: Track sanctioned addresses, suspicious patterns, and regulatory requirements

## DON'T Use When

- **Real-time Trading**: Use dedicated trading APIs for sub-second execution
- **Price Prediction**: On-chain metrics show trends, not future prices
- **Legal Advice**: Consult blockchain forensics experts for legal cases
- **Single Transaction Tracking**: Use block explorers for individual transactions
- **Historical Data Older Than 7 Years**: Most APIs have limited historical depth

## Design Decisions

### 1. Multi-Chain Architecture vs Single-Chain Focus

**Chosen Approach**: Multi-chain with chain-specific adapters

**Why**: Different blockchains require different metrics and APIs. Bitcoin uses the UTXO model while Ethereum uses an account-based model. A unified interface with chain-specific implementations provides flexibility while maintaining consistent output formats.

**Alternatives Considered**:

- **Single Chain Only**: Rejected due to limited market coverage
- **Chain-Agnostic Generic Metrics**: Rejected due to loss of blockchain-specific insights
- **Separate Tools Per Chain**: Rejected due to code duplication and maintenance burden

### 2. Data Source Strategy: APIs vs Full Node

**Chosen Approach**: Hybrid API-based with optional full node support

**Why**: Most users don't run full nodes. Using APIs (Etherscan, Blockchain.com, Glassnode) provides immediate access without infrastructure requirements. Full node support remains optional for advanced users needing maximum decentralization.

**Alternatives Considered**:

- **Full Node Required**: Rejected due to high infrastructure costs ($500+/month)
- **APIs Only**: Considered, but a full node option was added for enterprise users
- **Light Client Sync**: Rejected due to incomplete data and sync time requirements

### 3. Whale Detection Threshold Method

**Chosen Approach**: Dynamic percentile-based thresholds with absolute minimums

**Why**: Static thresholds (e.g., "1000 BTC = whale") become outdated as prices change. Percentile-based detection (top 0.1% of holders) combined with an absolute minimum (e.g., $1M USD equivalent) adapts to market conditions; a sketch of this hybrid rule follows the list below.

**Alternatives Considered**:

- **Fixed Token Amounts**: Rejected due to price volatility making thresholds obsolete
- **USD Value Only**: Rejected as it misses on-chain concentration patterns
- **Machine Learning Clustering**: Rejected due to complexity vs accuracy tradeoff
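
To make the hybrid rule concrete, here is a minimal sketch combining the two thresholds. The helper name and the fixed price input are illustrative; the production code in Step 2 derives the percentile cutoff from live holder data.

```python
def whale_threshold(balances, token_price_usd,
                    percentile=0.1, usd_minimum=1_000_000):
    """Hybrid whale cutoff: top-percentile balance, floored at a USD minimum.

    balances: token balances of all holders
    token_price_usd: current token price (from any price feed)
    percentile: top share of holders treated as whales (0.1 = top 0.1%)
    usd_minimum: absolute floor so small-cap tokens don't flag tiny wallets
    """
    ranked = sorted(balances, reverse=True)
    # Balance held by the holder sitting at the top-percentile boundary
    cutoff_index = min(int(len(ranked) * percentile / 100), len(ranked) - 1)
    percentile_cutoff = ranked[cutoff_index]
    # Never classify below the USD floor, whatever the distribution looks like
    usd_floor_tokens = usd_minimum / token_price_usd
    return max(percentile_cutoff, usd_floor_tokens)

# Example: 10,000 holders of a $2 token ->
# whales hold >= max(top-0.1% balance, 500,000 tokens)
```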

### 4. Data Storage: Time-Series Database vs Relational

**Chosen Approach**: Time-series database (InfluxDB) for metrics, PostgreSQL for metadata

**Why**: On-chain metrics are time-stamped numerical data, a natural fit for time-series databases. InfluxDB provides efficient storage, fast queries, and built-in downsampling. PostgreSQL stores wallet metadata, labels, and relationships.

**Alternatives Considered**:

- **PostgreSQL Only**: Rejected due to poor time-series query performance
- **InfluxDB Only**: Rejected due to poor relational data handling
- **ElasticSearch**: Rejected due to higher complexity and resource requirements

### 5. Real-Time vs Batch Processing

**Chosen Approach**: Hybrid batch + real-time for whale alerts

**Why**: Most metrics update every 10-15 minutes (blockchain confirmation time). Batch processing handles historical data efficiently. Real-time processing monitors the mempool for large transactions requiring immediate alerts.

**Alternatives Considered**:

- **Batch Only**: Rejected due to delayed whale movement detection
- **Real-Time Only**: Rejected due to high API costs and rate limits
- **Streaming-First Architecture**: Rejected as overkill for blockchain data latency

## Prerequisites

### 1. Blockchain API Access

**Ethereum**:

```bash
# Etherscan API (free tier: 5 calls/second)
export ETHERSCAN_API_KEY="your_etherscan_key"
export ETHERSCAN_ENDPOINT="https://api.etherscan.io/api"

# Infura for direct node access (100k requests/day free)
export INFURA_PROJECT_ID="your_infura_project_id"
export INFURA_ENDPOINT="https://mainnet.infura.io/v3/${INFURA_PROJECT_ID}"
```

**Bitcoin**:

```bash
# Blockchain.com API (no key required, rate limited)
export BLOCKCHAIN_API_ENDPOINT="https://blockchain.info"

# Blockchair API (10k requests/day free)
export BLOCKCHAIR_API_KEY="your_blockchair_key"
export BLOCKCHAIR_ENDPOINT="https://api.blockchair.com"
```

**Premium Data Providers**:

```bash
# Glassnode (professional tier required, $800+/month)
export GLASSNODE_API_KEY="your_glassnode_key"

# CoinMetrics (enterprise tier, contact for pricing)
export COINMETRICS_API_KEY="your_coinmetrics_key"
```

### 2. Python Dependencies

```bash
pip install web3==6.11.0 \
    requests==2.31.0 \
    pandas==2.1.3 \
    influxdb-client==1.38.0 \
    psycopg2-binary==2.9.9 \
    python-dotenv==1.0.0 \
    aiohttp==3.9.1 \
    tenacity==8.2.3 \
    pydantic==2.5.0 \
    pytz==2023.3
```

### 3. Database Infrastructure

**InfluxDB Setup**:

```bash
# Docker installation
docker run -d -p 8086:8086 \
  -v influxdb-data:/var/lib/influxdb2 \
  -e DOCKER_INFLUXDB_INIT_MODE=setup \
  -e DOCKER_INFLUXDB_INIT_USERNAME=admin \
  -e DOCKER_INFLUXDB_INIT_PASSWORD=secure_password_here \
  -e DOCKER_INFLUXDB_INIT_ORG=crypto-analytics \
  -e DOCKER_INFLUXDB_INIT_BUCKET=onchain-metrics \
  influxdb:2.7
```

**PostgreSQL Setup**:

```bash
# Docker installation
docker run -d -p 5432:5432 \
  -v postgres-data:/var/lib/postgresql/data \
  -e POSTGRES_DB=blockchain_metadata \
  -e POSTGRES_USER=crypto_analyst \
  -e POSTGRES_PASSWORD=secure_password_here \
  postgres:16
```

### 4. Configuration File

Create `config/chains.yaml`:

```yaml
ethereum:
  chain_id: 1
  rpc_endpoint: "${INFURA_ENDPOINT}"
  api_endpoint: "${ETHERSCAN_ENDPOINT}"
  api_key: "${ETHERSCAN_API_KEY}"
  whale_threshold_usd: 1000000
  whale_threshold_percentile: 0.1
  block_time: 12

bitcoin:
  rpc_endpoint: null
  api_endpoint: "${BLOCKCHAIR_ENDPOINT}"
  api_key: "${BLOCKCHAIR_API_KEY}"
  whale_threshold_usd: 1000000
  whale_threshold_percentile: 0.1
  block_time: 600
```
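
YAML itself does not expand `${VAR}` references, so the loader must substitute environment variables before the file is used. A minimal sketch of that step (the function name is illustrative, not part of the shipped tooling):

```python
import os
import re

import yaml  # pip install pyyaml

_ENV_REF = re.compile(r"\$\{([^}]+)\}")

def load_chain_config(path: str = "config/chains.yaml") -> dict:
    """Load chains.yaml, replacing every ${VAR} with its environment value."""
    with open(path) as f:
        raw = f.read()

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        value = os.environ.get(name)
        if value is None:
            # Fail loudly rather than silently passing "${VAR}" downstream
            raise KeyError(f"Environment variable {name} is not set")
        return value

    return yaml.safe_load(_ENV_REF.sub(substitute, raw))

# Usage: config = load_chain_config(); eth = config["ethereum"]
```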

### 5. System Requirements

- **CPU**: 4+ cores (8+ recommended for multi-chain)
- **RAM**: 8 GB minimum (16 GB recommended)
- **Storage**: 100 GB SSD (time-series data grows ~1 GB/day with full metrics)
- **Network**: Stable connection with 10+ Mbps (API rate limits are the bottleneck)

## Implementation Process

### Step 1: Initialize Multi-Chain Client

```python
import logging
from typing import Dict, Optional

import aiohttp
from web3 import Web3


class ChainClient:
    """Base class for blockchain clients"""

    def __init__(self, config: Dict):
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)

    async def get_network_metrics(self) -> Dict:
        raise NotImplementedError

    async def get_holder_distribution(self, token: str) -> Dict:
        raise NotImplementedError


class EthereumClient(ChainClient):
    """Ethereum-specific implementation"""

    def __init__(self, config: Dict):
        super().__init__(config)
        self.w3 = Web3(Web3.HTTPProvider(config['rpc_endpoint']))
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def get_latest_block(self) -> int:
        """Get latest block number"""
        return self.w3.eth.block_number

    async def get_network_metrics(self) -> Dict:
        """Fetch Ethereum network metrics"""
        latest_block = await self.get_latest_block()

        # Get last 100 blocks for gas analysis
        gas_prices = []
        block_times = []

        for i in range(100):
            block = self.w3.eth.get_block(latest_block - i)
            # Sample the block base fee as the gas price (post-London blocks)
            gas_prices.append(Web3.from_wei(block.baseFeePerGas, 'gwei'))
            if i > 0:
                # Block (latest_block - i + 1) is the *newer* neighbour, so
                # subtract in that order to get a positive interval
                newer_block = self.w3.eth.get_block(latest_block - i + 1)
                block_times.append(newer_block.timestamp - block.timestamp)

        return {
            'chain': 'ethereum',
            'latest_block': latest_block,
            'avg_gas_price_gwei': sum(gas_prices) / len(gas_prices),
            'median_gas_price_gwei': sorted(gas_prices)[len(gas_prices) // 2],
            'avg_block_time': sum(block_times) / len(block_times),
            'chain_size_bytes': await self._get_chain_size(),
            'active_addresses_24h': await self._get_active_addresses()
        }

    async def _get_chain_size(self) -> Optional[float]:
        """Get chain size from Etherscan (the stats/chainsize action)"""
        try:
            url = f"{self.config['api_endpoint']}?module=stats&action=chainsize&apikey={self.config['api_key']}"
            async with self.session.get(url) as response:
                data = await response.json()
                return float(data['result'])
        except Exception as e:
            self.logger.error(f"Failed to fetch chain size: {e}")
            return None

    async def _get_active_addresses(self) -> Optional[int]:
        """Get 24h active addresses from Etherscan"""
        try:
            url = f"{self.config['api_endpoint']}?module=stats&action=dailytx&apikey={self.config['api_key']}"
            async with self.session.get(url) as response:
                data = await response.json()
                if data['status'] == '1':
                    return int(data['result'][0]['uniqueAddresses'])
        except Exception as e:
            self.logger.error(f"Failed to fetch active addresses: {e}")
        return None
```
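
Used as an async context manager, the client can be exercised directly. A minimal driver, assuming the environment variables from the Prerequisites section are set:

```python
import asyncio
import os

async def demo():
    config = {
        'rpc_endpoint': os.environ['INFURA_ENDPOINT'],
        'api_endpoint': os.environ['ETHERSCAN_ENDPOINT'],
        'api_key': os.environ['ETHERSCAN_API_KEY'],
    }
    # The context manager opens (and later closes) the aiohttp session
    async with EthereumClient(config) as client:
        metrics = await client.get_network_metrics()
        print(metrics['latest_block'], metrics['avg_gas_price_gwei'])

asyncio.run(demo())
```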

### Step 2: Implement Holder Distribution Analysis

```python
import asyncio
import logging
from typing import Dict


class HolderAnalytics:
    """Analyze token holder distribution and concentration"""

    def __init__(self, client: ChainClient):
        self.client = client
        self.logger = logging.getLogger(self.__class__.__name__)

    async def analyze_distribution(self, token_address: str) -> Dict:
        """Analyze holder distribution with whale detection"""

        # Fetch all holders (this is a simplified version)
        holders = await self._fetch_all_holders(token_address)

        if not holders:
            return {'error': 'No holder data available'}

        # Calculate distribution metrics
        balances = [h['balance'] for h in holders]
        total_supply = sum(balances)

        # Sort by balance descending
        holders.sort(key=lambda x: x['balance'], reverse=True)

        # Calculate concentration metrics
        top_10_balance = sum(h['balance'] for h in holders[:10])
        top_100_balance = sum(h['balance'] for h in holders[:100])
        top_1000_balance = sum(h['balance'] for h in holders[:1000])

        # Whale threshold (top 0.1% of holders)
        whale_threshold_tokens = self._calculate_whale_threshold(
            holders,
            total_supply,
            self.client.config['whale_threshold_percentile']
        )

        whales = [h for h in holders if h['balance'] >= whale_threshold_tokens]

        # Calculate Gini coefficient (wealth inequality)
        gini = self._calculate_gini(balances)

        # Distribution buckets
        buckets = self._create_distribution_buckets(holders, total_supply)

        return {
            'token_address': token_address,
            'total_holders': len(holders),
            'total_supply': total_supply,
            'concentration': {
                'top_10_percent': (top_10_balance / total_supply) * 100,
                'top_100_percent': (top_100_balance / total_supply) * 100,
                'top_1000_percent': (top_1000_balance / total_supply) * 100,
                'gini_coefficient': gini
            },
            'whales': {
                'count': len(whales),
                'total_balance': sum(w['balance'] for w in whales),
                'percent_of_supply': (sum(w['balance'] for w in whales) / total_supply) * 100,
                'threshold_tokens': whale_threshold_tokens
            },
            'distribution_buckets': buckets
        }

    async def _fetch_all_holders(self, token_address: str) -> list:
        """Fetch all token holders (implementation depends on API)"""
        # This uses Etherscan's token holder list API
        holders = []
        page = 1

        while True:
            url = (
                f"{self.client.config['api_endpoint']}?module=token&action=tokenholderlist"
                f"&contractaddress={token_address}&page={page}&offset=1000"
                f"&apikey={self.client.config['api_key']}"
            )

            async with self.client.session.get(url) as response:
                data = await response.json()

            if data['status'] != '1' or not data['result']:
                break

            for holder in data['result']:
                holders.append({
                    'address': holder['TokenHolderAddress'],
                    'balance': int(holder['TokenHolderQuantity'])
                })

            page += 1

            # Rate limiting: 5 requests per second
            await asyncio.sleep(0.2)

        return holders

    def _calculate_whale_threshold(self, holders: list, total_supply: float, percentile: float) -> float:
        """Calculate dynamic whale threshold"""
        sorted_balances = sorted([h['balance'] for h in holders], reverse=True)
        index = int(len(sorted_balances) * percentile / 100)
        return sorted_balances[min(index, len(sorted_balances) - 1)]

    def _calculate_gini(self, balances: list) -> float:
        """Calculate Gini coefficient (0 = perfect equality, 1 = maximum inequality)"""
        sorted_balances = sorted(balances)
        n = len(sorted_balances)
        cumsum = 0

        for i, balance in enumerate(sorted_balances):
            cumsum += (2 * (i + 1) - n - 1) * balance

        return cumsum / (n * sum(sorted_balances))

    def _create_distribution_buckets(self, holders: list, total_supply: float) -> Dict:
        """Create balance distribution buckets"""
        buckets = {
            '0-0.01%': 0,
            '0.01-0.1%': 0,
            '0.1-1%': 0,
            '1-10%': 0,
            '10%+': 0
        }

        for holder in holders:
            percent = (holder['balance'] / total_supply) * 100

            if percent < 0.01:
                buckets['0-0.01%'] += 1
            elif percent < 0.1:
                buckets['0.01-0.1%'] += 1
            elif percent < 1:
                buckets['0.1-1%'] += 1
            elif percent < 10:
                buckets['1-10%'] += 1
            else:
                buckets['10%+'] += 1

        return buckets
```
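
The Gini helper implements the standard rank-weighted form G = Σᵢ (2i − n − 1) xᵢ / (n Σᵢ xᵢ) over balances sorted ascending (i ranging 1..n). A quick sanity check on hand-computable inputs, calling the helper unbound for brevity since it never touches `self`:

```python
# Perfect equality across four wallets -> 0.0
assert HolderAnalytics._calculate_gini(None, [25, 25, 25, 25]) == 0.0

# One wallet holds 97 of 100 tokens:
# cumsum = (-3)*1 + (-1)*1 + (1)*1 + (3)*97 = 288; G = 288 / (4 * 100) = 0.72
assert HolderAnalytics._calculate_gini(None, [1, 1, 1, 97]) == 0.72
```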

### Step 3: Track Whale Movements and Exchange Flows

```python
import logging
from datetime import datetime
from typing import Dict

from web3 import Web3


class WhaleTracker:
    """Real-time whale movement tracking"""

    def __init__(self, client: ChainClient, db_client):
        self.client = client
        self.db = db_client
        self.logger = logging.getLogger(self.__class__.__name__)
        self.known_exchanges = self._load_exchange_addresses()

    def _load_exchange_addresses(self) -> Dict[str, str]:
        """Load known exchange addresses"""
        return {
            # Binance
            '0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be': 'Binance 1',
            '0xd551234ae421e3bcba99a0da6d736074f22192ff': 'Binance 2',
            '0x28c6c06298d514db089934071355e5743bf21d60': 'Binance 14',
            '0x21a31ee1afc51d94c2efccaa2092ad1028285549': 'Binance 15',
            '0x564286362092d8e7936f0549571a803b203aaced': 'Binance 16',
            '0x0681d8db095565fe8a346fa0277bffde9c0edbbf': 'Binance 17',
            '0x4e9ce36e442e55ecd9025b9a6e0d88485d628a67': 'Binance 18',
            '0xbe0eb53f46cd790cd13851d5eff43d12404d33e8': 'Binance 19',
            '0xf977814e90da44bfa03b6295a0616a897441acec': 'Binance 8',
            # Coinbase
            '0x71660c4005ba85c37ccec55d0c4493e66fe775d3': 'Coinbase 1',
            '0x503828976d22510aad0201ac7ec88293211d23da': 'Coinbase 2',
            '0xddfabcdc4d8ffc6d5beaf154f18b778f892a0740': 'Coinbase 3',
            '0x3cd751e6b0078be393132286c442345e5dc49699': 'Coinbase 4',
            '0xb5d85cbf7cb3ee0d56b3bb207d5fc4b82f43f511': 'Coinbase 5',
            '0xeb2629a2734e272bcc07bda959863f316f4bd4cf': 'Coinbase 6',
            # Kraken
            '0x2910543af39aba0cd09dbb2d50200b3e800a63d2': 'Kraken 1',
            '0x0a869d79a7052c7f1b55a8ebabbea3420f0d1e13': 'Kraken 2',
            '0xe853c56864a2ebe4576a807d26fdc4a0ada51919': 'Kraken 3',
            '0x267be1c1d684f78cb4f6a176c4911b741e4ffdc0': 'Kraken 4',
        }

    async def track_movements(self, min_value_usd: float = 100000) -> list:
        """Track large transactions in real-time"""
        movements = []
        latest_block = await self.client.get_latest_block()

        # Check last 10 blocks
        for block_num in range(latest_block - 10, latest_block + 1):
            block = self.client.w3.eth.get_block(block_num, full_transactions=True)

            for tx in block.transactions:
                value_eth = Web3.from_wei(tx.value, 'ether')

                # Approximate USD value (use a real price feed in production);
                # from_wei returns Decimal, so cast before mixing with floats
                value_usd = float(value_eth) * await self._get_eth_price()

                if value_usd >= min_value_usd:
                    movement = await self._analyze_transaction(tx, value_usd)
                    movements.append(movement)

                    # Store in database
                    await self.db.store_whale_movement(movement)

                    # Send alert if significant
                    if value_usd >= 1000000:  # $1M+
                        await self._send_alert(movement)

        return movements

    async def _analyze_transaction(self, tx, value_usd: float) -> Dict:
        """Analyze transaction and classify movement type"""
        from_address = tx['from'].lower()
        to_address = tx['to'].lower() if tx['to'] else None

        from_label = self.known_exchanges.get(from_address, 'Unknown')
        to_label = self.known_exchanges.get(to_address, 'Unknown') if to_address else 'Contract Creation'

        # Classify movement type (contract creations have no recipient,
        # so handle them first rather than treating the label as an exchange)
        if to_address is None:
            movement_type = 'Contract-Creation'
        elif from_label != 'Unknown' and to_label != 'Unknown':
            movement_type = 'Exchange-to-Exchange'
        elif from_label != 'Unknown':
            movement_type = 'Exchange-Outflow'
        elif to_label != 'Unknown':
            movement_type = 'Exchange-Inflow'
        else:
            movement_type = 'Whale-to-Whale'

        return {
            'timestamp': datetime.utcnow().isoformat(),
            'tx_hash': tx['hash'].hex(),
            'from_address': from_address,
            'to_address': to_address,
            'from_label': from_label,
            'to_label': to_label,
            'value_eth': float(Web3.from_wei(tx.value, 'ether')),
            'value_usd': value_usd,
            'movement_type': movement_type,
            'gas_price_gwei': float(Web3.from_wei(tx.gasPrice, 'gwei')),
            'block_number': tx.blockNumber
        }

    async def _get_eth_price(self) -> float:
        """Get current ETH price (simplified)"""
        # In production, use CoinGecko or a similar API (see the sketch below)
        return 2000.0  # Placeholder

    async def _send_alert(self, movement: Dict):
        """Send alert for significant movements"""
        self.logger.warning(f"WHALE ALERT: {movement['movement_type']} - ${movement['value_usd']:,.2f}")
```
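
One way to replace the `_get_eth_price` placeholder with a live quote is CoinGecko's public simple-price endpoint; the in-memory caching policy below is an illustrative choice, not part of the original design:

```python
import time

import aiohttp

_PRICE_CACHE = {'value': None, 'fetched_at': 0.0}

async def get_eth_price_usd(session: aiohttp.ClientSession, ttl: float = 60.0) -> float:
    """Fetch ETH/USD from CoinGecko, caching the quote for `ttl` seconds."""
    now = time.monotonic()
    if _PRICE_CACHE['value'] is not None and now - _PRICE_CACHE['fetched_at'] < ttl:
        return _PRICE_CACHE['value']

    url = "https://api.coingecko.com/api/v3/simple/price"
    params = {'ids': 'ethereum', 'vs_currencies': 'usd'}
    async with session.get(url, params=params) as response:
        response.raise_for_status()
        data = await response.json()

    _PRICE_CACHE['value'] = float(data['ethereum']['usd'])
    _PRICE_CACHE['fetched_at'] = now
    return _PRICE_CACHE['value']
```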

### Step 4: Analyze Supply Metrics and Transaction Velocity

```python
import logging
from typing import Dict

from web3 import Web3


class SupplyAnalytics:
    """Track supply metrics and token velocity"""

    def __init__(self, client: ChainClient):
        self.client = client
        self.logger = logging.getLogger(self.__class__.__name__)

    async def get_supply_metrics(self, token_address: str) -> Dict:
        """Comprehensive supply analysis"""

        # Get basic supply data
        total_supply = await self._get_total_supply(token_address)
        circulating_supply = await self._get_circulating_supply(token_address)

        # Calculate locked/staked amounts
        locked_supply = await self._get_locked_supply(token_address)
        burned_supply = await self._get_burned_supply(token_address)

        # Transaction velocity (30-day)
        velocity = await self._calculate_velocity(token_address, days=30)

        # Inflation rate (annualized)
        inflation_rate = await self._calculate_inflation_rate(token_address)

        return {
            'token_address': token_address,
            'total_supply': total_supply,
            'circulating_supply': circulating_supply,
            'locked_supply': locked_supply,
            'burned_supply': burned_supply,
            'liquid_supply': circulating_supply - locked_supply,
            'supply_metrics': {
                'circulating_percent': (circulating_supply / total_supply) * 100,
                'locked_percent': (locked_supply / circulating_supply) * 100,
                'burned_percent': (burned_supply / total_supply) * 100
            },
            'velocity': velocity,
            'inflation_rate_annual': inflation_rate
        }

    async def _get_total_supply(self, token_address: str) -> float:
        """Get total token supply"""
        contract = self.client.w3.eth.contract(
            address=Web3.to_checksum_address(token_address),
            abi=[{
                'constant': True,
                'inputs': [],
                'name': 'totalSupply',
                'outputs': [{'name': '', 'type': 'uint256'}],
                'type': 'function'
            }]
        )
        return float(contract.functions.totalSupply().call())

    async def _get_circulating_supply(self, token_address: str) -> float:
        """Get circulating supply (excludes team/treasury wallets)"""
        # This requires project-specific logic; for now, return total supply
        return await self._get_total_supply(token_address)

    async def _get_locked_supply(self, token_address: str) -> float:
        """Get locked/staked supply"""
        # Would query staking contracts
        return 0.0  # Placeholder

    async def _get_burned_supply(self, token_address: str) -> float:
        """Get burned supply"""
        # Note: many projects also burn to 0x...dEaD; check both in production
        burn_address = '0x0000000000000000000000000000000000000000'
        contract = self.client.w3.eth.contract(
            address=Web3.to_checksum_address(token_address),
            abi=[{
                'constant': True,
                'inputs': [{'name': '_owner', 'type': 'address'}],
                'name': 'balanceOf',
                'outputs': [{'name': 'balance', 'type': 'uint256'}],
                'type': 'function'
            }]
        )
        return float(contract.functions.balanceOf(burn_address).call())

    async def _calculate_velocity(self, token_address: str, days: int = 30) -> Dict:
        """Calculate transaction velocity"""
        # Velocity = (transaction volume / circulating supply) / time period

        # Get transaction volume for the period
        volume = await self._get_transaction_volume(token_address, days)
        circulating = await self._get_circulating_supply(token_address)

        velocity_daily = volume / circulating / days

        return {
            'velocity_daily': velocity_daily,
            'velocity_annual': velocity_daily * 365,
            'transaction_volume_30d': volume,
            'avg_daily_volume': volume / days
        }

    async def _get_transaction_volume(self, token_address: str, days: int) -> float:
        """Get total transaction volume for the period"""
        # Would aggregate Transfer events (see the sketch below)
        return 0.0  # Placeholder

    async def _calculate_inflation_rate(self, token_address: str) -> float:
        """Calculate annualized inflation rate"""
        # Compare supply now vs 365 days ago; placeholder for demonstration
        return 2.5  # 2.5% annual inflation
```
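
A sketch of filling in `_get_transaction_volume` by summing ERC-20 `Transfer` event values over the period. The block-range arithmetic assumes ~12-second Ethereum blocks, and scanning a full 30 days against a public RPC will hit provider limits, so the range is fetched in chunks:

```python
from web3 import Web3

# keccak256("Transfer(address,address,uint256)")
TRANSFER_TOPIC = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'

def get_transfer_volume(w3: Web3, token_address: str, days: int,
                        chunk_blocks: int = 2000) -> int:
    """Sum raw Transfer amounts (in base token units) over the last `days` days."""
    latest = w3.eth.block_number
    blocks_back = days * 24 * 3600 // 12   # ~12 s per block
    start = max(latest - blocks_back, 0)

    volume = 0
    for chunk_start in range(start, latest + 1, chunk_blocks):
        chunk_end = min(chunk_start + chunk_blocks - 1, latest)
        logs = w3.eth.get_logs({
            'address': Web3.to_checksum_address(token_address),
            'topics': [TRANSFER_TOPIC],
            'fromBlock': chunk_start,
            'toBlock': chunk_end,
        })
        # The transferred amount is the 32-byte data payload of each log
        volume += sum(int(log['data'].hex(), 16) for log in logs)
    return volume
```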

### Step 5: Bitcoin UTXO Analysis

```python
import logging
from typing import Dict

import aiohttp


class BitcoinUTXOAnalytics:
    """Bitcoin-specific UTXO analysis"""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.blockchair.com/bitcoin"
        self.logger = logging.getLogger(self.__class__.__name__)

    async def analyze_utxo_set(self) -> Dict:
        """Comprehensive UTXO set analysis"""

        async with aiohttp.ClientSession() as session:
            # Get UTXO statistics
            url = f"{self.base_url}/stats"
            async with session.get(url, params={'key': self.api_key}) as response:
                stats = await response.json()

            # Analyze UTXO age distribution
            utxo_age = await self._analyze_utxo_age(session)

            # Analyze UTXO value distribution
            utxo_values = await self._analyze_utxo_values(session)

            # Blockchair reports circulation in satoshi; convert to BTC
            total_value_btc = stats['data']['circulation'] / 1e8
            utxo_count = stats['data']['utxo_count']

            return {
                'total_utxos': utxo_count,
                'total_value_btc': total_value_btc,
                'avg_utxo_value_btc': total_value_btc / utxo_count,
                'utxo_age_distribution': utxo_age,
                'utxo_value_distribution': utxo_values,
                'holder_behavior': await self._analyze_holder_behavior(utxo_age)
            }

    async def _analyze_utxo_age(self, session) -> Dict:
        """Analyze UTXO age distribution"""
        # UTXOs by age bucket
        buckets = {
            '0-7d': 0,
            '7d-30d': 0,
            '30d-90d': 0,
            '90d-180d': 0,
            '180d-1y': 0,
            '1y-2y': 0,
            '2y+': 0
        }

        # This would query Blockchair's UTXO API; simplified for demonstration
        return buckets

    async def _analyze_utxo_values(self, session) -> Dict:
        """Analyze UTXO value distribution"""
        buckets = {
            '0-0.01 BTC': 0,
            '0.01-0.1 BTC': 0,
            '0.1-1 BTC': 0,
            '1-10 BTC': 0,
            '10-100 BTC': 0,
            '100+ BTC': 0
        }

        return buckets

    async def _analyze_holder_behavior(self, utxo_age: Dict) -> Dict:
        """Infer holder behavior from UTXO patterns"""

        # High % of old UTXOs = HODLing behavior
        # High % of young UTXOs = active trading
        # (see the scoring sketch below)

        return {
            'hodl_score': 0.75,  # 0-1, higher = more HODLing
            'active_trader_percent': 25.0,
            'long_term_holder_percent': 60.0,
            'behavior_classification': 'HODLing Market'
        }
```
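
To make `_analyze_holder_behavior` concrete, here is a minimal scoring sketch under the stated heuristic: the HODL score is the share of UTXOs older than 180 days. The bucket cutoffs and labels are illustrative choices, not calibrated thresholds:

```python
def score_holder_behavior(utxo_age: dict) -> dict:
    """Derive a 0-1 HODL score from UTXO age-bucket counts."""
    old_buckets = ('180d-1y', '1y-2y', '2y+')   # long-term holders
    young_buckets = ('0-7d', '7d-30d')          # active traders
    total = sum(utxo_age.values()) or 1         # avoid division by zero

    hodl_score = sum(utxo_age[b] for b in old_buckets) / total
    active_percent = 100 * sum(utxo_age[b] for b in young_buckets) / total

    return {
        'hodl_score': round(hodl_score, 2),
        'active_trader_percent': round(active_percent, 1),
        'long_term_holder_percent': round(100 * hodl_score, 1),
        'behavior_classification': 'HODLing Market' if hodl_score >= 0.5 else 'Trading Market',
    }
```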

## Output Format

### 1. network_metrics.json

```json
{
  "timestamp": "2025-10-11T10:30:00Z",
  "chain": "ethereum",
  "metrics": {
    "latest_block": 18500000,
    "avg_gas_price_gwei": 25.5,
    "median_gas_price_gwei": 22.0,
    "avg_block_time_seconds": 12.1,
    "network_hashrate_th": 850000,
    "active_addresses_24h": 450000,
    "transaction_count_24h": 1200000,
    "avg_tx_fee_usd": 3.25,
    "network_utilization_percent": 78.5
  },
  "health_indicators": {
    "status": "healthy",
    "congestion_level": "moderate",
    "decentralization_score": 0.85
  }
}
```

### 2. holder_distribution.json

```json
{
  "timestamp": "2025-10-11T10:30:00Z",
  "token_address": "0xdac17f958d2ee523a2206206994597c13d831ec7",
  "token_symbol": "USDT",
  "total_holders": 5600000,
  "total_supply": 95000000000,
  "concentration": {
    "top_10_percent": 45.5,
    "top_100_percent": 68.2,
    "top_1000_percent": 82.5,
    "gini_coefficient": 0.78
  },
  "whales": {
    "count": 1250,
    "total_balance": 25000000000,
    "percent_of_supply": 26.3,
    "threshold_tokens": 10000000
  },
  "distribution_buckets": {
    "0-0.01%": 5550000,
    "0.01-0.1%": 45000,
    "0.1-1%": 4200,
    "1-10%": 600,
    "10%+": 200
  },
  "trend_analysis": {
    "holder_growth_30d_percent": 2.5,
    "whale_accumulation_30d": true,
    "concentration_change_30d": 1.2
  }
}
```

### 3. whale_movements.json

```json
{
  "timestamp": "2025-10-11T10:30:00Z",
  "movements": [
    {
      "tx_hash": "0x1234...abcd",
      "timestamp": "2025-10-11T10:15:23Z",
      "from_address": "0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be",
      "to_address": "0x742d35cc6634c0532925a3b844bc9e7595f0beb",
      "from_label": "Binance 1",
      "to_label": "Unknown Wallet",
      "value_eth": 15000.0,
      "value_usd": 30000000.0,
      "movement_type": "Exchange-Outflow",
      "significance": "high",
      "block_number": 18500123
    }
  ],
  "summary": {
    "total_movements": 45,
    "total_value_usd": 250000000,
    "exchange_inflows_usd": 120000000,
    "exchange_outflows_usd": 130000000,
    "net_flow": "outflow",
    "largest_transaction_usd": 30000000
  }
}
```

### 4. supply_metrics.json

```json
{
  "timestamp": "2025-10-11T10:30:00Z",
  "token_address": "0xdac17f958d2ee523a2206206994597c13d831ec7",
  "total_supply": 95000000000,
  "circulating_supply": 94500000000,
  "locked_supply": 15000000000,
  "burned_supply": 500000000,
  "liquid_supply": 79500000000,
  "supply_metrics": {
    "circulating_percent": 99.47,
    "locked_percent": 15.87,
    "burned_percent": 0.53
  },
  "velocity": {
    "velocity_daily": 0.15,
    "velocity_annual": 54.75,
    "transaction_volume_30d": 425000000000,
    "avg_daily_volume": 14166666666
  },
  "inflation_rate_annual": 0.0,
  "trend_analysis": {
    "supply_growth_30d_percent": 0.0,
    "burn_rate_30d": 0,
    "velocity_change_30d_percent": -5.2
  }
}
```

### 5. utxo_analysis.csv (Bitcoin)

```csv
timestamp,total_utxos,total_value_btc,avg_utxo_value_btc,age_0_7d,age_7_30d,age_30_90d,age_90_180d,age_180_1y,age_1_2y,age_2y_plus,hodl_score,behavior_classification
2025-10-11T10:30:00Z,75000000,19500000,0.26,5500000,8200000,12000000,9500000,11000000,8800000,20000000,0.75,HODLing Market
```

## Code Example

Here's a complete production-ready implementation:

```python
#!/usr/bin/env python3
"""
On-Chain Analytics System
Comprehensive blockchain metrics monitoring
"""

import asyncio
import json
import logging
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import aiohttp
import psycopg2
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from psycopg2.extras import RealDictCursor
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_exponential
from web3 import Web3


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# =============================================================================
# Configuration Models
# =============================================================================

class ChainConfig(BaseModel):
    """Configuration for a blockchain"""
    chain_id: int
    rpc_endpoint: str
    api_endpoint: str
    api_key: str
    whale_threshold_usd: float = 1000000
    whale_threshold_percentile: float = 0.1
    block_time: int


class DatabaseConfig(BaseModel):
    """Database connection configuration"""
    influx_url: str = "http://localhost:8086"
    influx_token: str
    influx_org: str = "crypto-analytics"
    influx_bucket: str = "onchain-metrics"

    postgres_host: str = "localhost"
    postgres_port: int = 5432
    postgres_db: str = "blockchain_metadata"
    postgres_user: str
    postgres_password: str


# =============================================================================
# Database Clients
# =============================================================================

class TimeSeriesDB:
    """InfluxDB client for metrics storage"""

    def __init__(self, config: DatabaseConfig):
        self.client = InfluxDBClient(
            url=config.influx_url,
            token=config.influx_token,
            org=config.influx_org
        )
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.bucket = config.influx_bucket
        self.org = config.influx_org

    def write_network_metrics(self, chain: str, metrics: Dict):
        """Write network metrics to time-series database"""
        point = Point("network_metrics") \
            .tag("chain", chain) \
            .field("latest_block", metrics['latest_block']) \
            .field("avg_gas_price_gwei", metrics['avg_gas_price_gwei']) \
            .field("avg_block_time", metrics['avg_block_time']) \
            .field("active_addresses_24h", metrics['active_addresses_24h']) \
            .time(datetime.utcnow())

        self.write_api.write(bucket=self.bucket, record=point)

    def write_holder_metrics(self, token: str, metrics: Dict):
        """Write holder distribution metrics"""
        point = Point("holder_metrics") \
            .tag("token", token) \
            .field("total_holders", metrics['total_holders']) \
            .field("gini_coefficient", metrics['concentration']['gini_coefficient']) \
            .field("top_10_percent", metrics['concentration']['top_10_percent']) \
            .field("whale_count", metrics['whales']['count']) \
            .time(datetime.utcnow())

        self.write_api.write(bucket=self.bucket, record=point)

    def write_whale_movement(self, movement: Dict):
        """Write whale movement event"""
        point = Point("whale_movements") \
            .tag("movement_type", movement['movement_type']) \
            .tag("from_label", movement['from_label']) \
            .tag("to_label", movement['to_label']) \
            .field("value_usd", movement['value_usd']) \
            .field("value_eth", movement['value_eth']) \
            .field("gas_price_gwei", movement['gas_price_gwei']) \
            .time(datetime.fromisoformat(movement['timestamp']))

        self.write_api.write(bucket=self.bucket, record=point)

    def query_metrics(self, measurement: str, timerange: str = "-1h") -> List[Dict]:
        """Query metrics from database"""
        query = f'''
        from(bucket: "{self.bucket}")
            |> range(start: {timerange})
            |> filter(fn: (r) => r["_measurement"] == "{measurement}")
        '''

        result = self.query_api.query(query, org=self.org)

        records = []
        for table in result:
            for record in table.records:
                records.append({
                    'time': record.get_time(),
                    'field': record.get_field(),
                    'value': record.get_value(),
                    'tags': {k: v for k, v in record.values.items()
                             if k not in ['_time', '_value', '_field', '_measurement']}
                })

        return records


class MetadataDB:
    """PostgreSQL client for metadata storage"""

    def __init__(self, config: DatabaseConfig):
        self.config = config
        self.conn = None
        self._connect()
        self._init_schema()

    def _connect(self):
        """Connect to PostgreSQL"""
        self.conn = psycopg2.connect(
            host=self.config.postgres_host,
            port=self.config.postgres_port,
            database=self.config.postgres_db,
            user=self.config.postgres_user,
            password=self.config.postgres_password
        )

    def _init_schema(self):
        """Initialize database schema"""
        with self.conn.cursor() as cur:
            cur.execute('''
                CREATE TABLE IF NOT EXISTS wallet_labels (
                    address VARCHAR(42) PRIMARY KEY,
                    label VARCHAR(255) NOT NULL,
                    entity_type VARCHAR(50),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            cur.execute('''
                CREATE TABLE IF NOT EXISTS whale_alerts (
                    id SERIAL PRIMARY KEY,
                    tx_hash VARCHAR(66) UNIQUE NOT NULL,
                    from_address VARCHAR(42) NOT NULL,
                    to_address VARCHAR(42),
                    value_usd DECIMAL(20, 2),
                    movement_type VARCHAR(50),
                    alerted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    acknowledged BOOLEAN DEFAULT FALSE
                )
            ''')

            cur.execute('''
                CREATE INDEX IF NOT EXISTS idx_wallet_labels_label
                ON wallet_labels(label)
            ''')

            cur.execute('''
                CREATE INDEX IF NOT EXISTS idx_whale_alerts_timestamp
                ON whale_alerts(alerted_at DESC)
            ''')

            self.conn.commit()

    def get_wallet_label(self, address: str) -> Optional[str]:
        """Get label for wallet address"""
        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(
                'SELECT label, entity_type FROM wallet_labels WHERE address = %s',
                (address.lower(),)
            )
            result = cur.fetchone()
            return result['label'] if result else None

    def store_whale_alert(self, movement: Dict):
        """Store whale movement alert"""
        with self.conn.cursor() as cur:
            cur.execute('''
                INSERT INTO whale_alerts
                (tx_hash, from_address, to_address, value_usd, movement_type)
                VALUES (%s, %s, %s, %s, %s)
                ON CONFLICT (tx_hash) DO NOTHING
            ''', (
                movement['tx_hash'],
                movement['from_address'],
                movement['to_address'],
                movement['value_usd'],
                movement['movement_type']
            ))
            self.conn.commit()

    def get_unacknowledged_alerts(self, limit: int = 100) -> List[Dict]:
        """Get unacknowledged whale alerts"""
        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute('''
                SELECT * FROM whale_alerts
                WHERE acknowledged = FALSE
                ORDER BY alerted_at DESC
                LIMIT %s
            ''', (limit,))
            return cur.fetchall()


# =============================================================================
# Blockchain Clients
# =============================================================================

class EthereumClient:
    """Ethereum blockchain client"""

    def __init__(self, config: ChainConfig):
        self.config = config
        self.w3 = Web3(Web3.HTTPProvider(config.rpc_endpoint))
        self.session = None
        self.logger = logging.getLogger('EthereumClient')

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
    async def get_network_metrics(self) -> Dict:
        """Fetch comprehensive network metrics"""
        latest_block = self.w3.eth.block_number

        # Analyze last 100 blocks
        gas_prices = []
        gas_used = []
        block_times = []

        for i in range(min(100, latest_block)):
            try:
                block = self.w3.eth.get_block(latest_block - i)
                gas_prices.append(float(Web3.from_wei(block.baseFeePerGas, 'gwei')))
                gas_used.append(block.gasUsed)

                if i > 0:
                    # Block (latest_block - i + 1) is the newer neighbour, so
                    # subtract in that order to get a positive interval
                    newer_block = self.w3.eth.get_block(latest_block - i + 1)
                    block_times.append(newer_block.timestamp - block.timestamp)
            except Exception as e:
                self.logger.warning(f"Failed to fetch block {latest_block - i}: {e}")
                continue

        # Calculate statistics
        avg_gas_price = sum(gas_prices) / len(gas_prices) if gas_prices else 0
        median_gas_price = sorted(gas_prices)[len(gas_prices) // 2] if gas_prices else 0
        avg_block_time = sum(block_times) / len(block_times) if block_times else self.config.block_time
        avg_gas_used = sum(gas_used) / len(gas_used) if gas_used else 0

        # Network utilization (30M gas limit standard)
        network_utilization = (avg_gas_used / 30000000) * 100

        # Get active addresses from API
        active_addresses = await self._get_active_addresses_24h()

        return {
            'chain': 'ethereum',
            'latest_block': latest_block,
            'avg_gas_price_gwei': round(avg_gas_price, 2),
            'median_gas_price_gwei': round(median_gas_price, 2),
            'avg_block_time': round(avg_block_time, 2),
            'network_utilization_percent': round(network_utilization, 2),
            'active_addresses_24h': active_addresses,
            'avg_gas_used': int(avg_gas_used)
        }

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
    async def _get_active_addresses_24h(self) -> int:
        """Get 24h active addresses from Etherscan"""
        try:
            url = f"{self.config.api_endpoint}"
            params = {
                'module': 'stats',
                'action': 'dailytx',
                'startdate': (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d'),
                'enddate': datetime.utcnow().strftime('%Y-%m-%d'),
                'apikey': self.config.api_key
            }

            async with self.session.get(url, params=params) as response:
                data = await response.json()
                if data.get('status') == '1' and data.get('result'):
                    return int(data['result'][0].get('uniqueAddresses', 0))
        except Exception as e:
            self.logger.error(f"Failed to fetch active addresses: {e}")

        return 0

    async def get_token_holders(self, token_address: str, page_limit: int = 10) -> List[Dict]:
        """Fetch token holders (paginated)"""
        holders = []

        for page in range(1, page_limit + 1):
            try:
                url = f"{self.config.api_endpoint}"
                params = {
                    'module': 'token',
                    'action': 'tokenholderlist',
                    'contractaddress': token_address,
                    'page': page,
                    'offset': 1000,
                    'apikey': self.config.api_key
                }

                async with self.session.get(url, params=params) as response:
                    data = await response.json()

                if data.get('status') != '1' or not data.get('result'):
                    break

                for holder in data['result']:
                    holders.append({
                        'address': holder['TokenHolderAddress'].lower(),
                        'balance': int(holder['TokenHolderQuantity'])
                    })

                # Rate limiting: 5 requests/second
                await asyncio.sleep(0.2)

            except Exception as e:
                self.logger.error(f"Failed to fetch holders page {page}: {e}")
                break

        return holders


# =============================================================================
# Analytics Engines
# =============================================================================

class HolderAnalytics:
    """Holder distribution and whale analytics"""

    def __init__(self, client: EthereumClient):
        self.client = client
        self.logger = logging.getLogger('HolderAnalytics')

    async def analyze_distribution(self, token_address: str) -> Dict:
        """Comprehensive holder distribution analysis"""

        # Fetch holders
        holders = await self.client.get_token_holders(token_address)

        if not holders:
            return {'error': 'No holder data available'}

        # Calculate metrics
        balances = [h['balance'] for h in holders]
        total_supply = sum(balances)
        holders.sort(key=lambda x: x['balance'], reverse=True)

        # Concentration metrics
        top_10_balance = sum(h['balance'] for h in holders[:10])
        top_100_balance = sum(h['balance'] for h in holders[:100])
        top_1000_balance = sum(h['balance'] for h in holders[:min(1000, len(holders))])

        # Whale detection
        whale_threshold = self._calculate_whale_threshold(holders, total_supply)
        whales = [h for h in holders if h['balance'] >= whale_threshold]

        # Gini coefficient
        gini = self._calculate_gini(balances)

        # Distribution buckets
        buckets = self._create_distribution_buckets(holders, total_supply)

        return {
            'token_address': token_address,
            'total_holders': len(holders),
            'total_supply': total_supply,
            'concentration': {
                'top_10_percent': round((top_10_balance / total_supply) * 100, 2),
                'top_100_percent': round((top_100_balance / total_supply) * 100, 2),
                'top_1000_percent': round((top_1000_balance / total_supply) * 100, 2),
                'gini_coefficient': round(gini, 4)
            },
            'whales': {
                'count': len(whales),
                'total_balance': sum(w['balance'] for w in whales),
                'percent_of_supply': round((sum(w['balance'] for w in whales) / total_supply) * 100, 2),
                'threshold_tokens': whale_threshold
            },
            'distribution_buckets': buckets
        }

    def _calculate_whale_threshold(self, holders: List[Dict], total_supply: float) -> float:
        """Calculate dynamic whale threshold"""
        sorted_balances = sorted([h['balance'] for h in holders], reverse=True)
        percentile_index = int(len(sorted_balances) * self.client.config.whale_threshold_percentile / 100)
        return sorted_balances[min(percentile_index, len(sorted_balances) - 1)]

    def _calculate_gini(self, balances: List[float]) -> float:
        """Calculate Gini coefficient"""
        sorted_balances = sorted(balances)
        n = len(sorted_balances)
        cumsum = sum((2 * (i + 1) - n - 1) * balance for i, balance in enumerate(sorted_balances))
        return cumsum / (n * sum(sorted_balances))

    def _create_distribution_buckets(self, holders: List[Dict], total_supply: float) -> Dict:
        """Create balance distribution buckets"""
        buckets = {
            '0-0.01%': 0,
            '0.01-0.1%': 0,
            '0.1-1%': 0,
            '1-10%': 0,
            '10%+': 0
        }

        for holder in holders:
            percent = (holder['balance'] / total_supply) * 100

            if percent < 0.01:
                buckets['0-0.01%'] += 1
            elif percent < 0.1:
                buckets['0.01-0.1%'] += 1
            elif percent < 1:
                buckets['0.1-1%'] += 1
            elif percent < 10:
                buckets['1-10%'] += 1
            else:
                buckets['10%+'] += 1

        return buckets


# =============================================================================
# Main Analytics Orchestrator
# =============================================================================

class OnChainAnalytics:
    """Main analytics orchestrator"""

    def __init__(self, chain_config: ChainConfig, db_config: DatabaseConfig):
        self.chain_config = chain_config
        self.db_config = db_config
        self.tsdb = TimeSeriesDB(db_config)
        self.metadata_db = MetadataDB(db_config)
        self.logger = logging.getLogger('OnChainAnalytics')

    async def analyze(self, token_address: Optional[str] = None) -> Dict:
        """Run comprehensive on-chain analysis"""

        async with EthereumClient(self.chain_config) as client:
            # Network metrics
            self.logger.info("Fetching network metrics...")
            network_metrics = await client.get_network_metrics()
            self.tsdb.write_network_metrics('ethereum', network_metrics)

            results = {
                'timestamp': datetime.utcnow().isoformat(),
                'network_metrics': network_metrics
            }

            # Token-specific analysis
            if token_address:
                self.logger.info(f"Analyzing token {token_address}...")

                holder_analytics = HolderAnalytics(client)
                holder_metrics = await holder_analytics.analyze_distribution(token_address)

                if 'error' not in holder_metrics:
                    self.tsdb.write_holder_metrics(token_address, holder_metrics)
                    results['holder_distribution'] = holder_metrics

            return results

    def save_results(self, results: Dict, output_dir: str = './output'):
        """Save analysis results to files"""
        os.makedirs(output_dir, exist_ok=True)

        timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')

        # Network metrics
        if 'network_metrics' in results:
            filename = f"{output_dir}/network_metrics_{timestamp}.json"
            with open(filename, 'w') as f:
                json.dump(results['network_metrics'], f, indent=2)
            self.logger.info(f"Saved network metrics to {filename}")

        # Holder distribution
        if 'holder_distribution' in results:
            filename = f"{output_dir}/holder_distribution_{timestamp}.json"
            with open(filename, 'w') as f:
                json.dump(results['holder_distribution'], f, indent=2)
            self.logger.info(f"Saved holder distribution to {filename}")


# =============================================================================
# CLI Entry Point
# =============================================================================

async def main():
    """Main entry point"""

    # Load configuration
    chain_config = ChainConfig(
        chain_id=1,
        rpc_endpoint=os.getenv('INFURA_ENDPOINT'),
        api_endpoint=os.getenv('ETHERSCAN_ENDPOINT'),
        api_key=os.getenv('ETHERSCAN_API_KEY'),
        whale_threshold_usd=1000000,
        whale_threshold_percentile=0.1,
        block_time=12
    )

    db_config = DatabaseConfig(
        influx_url=os.getenv('INFLUX_URL', 'http://localhost:8086'),
        influx_token=os.getenv('INFLUX_TOKEN'),
        influx_org=os.getenv('INFLUX_ORG', 'crypto-analytics'),
        influx_bucket=os.getenv('INFLUX_BUCKET', 'onchain-metrics'),
        postgres_host=os.getenv('POSTGRES_HOST', 'localhost'),
        postgres_port=int(os.getenv('POSTGRES_PORT', '5432')),
        postgres_db=os.getenv('POSTGRES_DB', 'blockchain_metadata'),
        postgres_user=os.getenv('POSTGRES_USER'),
        postgres_password=os.getenv('POSTGRES_PASSWORD')
    )

    # Initialize analytics
    analytics = OnChainAnalytics(chain_config, db_config)

    # Run analysis (example: USDT token)
    token_address = '0xdac17f958d2ee523a2206206994597c13d831ec7'  # USDT
    results = await analytics.analyze(token_address)

    # Save results
    analytics.save_results(results)

    print(json.dumps(results, indent=2))


if __name__ == '__main__':
    asyncio.run(main())
```

## Error Handling

| Error | Cause | Solution |
|-------|-------|----------|
| `ConnectionError: Failed to connect to RPC` | RPC endpoint down or incorrect | Verify endpoint URL, check API key, try backup RPC |
| `RateLimitExceeded: API rate limit reached` | Too many requests | Implement exponential backoff, upgrade API plan |
| `InvalidTokenAddress: Token not found` | Wrong address or contract not a token | Verify address on block explorer, check network |
| `InsufficientData: Less than 100 holders` | New or low-adoption token | Use alternative metrics, increase fetch limit |
| `InfluxDBError: Failed to write metrics` | Database connection issue | Check InfluxDB status, verify credentials |
| `PostgresError: Connection refused` | PostgreSQL not running | Start PostgreSQL service, check port 5432 |
| `TimeoutError: Request timed out` | Slow network or overloaded API | Increase timeout, try different time window |
| `ValidationError: Invalid configuration` | Missing or wrong config values | Check all required environment variables |
| `Web3Exception: Invalid block number` | Block not yet mined | Use confirmed blocks only (latest - 12) |
| `JSONDecodeError: Invalid API response` | API returned non-JSON | Check API status page, verify endpoint |
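
For the rate-limit row in particular, retry-with-backoff can be layered onto any fetch with the `tenacity` library already listed in the dependencies. A minimal sketch; the exception type is an illustrative choice and should match whatever your HTTP layer actually raises:

```python
import aiohttp
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    retry=retry_if_exception_type(aiohttp.ClientResponseError),  # e.g. HTTP 429
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=30),          # 1s, 2s, 4s, ... capped at 30s
)
async def fetch_json(session: aiohttp.ClientSession, url: str, params: dict) -> dict:
    """GET a JSON endpoint, retrying on HTTP errors with exponential backoff."""
    async with session.get(url, params=params) as response:
        response.raise_for_status()  # raises ClientResponseError on 4xx/5xx
        return await response.json()
```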

## Configuration Options

```yaml
# config/analytics.yaml

ethereum:
  # Network settings
  chain_id: 1
  rpc_endpoint: "${INFURA_ENDPOINT}"
  api_endpoint: "${ETHERSCAN_ENDPOINT}"
  api_key: "${ETHERSCAN_API_KEY}"

  # Analysis parameters
  whale_threshold_usd: 1000000        # $1M minimum for whale classification
  whale_threshold_percentile: 0.1     # Top 0.1% of holders
  block_time: 12                      # Average block time in seconds
  confirmations_required: 12          # Blocks to wait for finality

  # Data collection
  fetch_holder_pages: 10              # Max pages of holders (1000 holders per page)
  historical_blocks: 100              # Blocks to analyze for metrics

  # Rate limiting
  requests_per_second: 5              # API rate limit
  batch_size: 100                     # Transactions per batch
  retry_attempts: 3                   # Max retries on failure
  retry_backoff: 2                    # Exponential backoff multiplier

bitcoin:
  api_endpoint: "${BLOCKCHAIR_ENDPOINT}"
  api_key: "${BLOCKCHAIR_API_KEY}"
  whale_threshold_usd: 1000000
  whale_threshold_percentile: 0.1
  block_time: 600

  # UTXO-specific settings
  utxo_age_buckets: [7, 30, 90, 180, 365, 730]   # Days
  utxo_value_buckets: [0.01, 0.1, 1, 10, 100]    # BTC

database:
  # InfluxDB time-series
  influx_url: "http://localhost:8086"
  influx_token: "${INFLUX_TOKEN}"
  influx_org: "crypto-analytics"
  influx_bucket: "onchain-metrics"
  retention_days: 90                  # Data retention period

  # PostgreSQL metadata
  postgres_host: "localhost"
  postgres_port: 5432
  postgres_db: "blockchain_metadata"
  postgres_user: "${POSTGRES_USER}"
  postgres_password: "${POSTGRES_PASSWORD}"
  connection_pool_size: 10

alerts:
  # Whale movement alerts
  enabled: true
  min_value_usd: 1000000              # Minimum transaction value for alert
  channels: ["email", "slack"]        # Alert channels

  # Email settings
  smtp_host: "smtp.gmail.com"
  smtp_port: 587
  smtp_user: "${SMTP_USER}"
  smtp_password: "${SMTP_PASSWORD}"
  alert_recipients: ["alerts@example.com"]

  # Slack webhook
  slack_webhook_url: "${SLACK_WEBHOOK_URL}"

output:
  # File output settings
  directory: "./output"
  format: ["json", "csv"]             # Output formats
  save_raw_data: false                # Save raw API responses
  compress_old_files: true            # Gzip files older than 7 days

  # Visualization
  generate_charts: true               # Auto-generate charts
  chart_format: "png"                 # png or svg

logging:
  level: "INFO"                       # DEBUG, INFO, WARNING, ERROR
  file: "./logs/analytics.log"
  max_size_mb: 100
  backup_count: 5
```

## Best Practices

### DO

- **Use a Time-Series Database**: InfluxDB optimizes storage and queries for time-stamped metrics
- **Implement Rate Limiting**: Respect API limits with exponential backoff and request throttling
- **Cache Static Data**: Store exchange addresses and token metadata locally to reduce API calls
- **Validate All Addresses**: Use `Web3.is_address()` before making contract calls (see the sketch after this list)
- **Monitor API Costs**: Track API usage to avoid unexpected bills with premium providers
- **Store Historical Data**: Keep 90+ days of metrics for trend analysis and comparisons
- **Use Async Operations**: Async/await enables concurrent API calls without blocking
- **Label Wallet Addresses**: Maintain a database of known exchanges, team wallets, and contracts
- **Set Alert Thresholds**: Configure meaningful thresholds to avoid alert fatigue
- **Test with Testnets**: Validate logic on Goerli/Sepolia before mainnet deployment
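
A minimal sketch of the validation step, using web3.py's `Web3.is_address()` and `Web3.to_checksum_address()`:

```python
from web3 import Web3

def normalize_address(raw: str) -> str:
    """Validate a user-supplied address and return its EIP-55 checksum form."""
    candidate = raw.strip()
    if not Web3.is_address(candidate):
        raise ValueError(f"Not a valid Ethereum address: {candidate!r}")
    # Contract calls expect the checksummed form
    return Web3.to_checksum_address(candidate)

# Usage: token = normalize_address('0xdac17f958d2ee523a2206206994597c13d831ec7')
```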

### DON'T

- **Don't Query Full History**: APIs have limits; fetch incremental data and build history locally
- **Don't Ignore Chain Reorgs**: Wait 12+ confirmations before marking data as final
- **Don't Hardcode API Keys**: Use environment variables and secret management
- **Don't Trust a Single Data Source**: Cross-reference multiple APIs for critical decisions
- **Don't Skip Error Handling**: Network issues are common; implement robust retry logic
- **Don't Store Raw Blockchain Data**: Process and aggregate; full node data runs to terabytes
- **Don't Forget Decimal Precision**: Use the `Decimal` type for financial calculations
- **Don't Run Without Monitoring**: Set up alerting for analysis failures and anomalies
- **Don't Neglect Compliance**: Some jurisdictions restrict blockchain data collection
- **Don't Over-Engineer**: Start simple; add complexity only when needed

## Performance Considerations

1. **API Rate Limits**
   - Free Etherscan: 5 calls/second, 100k/day
   - Infura free tier: 100k requests/day
   - Glassnode: premium tier required for high-frequency data

2. **Database Performance**
   - InfluxDB: 1M+ points/second write capacity
   - PostgreSQL: index the `address` and `timestamp` columns
   - Use connection pooling for concurrent queries

3. **Network Latency**
   - Average API response: 100-500 ms
   - RPC node latency: 50-200 ms
   - Batch requests when possible to reduce round trips

4. **Memory Usage**
   - 10,000 holders ≈ 2 MB of memory
   - 100 blocks of transactions ≈ 50 MB
   - Use streaming for large datasets

5. **Optimization Strategies**
   - Cache token metadata (supply, decimals) for 1 hour (see the sketch below)
   - Aggregate metrics every 15 minutes, not per block
   - Use database materialized views for complex queries
   - Implement data downsampling for older metrics
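
A minimal sketch of the metadata-caching strategy; the class and the `fetch_total_supply` callable in the usage note are illustrative, not part of the shipped code:

```python
import time
from typing import Any, Callable, Dict, Tuple

class TTLCache:
    """Tiny time-based cache for slow-changing values like token metadata."""

    def __init__(self, ttl_seconds: float = 3600.0):
        self.ttl = ttl_seconds
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get_or_fetch(self, key: str, fetch: Callable[[], Any]) -> Any:
        """Return the cached value for `key`, refetching once it is older than ttl."""
        entry = self._store.get(key)
        now = time.monotonic()
        if entry is not None and now - entry[0] < self.ttl:
            return entry[1]
        value = fetch()
        self._store[key] = (now, value)
        return value

# Usage (synchronous fetch shown for brevity):
# metadata_cache = TTLCache(ttl_seconds=3600)
# supply = metadata_cache.get_or_fetch('usdt:total_supply', lambda: fetch_total_supply())
```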

## Security Considerations

1. **API Key Protection**
   - Store keys in environment variables; never commit them to git
   - Use separate keys for development and production
   - Rotate keys quarterly

2. **Data Privacy**
   - Blockchain data is public, but aggregated analysis may reveal patterns
   - Don't store personal information linked to addresses
   - Comply with GDPR if analyzing EU users

3. **Input Validation**
   - Validate all addresses before queries
   - Sanitize user inputs to prevent injection
   - Limit query ranges to prevent DoS

4. **Rate Limiting**
   - Implement application-level rate limiting
   - Monitor for unusual query patterns
   - Set a maximum number of concurrent requests

5. **Secure Communication**
   - Use HTTPS for all API calls
   - Verify SSL certificates
   - Consider a VPN for sensitive production deployments

## Related Commands

- `/track-price` - Real-time cryptocurrency price monitoring
- `/analyze-flow` - Options flow analysis for derivatives markets
- `/scan-movers` - Market movers scanner for large price movements
- `/optimize-gas` - Gas fee optimization for transaction timing
- `/analyze-nft` - NFT rarity and on-chain metadata analysis
- `/track-position` - Portfolio position tracking across wallets

## Version History

### v2.0.0 (2025-10-11)

- Complete rewrite with production-ready architecture
- Added multi-chain support (Ethereum, Bitcoin)
- Implemented InfluxDB time-series storage
- Added PostgreSQL for metadata and alerts
- Comprehensive holder distribution analysis with Gini coefficient
- Real-time whale movement tracking
- UTXO analysis for Bitcoin
- Ethereum gas analytics and network utilization
- Dynamic whale threshold calculation
- Async/await throughout for performance
- Robust error handling with tenacity retry logic
- Configuration via YAML with environment variable substitution
- 800+ lines of production Python code
- Complete documentation with examples

### v1.0.0 (2024-09-15)

- Initial release with basic on-chain metrics
- Simple JavaScript implementation
- Limited to single-chain Ethereum analysis