20 KiB
Asleep Webhook Reference
This reference provides comprehensive documentation for implementing Asleep webhooks in backend applications.
Overview
Asleep webhooks enable real-time notifications about sleep session events. The system sends HTTP POST requests to your configured callback URL when specific events occur.
Webhook Configuration
Webhooks are configured by providing a callback URL during session operations (via SDK) or through the Asleep Dashboard.
Callback URL Requirements:
- Must be publicly accessible HTTPS endpoint
- Should respond with 2xx status code
- Should handle requests within 30 seconds
Authentication
Webhook requests include authentication headers:
x-api-key: YOUR_API_KEY
x-user-id: USER_ID
Security Best Practices:
- Verify the
x-api-keymatches your expected API key - Validate the
x-user-idbelongs to your system - Use HTTPS for your webhook endpoint
- Implement request signing if needed
- Log all webhook attempts for audit
Supported Events
Asleep webhooks support two primary event types:
1. INFERENCE_COMPLETE
Triggered during sleep session analysis at regular intervals (every 5 or 40 minutes).
Use Cases:
- Real-time sleep stage monitoring
- Live dashboard updates
- Progressive data analysis
- User notifications during tracking
Timing:
- Fires every 5 minutes during active tracking
- May also fire at 40-minute intervals
- Multiple events per session
2. SESSION_COMPLETE
Triggered when complete sleep session analysis finishes.
Use Cases:
- Final report generation
- User notifications
- Data storage
- Statistics calculation
- Integration with other systems
Timing:
- Fires once per session
- Occurs after session end
- Contains complete analysis
Webhook Payload Schemas
INFERENCE_COMPLETE Payload
Provides incremental sleep analysis data.
Structure:
{
"event": "INFERENCE_COMPLETE",
"version": "V3",
"timestamp": "2024-01-21T06:15:00Z",
"user_id": "550e8400-e29b-41d4-a716-446655440000",
"session_id": "session123",
"seq_num": 60,
"inference_seq_num": 12,
"sleep_stages": [1, 1, 2, 2, 2],
"breath_stages": [0, 0, 0, 0, 0],
"snoring_stages": [0, 0, 1, 1, 0],
"time_window": {
"start": "2024-01-21T06:10:00Z",
"end": "2024-01-21T06:15:00Z"
}
}
Field Descriptions:
| Field | Type | Description |
|---|---|---|
| event | String | Always "INFERENCE_COMPLETE" |
| version | String | API version (V1, V2, V3) |
| timestamp | String (ISO 8601) | Event generation time |
| user_id | String | User identifier |
| session_id | String | Session identifier |
| seq_num | Integer | Audio data upload sequence number |
| inference_seq_num | Integer | Analysis sequence (5-minute increments) |
| sleep_stages | Array[Integer] | Sleep stage values for time window |
| breath_stages | Array[Integer] | Breathing stability indicators |
| snoring_stages | Array[Integer] | Snoring detection values |
| time_window | Object | Time range for this analysis chunk |
Sleep Stage Values:
-1: Unknown/No data0: Wake1: Light sleep2: Deep sleep3: REM sleep
Snoring Stage Values:
0: No snoring1: Snoring detected
Example Handler (Python):
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/webhook', methods=['POST'])
def handle_inference():
# Verify authentication
api_key = request.headers.get('x-api-key')
user_id = request.headers.get('x-user-id')
if api_key != EXPECTED_API_KEY:
return jsonify({"error": "Unauthorized"}), 401
# Parse payload
data = request.json
if data['event'] == 'INFERENCE_COMPLETE':
session_id = data['session_id']
sleep_stages = data['sleep_stages']
# Process incremental data
update_live_dashboard(session_id, sleep_stages)
# Store for real-time analysis
store_incremental_data(data)
return jsonify({"status": "received"}), 200
Example Handler (Node.js):
const express = require('express');
const app = express();
app.use(express.json());
app.post('/webhook', async (req, res) => {
// Verify authentication
const apiKey = req.headers['x-api-key'];
const userId = req.headers['x-user-id'];
if (apiKey !== process.env.ASLEEP_API_KEY) {
return res.status(401).json({ error: 'Unauthorized' });
}
const { event, session_id, sleep_stages } = req.body;
if (event === 'INFERENCE_COMPLETE') {
// Update real-time dashboard
await updateLiveDashboard(session_id, sleep_stages);
// Store incremental data
await storeIncrementalData(req.body);
}
res.status(200).json({ status: 'received' });
});
SESSION_COMPLETE Payload
Provides comprehensive final sleep analysis.
Structure:
{
"event": "SESSION_COMPLETE",
"version": "V3",
"timestamp": "2024-01-21T06:30:00Z",
"user_id": "550e8400-e29b-41d4-a716-446655440000",
"session_id": "session123",
"session": {
"id": "session123",
"state": "COMPLETE",
"start_time": "2024-01-20T22:00:00+00:00",
"end_time": "2024-01-21T06:30:00+00:00",
"timezone": "UTC",
"sleep_stages": [0, 0, 1, 1, 2, 3, 2, 1, 0],
"snoring_stages": [0, 0, 0, 1, 1, 0, 0, 0, 0]
},
"stat": {
"sleep_time": "06:30:00",
"sleep_index": 85.5,
"sleep_latency": 900,
"time_in_bed": 30600,
"time_in_sleep": 27000,
"time_in_light": 13500,
"time_in_deep": 6750,
"time_in_rem": 6750,
"sleep_efficiency": 88.24,
"waso_count": 2,
"longest_waso": 300,
"sleep_cycle": [
{
"order": 1,
"start_time": "2024-01-20T22:15:00+00:00",
"end_time": "2024-01-21T01:30:00+00:00"
}
]
},
"peculiarities": []
}
Field Descriptions:
| Field | Type | Description |
|---|---|---|
| event | String | Always "SESSION_COMPLETE" |
| version | String | API version (V1, V2, V3) |
| timestamp | String (ISO 8601) | Event generation time |
| user_id | String | User identifier |
| session_id | String | Session identifier |
| session | Object | Complete session data |
| stat | Object | Comprehensive sleep statistics |
| peculiarities | Array[String] | Special session conditions |
Session Object Fields:
id: Session identifierstate: Always "COMPLETE" for this eventstart_time,end_time: Session timestamps (ISO 8601)timezone: Timezone of the sessionsleep_stages: Complete sleep stage timelinesnoring_stages: Complete snoring timeline
Stat Object Fields:
sleep_time: Total sleep duration (HH:MM:SS)sleep_index: Overall sleep quality score (0-100)sleep_latency: Time to fall asleep (seconds)time_in_bed: Total time in bed (seconds)time_in_sleep: Total actual sleep time (seconds)time_in_light/deep/rem: Stage durations (seconds)sleep_efficiency: Percentage of time spent sleepingwaso_count: Wake after sleep onset episodeslongest_waso: Longest wake episode (seconds)sleep_cycle: Array of sleep cycle objects
Peculiarities:
IN_PROGRESS: Analysis still ongoing (shouldn't occur for COMPLETE)NEVER_SLEPT: No sleep detectedTOO_SHORT_FOR_ANALYSIS: Session < 5 minutesNO_BREATHING_STABILITY: Inconsistent breathing data
Example Handler (Python):
from flask import Flask, request, jsonify
import logging
app = Flask(__name__)
logger = logging.getLogger(__name__)
@app.route('/webhook', methods=['POST'])
def handle_session_complete():
# Verify authentication
api_key = request.headers.get('x-api-key')
user_id = request.headers.get('x-user-id')
if api_key != EXPECTED_API_KEY:
logger.warning(f"Unauthorized webhook attempt from {request.remote_addr}")
return jsonify({"error": "Unauthorized"}), 401
# Parse payload
data = request.json
if data['event'] == 'SESSION_COMPLETE':
session_id = data['session_id']
stat = data['stat']
# Store complete report
save_sleep_report(user_id, session_id, data)
# Send user notification
notify_user(user_id, {
'session_id': session_id,
'sleep_time': stat['sleep_time'],
'sleep_efficiency': stat['sleep_efficiency'],
'sleep_index': stat['sleep_index']
})
# Update user statistics
update_user_statistics(user_id)
# Trigger integrations
sync_to_health_platform(user_id, data)
logger.info(f"Processed SESSION_COMPLETE for {session_id}")
return jsonify({"status": "processed"}), 200
Example Handler (Node.js):
const express = require('express');
const app = express();
app.use(express.json());
app.post('/webhook', async (req, res) => {
// Verify authentication
const apiKey = req.headers['x-api-key'];
const userId = req.headers['x-user-id'];
if (apiKey !== process.env.ASLEEP_API_KEY) {
console.warn('Unauthorized webhook attempt');
return res.status(401).json({ error: 'Unauthorized' });
}
const { event, session_id, stat } = req.body;
if (event === 'SESSION_COMPLETE') {
try {
// Store complete report
await saveSleepReport(userId, session_id, req.body);
// Send user notification
await notifyUser(userId, {
sessionId: session_id,
sleepTime: stat.sleep_time,
sleepEfficiency: stat.sleep_efficiency,
sleepIndex: stat.sleep_index
});
// Update statistics
await updateUserStatistics(userId);
// Sync to integrations
await syncToHealthPlatform(userId, req.body);
console.log(`Processed SESSION_COMPLETE for ${session_id}`);
res.status(200).json({ status: 'processed' });
} catch (error) {
console.error('Webhook processing error:', error);
res.status(500).json({ error: 'Processing failed' });
}
} else {
res.status(200).json({ status: 'received' });
}
});
Webhook Versioning
Webhooks support three format versions for backward compatibility:
V1 (Legacy)
Original webhook format. Use V3 for new implementations.
V2 (Legacy)
Updated format with additional fields. Use V3 for new implementations.
V3 (Current)
Latest format with comprehensive data structures. Recommended for all new integrations.
Version Selection: Configure webhook version through SDK initialization or Dashboard settings.
Implementation Guide
1. Set Up Webhook Endpoint
Create a public HTTPS endpoint to receive webhook events:
Python (Flask):
from flask import Flask, request, jsonify
import hmac
import hashlib
app = Flask(__name__)
@app.route('/asleep-webhook', methods=['POST'])
def asleep_webhook():
# Verify authentication
if not verify_webhook(request):
return jsonify({"error": "Unauthorized"}), 401
# Parse event
event = request.json
event_type = event.get('event')
# Route to appropriate handler
if event_type == 'INFERENCE_COMPLETE':
handle_inference_complete(event)
elif event_type == 'SESSION_COMPLETE':
handle_session_complete(event)
return jsonify({"status": "success"}), 200
def verify_webhook(request):
api_key = request.headers.get('x-api-key')
return api_key == EXPECTED_API_KEY
if __name__ == '__main__':
app.run(host='0.0.0.0', port=443, ssl_context='adhoc')
Node.js (Express):
const express = require('express');
const https = require('https');
const fs = require('fs');
const app = express();
app.use(express.json());
app.post('/asleep-webhook', async (req, res) => {
// Verify authentication
if (!verifyWebhook(req)) {
return res.status(401).json({ error: 'Unauthorized' });
}
const { event } = req.body;
try {
switch (event) {
case 'INFERENCE_COMPLETE':
await handleInferenceComplete(req.body);
break;
case 'SESSION_COMPLETE':
await handleSessionComplete(req.body);
break;
default:
console.warn(`Unknown event type: ${event}`);
}
res.status(200).json({ status: 'success' });
} catch (error) {
console.error('Webhook error:', error);
res.status(500).json({ error: 'Processing failed' });
}
});
function verifyWebhook(req) {
const apiKey = req.headers['x-api-key'];
return apiKey === process.env.ASLEEP_API_KEY;
}
// HTTPS server
const options = {
key: fs.readFileSync('private-key.pem'),
cert: fs.readFileSync('certificate.pem')
};
https.createServer(options, app).listen(443);
2. Configure Webhook URL
Configure your webhook URL through:
- SDK initialization (for mobile apps)
- Asleep Dashboard (for backend integrations)
SDK Example (Android):
AsleepConfig.init(
apiKey = "YOUR_API_KEY",
userId = "user123",
callbackUrl = "https://your-domain.com/asleep-webhook"
)
3. Handle Webhook Events
Implement handlers for each event type:
Python Example:
def handle_inference_complete(event):
"""Process incremental sleep data"""
session_id = event['session_id']
sleep_stages = event['sleep_stages']
# Update real-time dashboard
redis_client.set(f"session:{session_id}:latest", json.dumps(sleep_stages))
# Notify connected clients via WebSocket
websocket_broadcast(session_id, sleep_stages)
# Store for analysis
db.incremental_data.insert_one(event)
def handle_session_complete(event):
"""Process complete sleep report"""
user_id = event['user_id']
session_id = event['session_id']
stat = event['stat']
# Store complete report
db.sleep_reports.insert_one({
'user_id': user_id,
'session_id': session_id,
'date': event['session']['start_time'],
'statistics': stat,
'created_at': datetime.now()
})
# Update user's latest statistics
update_user_stats(user_id)
# Send push notification
send_notification(user_id, {
'title': 'Sleep Report Ready',
'body': f"Sleep time: {stat['sleep_time']}, Efficiency: {stat['sleep_efficiency']:.1f}%"
})
# Trigger downstream processes
calculate_weekly_trends(user_id)
check_sleep_goals(user_id, stat)
4. Error Handling
Implement robust error handling:
Retry Logic:
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def process_webhook(event):
"""Process webhook with automatic retry"""
# Your processing logic here
pass
@app.route('/webhook', methods=['POST'])
def webhook_endpoint():
try:
event = request.json
process_webhook(event)
return jsonify({"status": "success"}), 200
except Exception as e:
logger.error(f"Webhook processing failed: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
Idempotency:
def handle_session_complete(event):
session_id = event['session_id']
# Check if already processed
if db.processed_webhooks.find_one({'session_id': session_id}):
logger.info(f"Session {session_id} already processed")
return
# Process event
save_sleep_report(event)
# Mark as processed
db.processed_webhooks.insert_one({
'session_id': session_id,
'processed_at': datetime.now()
})
5. Testing
Test webhook handling locally:
ngrok for Local Testing:
# Start your local server
python app.py
# In another terminal, expose with ngrok
ngrok http 5000
# Use the ngrok URL as your webhook URL
# Example: https://abc123.ngrok.io/webhook
Mock Webhook Requests:
# Test INFERENCE_COMPLETE
curl -X POST http://localhost:5000/webhook \
-H "Content-Type: application/json" \
-H "x-api-key: YOUR_API_KEY" \
-H "x-user-id: test_user" \
-d '{
"event": "INFERENCE_COMPLETE",
"version": "V3",
"session_id": "test123",
"sleep_stages": [1, 1, 2]
}'
# Test SESSION_COMPLETE
curl -X POST http://localhost:5000/webhook \
-H "Content-Type: application/json" \
-H "x-api-key: YOUR_API_KEY" \
-H "x-user-id: test_user" \
-d '{
"event": "SESSION_COMPLETE",
"version": "V3",
"session_id": "test123",
"stat": {
"sleep_time": "07:30:00",
"sleep_efficiency": 88.5
}
}'
Best Practices
Security
- Always verify
x-api-keyheader - Use HTTPS for webhook endpoints
- Implement request signing if handling sensitive data
- Rate limit webhook endpoint
- Log all webhook attempts
Reliability
- Respond quickly (< 5 seconds ideal)
- Process asynchronously if needed
- Implement idempotency checks
- Handle duplicate events gracefully
- Return 2xx status even if processing fails (retry logic)
Performance
- Use message queues for heavy processing
- Implement caching where appropriate
- Batch database operations
- Monitor webhook response times
- Scale horizontally if needed
Monitoring
- Log all webhook events
- Track processing success/failure rates
- Monitor response times
- Set up alerts for failures
- Dashboard for webhook metrics
Error Handling
- Catch and log all exceptions
- Return appropriate HTTP status codes
- Implement exponential backoff
- Dead letter queue for failed events
- Manual review process for failures
Common Use Cases
Real-Time Dashboard Updates
@app.route('/webhook', methods=['POST'])
def webhook():
event = request.json
if event['event'] == 'INFERENCE_COMPLETE':
# Broadcast to connected WebSocket clients
socketio.emit('sleep_update', {
'session_id': event['session_id'],
'sleep_stages': event['sleep_stages'],
'timestamp': event['timestamp']
}, room=event['user_id'])
return jsonify({"status": "success"}), 200
User Notifications
def handle_session_complete(event):
user_id = event['user_id']
stat = event['stat']
# Generate insights
insights = generate_sleep_insights(stat)
# Send push notification
send_push_notification(user_id, {
'title': 'Your Sleep Report is Ready!',
'body': f"You slept for {stat['sleep_time']} with {stat['sleep_efficiency']:.0f}% efficiency",
'data': {
'session_id': event['session_id'],
'insights': insights
}
})
Data Analytics Pipeline
def handle_session_complete(event):
# Store in data warehouse
bigquery_client.insert_rows_json('sleep_data.sessions', [{
'user_id': event['user_id'],
'session_id': event['session_id'],
'date': event['session']['start_time'],
'statistics': json.dumps(event['stat']),
'ingested_at': datetime.now().isoformat()
}])
# Trigger analytics jobs
trigger_weekly_report_job(event['user_id'])
update_cohort_analysis()
Integration with Other Systems
def handle_session_complete(event):
user_id = event['user_id']
stat = event['stat']
# Sync to Apple Health
sync_to_apple_health(user_id, {
'sleep_analysis': stat,
'date': event['session']['start_time']
})
# Update CRM
update_crm_profile(user_id, {
'last_sleep_date': event['session']['start_time'],
'avg_sleep_efficiency': calculate_avg_efficiency(user_id)
})
Troubleshooting
Webhook Not Received
Check:
- Endpoint is publicly accessible
- HTTPS is properly configured
- Firewall allows incoming requests
- Webhook URL is correctly configured
- Server is running and healthy
Authentication Failures
Check:
x-api-keyvalidation logic- API key matches dashboard
- Headers are correctly parsed
- Case sensitivity of header names
Duplicate Events
Solution:
def handle_webhook(event):
event_id = f"{event['session_id']}:{event['event']}:{event['timestamp']}"
# Check if already processed
if redis_client.exists(f"processed:{event_id}"):
return
# Process event
process_event(event)
# Mark as processed (expire after 24 hours)
redis_client.setex(f"processed:{event_id}", 86400, "1")
Processing Delays
Solution:
from celery import Celery
celery = Celery('tasks', broker='redis://localhost:6379')
@app.route('/webhook', methods=['POST'])
def webhook():
event = request.json
# Queue for async processing
process_webhook_async.delay(event)
# Respond immediately
return jsonify({"status": "queued"}), 200
@celery.task
def process_webhook_async(event):
# Heavy processing here
pass
Resources
- Official Documentation: https://docs-en.asleep.ai/docs/webhook.md
- API Basics: https://docs-en.asleep.ai/docs/api-basics.md
- Dashboard: https://dashboard.asleep.ai