Files
gh-asleep-ai-sleeptrack-skills/skills/sleeptrack-be/references/webhook_reference.md
2025-11-29 17:58:20 +08:00

20 KiB

Asleep Webhook Reference

This reference provides comprehensive documentation for implementing Asleep webhooks in backend applications.

Overview

Asleep webhooks enable real-time notifications about sleep session events. The system sends HTTP POST requests to your configured callback URL when specific events occur.

Webhook Configuration

Webhooks are configured by providing a callback URL during session operations (via SDK) or through the Asleep Dashboard.

Callback URL Requirements:

  • Must be publicly accessible HTTPS endpoint
  • Should respond with 2xx status code
  • Should handle requests within 30 seconds

Authentication

Webhook requests include authentication headers:

x-api-key: YOUR_API_KEY
x-user-id: USER_ID

Security Best Practices:

  • Verify the x-api-key matches your expected API key
  • Validate the x-user-id belongs to your system
  • Use HTTPS for your webhook endpoint
  • Implement request signing if needed
  • Log all webhook attempts for audit

Supported Events

Asleep webhooks support two primary event types:

1. INFERENCE_COMPLETE

Triggered during sleep session analysis at regular intervals (every 5 or 40 minutes).

Use Cases:

  • Real-time sleep stage monitoring
  • Live dashboard updates
  • Progressive data analysis
  • User notifications during tracking

Timing:

  • Fires every 5 minutes during active tracking
  • May also fire at 40-minute intervals
  • Multiple events per session

2. SESSION_COMPLETE

Triggered when complete sleep session analysis finishes.

Use Cases:

  • Final report generation
  • User notifications
  • Data storage
  • Statistics calculation
  • Integration with other systems

Timing:

  • Fires once per session
  • Occurs after session end
  • Contains complete analysis

Webhook Payload Schemas

INFERENCE_COMPLETE Payload

Provides incremental sleep analysis data.

Structure:

{
  "event": "INFERENCE_COMPLETE",
  "version": "V3",
  "timestamp": "2024-01-21T06:15:00Z",
  "user_id": "550e8400-e29b-41d4-a716-446655440000",
  "session_id": "session123",
  "seq_num": 60,
  "inference_seq_num": 12,
  "sleep_stages": [1, 1, 2, 2, 2],
  "breath_stages": [0, 0, 0, 0, 0],
  "snoring_stages": [0, 0, 1, 1, 0],
  "time_window": {
    "start": "2024-01-21T06:10:00Z",
    "end": "2024-01-21T06:15:00Z"
  }
}

Field Descriptions:

Field Type Description
event String Always "INFERENCE_COMPLETE"
version String API version (V1, V2, V3)
timestamp String (ISO 8601) Event generation time
user_id String User identifier
session_id String Session identifier
seq_num Integer Audio data upload sequence number
inference_seq_num Integer Analysis sequence (5-minute increments)
sleep_stages Array[Integer] Sleep stage values for time window
breath_stages Array[Integer] Breathing stability indicators
snoring_stages Array[Integer] Snoring detection values
time_window Object Time range for this analysis chunk

Sleep Stage Values:

  • -1: Unknown/No data
  • 0: Wake
  • 1: Light sleep
  • 2: Deep sleep
  • 3: REM sleep

Snoring Stage Values:

  • 0: No snoring
  • 1: Snoring detected

Example Handler (Python):

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/webhook', methods=['POST'])
def handle_inference():
    # Verify authentication
    api_key = request.headers.get('x-api-key')
    user_id = request.headers.get('x-user-id')

    if api_key != EXPECTED_API_KEY:
        return jsonify({"error": "Unauthorized"}), 401

    # Parse payload
    data = request.json

    if data['event'] == 'INFERENCE_COMPLETE':
        session_id = data['session_id']
        sleep_stages = data['sleep_stages']

        # Process incremental data
        update_live_dashboard(session_id, sleep_stages)

        # Store for real-time analysis
        store_incremental_data(data)

    return jsonify({"status": "received"}), 200

Example Handler (Node.js):

const express = require('express');
const app = express();

app.use(express.json());

app.post('/webhook', async (req, res) => {
  // Verify authentication
  const apiKey = req.headers['x-api-key'];
  const userId = req.headers['x-user-id'];

  if (apiKey !== process.env.ASLEEP_API_KEY) {
    return res.status(401).json({ error: 'Unauthorized' });
  }

  const { event, session_id, sleep_stages } = req.body;

  if (event === 'INFERENCE_COMPLETE') {
    // Update real-time dashboard
    await updateLiveDashboard(session_id, sleep_stages);

    // Store incremental data
    await storeIncrementalData(req.body);
  }

  res.status(200).json({ status: 'received' });
});

SESSION_COMPLETE Payload

Provides comprehensive final sleep analysis.

Structure:

{
  "event": "SESSION_COMPLETE",
  "version": "V3",
  "timestamp": "2024-01-21T06:30:00Z",
  "user_id": "550e8400-e29b-41d4-a716-446655440000",
  "session_id": "session123",
  "session": {
    "id": "session123",
    "state": "COMPLETE",
    "start_time": "2024-01-20T22:00:00+00:00",
    "end_time": "2024-01-21T06:30:00+00:00",
    "timezone": "UTC",
    "sleep_stages": [0, 0, 1, 1, 2, 3, 2, 1, 0],
    "snoring_stages": [0, 0, 0, 1, 1, 0, 0, 0, 0]
  },
  "stat": {
    "sleep_time": "06:30:00",
    "sleep_index": 85.5,
    "sleep_latency": 900,
    "time_in_bed": 30600,
    "time_in_sleep": 27000,
    "time_in_light": 13500,
    "time_in_deep": 6750,
    "time_in_rem": 6750,
    "sleep_efficiency": 88.24,
    "waso_count": 2,
    "longest_waso": 300,
    "sleep_cycle": [
      {
        "order": 1,
        "start_time": "2024-01-20T22:15:00+00:00",
        "end_time": "2024-01-21T01:30:00+00:00"
      }
    ]
  },
  "peculiarities": []
}

Field Descriptions:

Field Type Description
event String Always "SESSION_COMPLETE"
version String API version (V1, V2, V3)
timestamp String (ISO 8601) Event generation time
user_id String User identifier
session_id String Session identifier
session Object Complete session data
stat Object Comprehensive sleep statistics
peculiarities Array[String] Special session conditions

Session Object Fields:

  • id: Session identifier
  • state: Always "COMPLETE" for this event
  • start_time, end_time: Session timestamps (ISO 8601)
  • timezone: Timezone of the session
  • sleep_stages: Complete sleep stage timeline
  • snoring_stages: Complete snoring timeline

Stat Object Fields:

  • sleep_time: Total sleep duration (HH:MM:SS)
  • sleep_index: Overall sleep quality score (0-100)
  • sleep_latency: Time to fall asleep (seconds)
  • time_in_bed: Total time in bed (seconds)
  • time_in_sleep: Total actual sleep time (seconds)
  • time_in_light/deep/rem: Stage durations (seconds)
  • sleep_efficiency: Percentage of time spent sleeping
  • waso_count: Wake after sleep onset episodes
  • longest_waso: Longest wake episode (seconds)
  • sleep_cycle: Array of sleep cycle objects

Peculiarities:

  • IN_PROGRESS: Analysis still ongoing (shouldn't occur for COMPLETE)
  • NEVER_SLEPT: No sleep detected
  • TOO_SHORT_FOR_ANALYSIS: Session < 5 minutes
  • NO_BREATHING_STABILITY: Inconsistent breathing data

Example Handler (Python):

from flask import Flask, request, jsonify
import logging

app = Flask(__name__)
logger = logging.getLogger(__name__)

@app.route('/webhook', methods=['POST'])
def handle_session_complete():
    # Verify authentication
    api_key = request.headers.get('x-api-key')
    user_id = request.headers.get('x-user-id')

    if api_key != EXPECTED_API_KEY:
        logger.warning(f"Unauthorized webhook attempt from {request.remote_addr}")
        return jsonify({"error": "Unauthorized"}), 401

    # Parse payload
    data = request.json

    if data['event'] == 'SESSION_COMPLETE':
        session_id = data['session_id']
        stat = data['stat']

        # Store complete report
        save_sleep_report(user_id, session_id, data)

        # Send user notification
        notify_user(user_id, {
            'session_id': session_id,
            'sleep_time': stat['sleep_time'],
            'sleep_efficiency': stat['sleep_efficiency'],
            'sleep_index': stat['sleep_index']
        })

        # Update user statistics
        update_user_statistics(user_id)

        # Trigger integrations
        sync_to_health_platform(user_id, data)

        logger.info(f"Processed SESSION_COMPLETE for {session_id}")

    return jsonify({"status": "processed"}), 200

Example Handler (Node.js):

const express = require('express');
const app = express();

app.use(express.json());

app.post('/webhook', async (req, res) => {
  // Verify authentication
  const apiKey = req.headers['x-api-key'];
  const userId = req.headers['x-user-id'];

  if (apiKey !== process.env.ASLEEP_API_KEY) {
    console.warn('Unauthorized webhook attempt');
    return res.status(401).json({ error: 'Unauthorized' });
  }

  const { event, session_id, stat } = req.body;

  if (event === 'SESSION_COMPLETE') {
    try {
      // Store complete report
      await saveSleepReport(userId, session_id, req.body);

      // Send user notification
      await notifyUser(userId, {
        sessionId: session_id,
        sleepTime: stat.sleep_time,
        sleepEfficiency: stat.sleep_efficiency,
        sleepIndex: stat.sleep_index
      });

      // Update statistics
      await updateUserStatistics(userId);

      // Sync to integrations
      await syncToHealthPlatform(userId, req.body);

      console.log(`Processed SESSION_COMPLETE for ${session_id}`);

      res.status(200).json({ status: 'processed' });
    } catch (error) {
      console.error('Webhook processing error:', error);
      res.status(500).json({ error: 'Processing failed' });
    }
  } else {
    res.status(200).json({ status: 'received' });
  }
});

Webhook Versioning

Webhooks support three format versions for backward compatibility:

V1 (Legacy)

Original webhook format. Use V3 for new implementations.

V2 (Legacy)

Updated format with additional fields. Use V3 for new implementations.

V3 (Current)

Latest format with comprehensive data structures. Recommended for all new integrations.

Version Selection: Configure webhook version through SDK initialization or Dashboard settings.


Implementation Guide

1. Set Up Webhook Endpoint

Create a public HTTPS endpoint to receive webhook events:

Python (Flask):

from flask import Flask, request, jsonify
import hmac
import hashlib

app = Flask(__name__)

@app.route('/asleep-webhook', methods=['POST'])
def asleep_webhook():
    # Verify authentication
    if not verify_webhook(request):
        return jsonify({"error": "Unauthorized"}), 401

    # Parse event
    event = request.json
    event_type = event.get('event')

    # Route to appropriate handler
    if event_type == 'INFERENCE_COMPLETE':
        handle_inference_complete(event)
    elif event_type == 'SESSION_COMPLETE':
        handle_session_complete(event)

    return jsonify({"status": "success"}), 200

def verify_webhook(request):
    api_key = request.headers.get('x-api-key')
    return api_key == EXPECTED_API_KEY

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=443, ssl_context='adhoc')

Node.js (Express):

const express = require('express');
const https = require('https');
const fs = require('fs');

const app = express();
app.use(express.json());

app.post('/asleep-webhook', async (req, res) => {
  // Verify authentication
  if (!verifyWebhook(req)) {
    return res.status(401).json({ error: 'Unauthorized' });
  }

  const { event } = req.body;

  try {
    switch (event) {
      case 'INFERENCE_COMPLETE':
        await handleInferenceComplete(req.body);
        break;
      case 'SESSION_COMPLETE':
        await handleSessionComplete(req.body);
        break;
      default:
        console.warn(`Unknown event type: ${event}`);
    }

    res.status(200).json({ status: 'success' });
  } catch (error) {
    console.error('Webhook error:', error);
    res.status(500).json({ error: 'Processing failed' });
  }
});

function verifyWebhook(req) {
  const apiKey = req.headers['x-api-key'];
  return apiKey === process.env.ASLEEP_API_KEY;
}

// HTTPS server
const options = {
  key: fs.readFileSync('private-key.pem'),
  cert: fs.readFileSync('certificate.pem')
};

https.createServer(options, app).listen(443);

2. Configure Webhook URL

Configure your webhook URL through:

  • SDK initialization (for mobile apps)
  • Asleep Dashboard (for backend integrations)

SDK Example (Android):

AsleepConfig.init(
    apiKey = "YOUR_API_KEY",
    userId = "user123",
    callbackUrl = "https://your-domain.com/asleep-webhook"
)

3. Handle Webhook Events

Implement handlers for each event type:

Python Example:

def handle_inference_complete(event):
    """Process incremental sleep data"""
    session_id = event['session_id']
    sleep_stages = event['sleep_stages']

    # Update real-time dashboard
    redis_client.set(f"session:{session_id}:latest", json.dumps(sleep_stages))

    # Notify connected clients via WebSocket
    websocket_broadcast(session_id, sleep_stages)

    # Store for analysis
    db.incremental_data.insert_one(event)

def handle_session_complete(event):
    """Process complete sleep report"""
    user_id = event['user_id']
    session_id = event['session_id']
    stat = event['stat']

    # Store complete report
    db.sleep_reports.insert_one({
        'user_id': user_id,
        'session_id': session_id,
        'date': event['session']['start_time'],
        'statistics': stat,
        'created_at': datetime.now()
    })

    # Update user's latest statistics
    update_user_stats(user_id)

    # Send push notification
    send_notification(user_id, {
        'title': 'Sleep Report Ready',
        'body': f"Sleep time: {stat['sleep_time']}, Efficiency: {stat['sleep_efficiency']:.1f}%"
    })

    # Trigger downstream processes
    calculate_weekly_trends(user_id)
    check_sleep_goals(user_id, stat)

4. Error Handling

Implement robust error handling:

Retry Logic:

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def process_webhook(event):
    """Process webhook with automatic retry"""
    # Your processing logic here
    pass

@app.route('/webhook', methods=['POST'])
def webhook_endpoint():
    try:
        event = request.json
        process_webhook(event)
        return jsonify({"status": "success"}), 200
    except Exception as e:
        logger.error(f"Webhook processing failed: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500

Idempotency:

def handle_session_complete(event):
    session_id = event['session_id']

    # Check if already processed
    if db.processed_webhooks.find_one({'session_id': session_id}):
        logger.info(f"Session {session_id} already processed")
        return

    # Process event
    save_sleep_report(event)

    # Mark as processed
    db.processed_webhooks.insert_one({
        'session_id': session_id,
        'processed_at': datetime.now()
    })

5. Testing

Test webhook handling locally:

ngrok for Local Testing:

# Start your local server
python app.py

# In another terminal, expose with ngrok
ngrok http 5000

# Use the ngrok URL as your webhook URL
# Example: https://abc123.ngrok.io/webhook

Mock Webhook Requests:

# Test INFERENCE_COMPLETE
curl -X POST http://localhost:5000/webhook \
  -H "Content-Type: application/json" \
  -H "x-api-key: YOUR_API_KEY" \
  -H "x-user-id: test_user" \
  -d '{
    "event": "INFERENCE_COMPLETE",
    "version": "V3",
    "session_id": "test123",
    "sleep_stages": [1, 1, 2]
  }'

# Test SESSION_COMPLETE
curl -X POST http://localhost:5000/webhook \
  -H "Content-Type: application/json" \
  -H "x-api-key: YOUR_API_KEY" \
  -H "x-user-id: test_user" \
  -d '{
    "event": "SESSION_COMPLETE",
    "version": "V3",
    "session_id": "test123",
    "stat": {
      "sleep_time": "07:30:00",
      "sleep_efficiency": 88.5
    }
  }'

Best Practices

Security

  • Always verify x-api-key header
  • Use HTTPS for webhook endpoints
  • Implement request signing if handling sensitive data
  • Rate limit webhook endpoint
  • Log all webhook attempts

Reliability

  • Respond quickly (< 5 seconds ideal)
  • Process asynchronously if needed
  • Implement idempotency checks
  • Handle duplicate events gracefully
  • Return 2xx status even if processing fails (retry logic)

Performance

  • Use message queues for heavy processing
  • Implement caching where appropriate
  • Batch database operations
  • Monitor webhook response times
  • Scale horizontally if needed

Monitoring

  • Log all webhook events
  • Track processing success/failure rates
  • Monitor response times
  • Set up alerts for failures
  • Dashboard for webhook metrics

Error Handling

  • Catch and log all exceptions
  • Return appropriate HTTP status codes
  • Implement exponential backoff
  • Dead letter queue for failed events
  • Manual review process for failures

Common Use Cases

Real-Time Dashboard Updates

@app.route('/webhook', methods=['POST'])
def webhook():
    event = request.json

    if event['event'] == 'INFERENCE_COMPLETE':
        # Broadcast to connected WebSocket clients
        socketio.emit('sleep_update', {
            'session_id': event['session_id'],
            'sleep_stages': event['sleep_stages'],
            'timestamp': event['timestamp']
        }, room=event['user_id'])

    return jsonify({"status": "success"}), 200

User Notifications

def handle_session_complete(event):
    user_id = event['user_id']
    stat = event['stat']

    # Generate insights
    insights = generate_sleep_insights(stat)

    # Send push notification
    send_push_notification(user_id, {
        'title': 'Your Sleep Report is Ready!',
        'body': f"You slept for {stat['sleep_time']} with {stat['sleep_efficiency']:.0f}% efficiency",
        'data': {
            'session_id': event['session_id'],
            'insights': insights
        }
    })

Data Analytics Pipeline

def handle_session_complete(event):
    # Store in data warehouse
    bigquery_client.insert_rows_json('sleep_data.sessions', [{
        'user_id': event['user_id'],
        'session_id': event['session_id'],
        'date': event['session']['start_time'],
        'statistics': json.dumps(event['stat']),
        'ingested_at': datetime.now().isoformat()
    }])

    # Trigger analytics jobs
    trigger_weekly_report_job(event['user_id'])
    update_cohort_analysis()

Integration with Other Systems

def handle_session_complete(event):
    user_id = event['user_id']
    stat = event['stat']

    # Sync to Apple Health
    sync_to_apple_health(user_id, {
        'sleep_analysis': stat,
        'date': event['session']['start_time']
    })

    # Update CRM
    update_crm_profile(user_id, {
        'last_sleep_date': event['session']['start_time'],
        'avg_sleep_efficiency': calculate_avg_efficiency(user_id)
    })

Troubleshooting

Webhook Not Received

Check:

  • Endpoint is publicly accessible
  • HTTPS is properly configured
  • Firewall allows incoming requests
  • Webhook URL is correctly configured
  • Server is running and healthy

Authentication Failures

Check:

  • x-api-key validation logic
  • API key matches dashboard
  • Headers are correctly parsed
  • Case sensitivity of header names

Duplicate Events

Solution:

def handle_webhook(event):
    event_id = f"{event['session_id']}:{event['event']}:{event['timestamp']}"

    # Check if already processed
    if redis_client.exists(f"processed:{event_id}"):
        return

    # Process event
    process_event(event)

    # Mark as processed (expire after 24 hours)
    redis_client.setex(f"processed:{event_id}", 86400, "1")

Processing Delays

Solution:

from celery import Celery

celery = Celery('tasks', broker='redis://localhost:6379')

@app.route('/webhook', methods=['POST'])
def webhook():
    event = request.json

    # Queue for async processing
    process_webhook_async.delay(event)

    # Respond immediately
    return jsonify({"status": "queued"}), 200

@celery.task
def process_webhook_async(event):
    # Heavy processing here
    pass

Resources