Logging Guide

Structured Logging

Why Structured Logs?

Unstructured (text):

2024-10-28 14:32:15 User john@example.com logged in from 192.168.1.1

Structured (JSON):

{
  "timestamp": "2024-10-28T14:32:15Z",
  "level": "info",
  "message": "User logged in",
  "user": "john@example.com",
  "ip": "192.168.1.1",
  "event_type": "user_login"
}

Benefits:

  • Easy to parse and query
  • Consistent format
  • Machine-readable
  • Efficient storage and indexing
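
Because every field is a key, ad-hoc analysis needs no regexes. A minimal sketch of querying newline-delimited JSON logs in Python; the file name and field names are illustrative:

import json

# Load one JSON object per line, then filter like any list of dicts
with open("app.log") as f:
    events = [json.loads(line) for line in f if line.strip()]

errors = [e for e in events if e.get("level") == "error"]
logins = [e for e in events if e.get("event_type") == "user_login"]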

Log Levels

Use appropriate log levels for better filtering and alerting.

DEBUG

When: Development and troubleshooting. Examples:

  • Function entry/exit
  • Variable values
  • Internal state changes

logger.debug("Processing request", extra={
    "request_id": req_id,
    "params": params
})

INFO

When: Important business events. Examples:

  • User actions (login, purchase)
  • System state changes (started, stopped)
  • Significant milestones

logger.info("Order placed", extra={
    "order_id": "12345",
    "user_id": "user123",
    "amount": 99.99
})

WARN

When: Potentially problematic situations. Examples:

  • Deprecated API usage
  • Slow operations (but not failing)
  • Retry attempts
  • Resource usage approaching limits

logger.warning("API response slow", extra={
    "endpoint": "/api/users",
    "duration_ms": 2500,
    "threshold_ms": 1000
})

ERROR

When: Error conditions that need attention. Examples:

  • Failed requests
  • Exceptions caught and handled
  • Integration failures
  • Data validation errors

logger.error("Payment processing failed", extra={
    "order_id": "12345",
    "error": str(e),
    "payment_gateway": "stripe"
}, exc_info=True)

FATAL/CRITICAL

When: Severe errors causing shutdown. Examples:

  • Database connection lost
  • Out of memory
  • Configuration errors preventing startup

logger.critical("Database connection lost", extra={
    "database": "postgres",
    "host": "db.example.com",
    "attempt": 3
})

Required Fields

Every log entry should include:

1. Timestamp

ISO 8601 format with timezone:

{
  "timestamp": "2024-10-28T14:32:15.123Z"
}

2. Level

Standard levels: debug, info, warn, error, critical

{
  "level": "error"
}

3. Message

Human-readable description:

{
  "message": "User authentication failed"
}

4. Service/Application

What component logged this:

{
  "service": "api-gateway",
  "version": "1.2.3"
}

5. Environment

{
  "environment": "production"
}

Where applicable, also include the following context.

Request Context

{
  "request_id": "550e8400-e29b-41d4-a716-446655440000",
  "user_id": "user123",
  "session_id": "sess_abc",
  "ip_address": "192.168.1.1",
  "user_agent": "Mozilla/5.0..."
}
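
To get a shared request_id onto every log line, generate it once at the edge and carry it through the call stack. A minimal sketch using a contextvar and a stdlib logging filter; the helper names are illustrative, not a required API:

import logging
import uuid
from contextvars import ContextVar

# Holds the current request's ID for the duration of that request
request_id_var: ContextVar[str] = ContextVar("request_id", default="-")

class RequestIdFilter(logging.Filter):
    # Stamp the current request_id onto every record so the formatter can emit it
    def filter(self, record):
        record.request_id = request_id_var.get()
        return True

logging.getLogger("app").addFilter(RequestIdFilter())

def handle_request(params):
    request_id_var.set(str(uuid.uuid4()))  # generated once per request
    logging.getLogger("app").info("Request received")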

Performance Metrics

{
  "duration_ms": 245,
  "response_size_bytes": 1024
}

Error Details

{
  "error_type": "ValidationError",
  "error_message": "Invalid email format",
  "stack_trace": "...",
  "error_code": "VAL_001"
}

Business Context

{
  "order_id": "ORD-12345",
  "customer_id": "CUST-789",
  "transaction_amount": 99.99,
  "payment_method": "credit_card"
}
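
Putting it together, a single entry carrying the required fields plus request and business context might look like this (all values are illustrative):

{
  "timestamp": "2024-10-28T14:32:15.123Z",
  "level": "error",
  "message": "Payment processing failed",
  "service": "checkout",
  "version": "1.2.3",
  "environment": "production",
  "request_id": "550e8400-e29b-41d4-a716-446655440000",
  "user_id": "user123",
  "duration_ms": 245,
  "error_type": "PaymentGatewayError",
  "order_id": "ORD-12345",
  "transaction_amount": 99.99
}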

Implementation Examples

Python (using structlog)

import structlog

# Configure structured logging before use
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger()

# Usage
logger.info(
    "user_logged_in",
    user_id="user123",
    ip_address="192.168.1.1",
    login_method="oauth"
)

Node.js (using Winston)

const winston = require('winston');

const logger = winston.createLogger({
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.json()
  ),
  defaultMeta: { service: 'api-gateway' },
  transports: [
    new winston.transports.Console()
  ]
});

logger.info('User logged in', {
  userId: 'user123',
  ipAddress: '192.168.1.1',
  loginMethod: 'oauth'
});

Go (using zap)

import "go.uber.org/zap"

logger, _ := zap.NewProduction()
defer logger.Sync()

logger.Info("User logged in",
    zap.String("userId", "user123"),
    zap.String("ipAddress", "192.168.1.1"),
    zap.String("loginMethod", "oauth"),
)

Java (using Logback with JSON)

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.logstash.logback.argument.StructuredArguments;

Logger logger = LoggerFactory.getLogger(MyClass.class);

logger.info("User logged in",
    StructuredArguments.kv("userId", "user123"),
    StructuredArguments.kv("ipAddress", "192.168.1.1"),
    StructuredArguments.kv("loginMethod", "oauth")
);

Log Aggregation Patterns

Pattern 1: ELK Stack (Elasticsearch, Logstash, Kibana)

Architecture:

Application → Filebeat → Logstash → Elasticsearch → Kibana

filebeat.yml:

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/app/*.log
    json.keys_under_root: true
    json.add_error_key: true

output.logstash:
  hosts: ["logstash:5044"]

logstash.conf:

input {
  beats {
    port => 5044
  }
}

filter {
  json {
    source => "message"
  }

  date {
    match => ["timestamp", "ISO8601"]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"
  }
}

Pattern 2: Loki (Grafana Loki)

Architecture:

Application → Promtail → Loki → Grafana

promtail-config.yml:

server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: app
    static_configs:
      - targets:
          - localhost
        labels:
          job: app
          __path__: /var/log/app/*.log
    pipeline_stages:
      - json:
          expressions:
            level: level
            service: service
            timestamp: timestamp
      - labels:
          level:
          service:
      - timestamp:
          source: timestamp
          format: RFC3339

Query in Grafana:

{job="app"} |= "error" | json | level="error"

Pattern 3: CloudWatch Logs

Install CloudWatch agent:

{
  "logs": {
    "logs_collected": {
      "files": {
        "collect_list": [
          {
            "file_path": "/var/log/app/*.log",
            "log_group_name": "/aws/app/production",
            "log_stream_name": "{instance_id}",
            "timezone": "UTC"
          }
        ]
      }
    }
  }
}

Query with CloudWatch Insights:

fields @timestamp, level, message, user_id
| filter level = "error"
| sort @timestamp desc
| limit 100

Pattern 4: Fluentd/Fluent Bit

fluent-bit.conf:

[INPUT]
    Name              tail
    Path              /var/log/app/*.log
    Parser            json
    Tag               app.*

[FILTER]
    Name              record_modifier
    Match             *
    Record            hostname ${HOSTNAME}
    Record            cluster production

[OUTPUT]
    Name              es
    Match             *
    Host              elasticsearch
    Port              9200
    Index             app-logs
    Type              _doc

Query Patterns

Find Errors in Time Range

Elasticsearch:

GET /app-logs-*/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "level": "error" } },
        { "range": { "@timestamp": {
            "gte": "now-1h",
            "lte": "now"
        }}}
      ]
    }
  }
}

Loki (LogQL):

{job="app", level="error"} |= "error"

CloudWatch Insights:

fields @timestamp, @message
| filter level = "error"
| filter @timestamp > ago(1h)

Count Errors by Type

Elasticsearch:

GET /app-logs-*/_search
{
  "size": 0,
  "query": { "match": { "level": "error" } },
  "aggs": {
    "error_types": {
      "terms": { "field": "error_type.keyword" }
    }
  }
}

Loki:

sum by (error_type) (count_over_time({job="app", level="error"} | json [1h]))

Find Slow Requests

Elasticsearch:

GET /app-logs-*/_search
{
  "query": {
    "range": { "duration_ms": { "gte": 1000 } }
  },
  "sort": [ { "duration_ms": "desc" } ]
}
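
Loki (LogQL), a rough equivalent assuming duration_ms is a field in the JSON log line:

{job="app"} | json | duration_ms > 1000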

Trace Request Through Services

Elasticsearch (using request_id):

GET /_search
{
  "query": {
    "match": { "request_id": "550e8400-e29b-41d4-a716-446655440000" }
  },
  "sort": [ { "@timestamp": "asc" } ]
}

Sampling and Rate Limiting

When to Sample

  • High volume services: > 10,000 logs/second
  • Debug logs in production: Sample 1-10%
  • Cost optimization: Reduce storage costs

Sampling Strategies

1. Random Sampling:

import random

if random.random() < 0.1:  # Sample 10%
    logger.debug("Debug message", ...)

2. Rate Limiting:

from rate_limiter import RateLimiter

limiter = RateLimiter(max_per_second=100)

if limiter.allow():
    logger.info("Rate limited log", ...)

3. Error-Biased Sampling:

# Always log errors, sample successful requests
if level == "error" or random.random() < 0.01:
    logger.log(level, message, ...)

4. Head-Based Sampling (trace-aware):

# If trace is sampled, log all related logs
if trace_context.is_sampled():
    logger.info("Traced log", trace_id=trace_context.trace_id)

Log Retention

Retention Strategy

Hot tier (fast SSD): 7-30 days

  • Recent logs
  • Full query performance
  • High cost

Warm tier (regular disk): 30-90 days

  • Older logs
  • Slower queries acceptable
  • Medium cost

Cold tier (object storage): 90+ days

  • Archive logs
  • Query via restore
  • Low cost

Example: Elasticsearch ILM Policy

{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "1d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "allocate": { "number_of_replicas": 1 },
          "shrink": { "number_of_shards": 1 }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "allocate": { "require": { "box_type": "cold" } }
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

Security and Compliance

PII Redaction

Before logging:

import re

def redact_pii(data):
    # Redact email
    data = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
                  '[EMAIL]', data)
    # Redact credit card
    data = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
                  '[CARD]', data)
    # Redact SSN
    data = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', data)
    return data

logger.info("User data", user_input=redact_pii(user_input))

In Logstash:

filter {
  mutate {
    gsub => [
      "message", "\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", "[EMAIL]",
      "message", "\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", "[CARD]"
    ]
  }
}

Access Control

Elasticsearch (with Security):

# Role for developers
dev_logs:
  indices:
    - names: ['app-logs-*']
      privileges: ['read']
      query: '{"match": {"environment": "development"}}'

CloudWatch (IAM Policy):

{
  "Effect": "Allow",
  "Action": [
    "logs:DescribeLogGroups",
    "logs:GetLogEvents",
    "logs:FilterLogEvents"
  ],
  "Resource": "arn:aws:logs:*:*:log-group:/aws/app/production:*"
}

Common Pitfalls

1. Logging Sensitive Data

logger.info("Login", password=password) logger.info("Login", user_id=user_id)

2. Excessive Logging

Bad: logging every iteration of a loop
Good: log aggregate results, or sample

3. Not Including Context

logger.error("Failed") logger.error("Payment failed", order_id=order_id, error=str(e))

4. Inconsistent Formats

Bad: a mix of JSON and plain text
Good: pick one format and stick to it

5. No Request IDs

Bad: no way to trace a request across services
Good: generate and propagate a request_id

6. Logging to Multiple Places

Bad: logging to file AND stdout AND syslog
Good: log to stdout and let the agent handle routing

7. Blocking on Log Writes

Bad: synchronous writes to remote systems
Good: asynchronous, buffered writes


Performance Optimization

1. Async Logging

import logging
import queue
from logging.handlers import QueueHandler, QueueListener

# Create the queue and the application logger
log_queue = queue.Queue()
logger = logging.getLogger("app")

# The application writes to a non-blocking queue handler
logger.addHandler(QueueHandler(log_queue))

# A background thread drains the queue into the real (potentially slow) handlers
listener = QueueListener(log_queue, logging.StreamHandler())
listener.start()

2. Conditional Logging

# Avoid expensive serialization when the level is disabled
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("Details", extra={"data": expensive_serialization(obj)})

3. Batching

# Batch logs before sending, and flush any remainder at the end
batch = []
for log in logs:
    batch.append(log)
    if len(batch) >= 100:
        send_to_aggregator(batch)
        batch = []
if batch:
    send_to_aggregator(batch)

4. Compression

# Filebeat with compression
output.logstash:
  hosts: ["logstash:5044"]
  compression_level: 3

Monitoring Log Pipeline

Track pipeline health with metrics:

# Log ingestion rate
rate(logs_ingested_total[5m])

# Pipeline lag
log_processing_lag_seconds

# Dropped logs
rate(logs_dropped_total[5m])

# Error parsing rate
rate(logs_parse_errors_total[5m])

Alert on:

  • Sudden drop in log volume (service down?)
  • High parse error rate (format changed?)
  • Pipeline lag > 1 minute (capacity issue?)
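
For example, a Prometheus alerting rule for a sudden drop in log volume might look like this; the metric name matches the ingestion counter above, and the threshold and durations are illustrative:

groups:
  - name: log-pipeline
    rules:
      - alert: LogVolumeDrop
        expr: rate(logs_ingested_total[5m]) < 0.5 * rate(logs_ingested_total[5m] offset 1h)
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Log ingestion fell by more than 50% versus one hour ago"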