Files
2025-11-30 09:04:41 +08:00

4.0 KiB

description
description
Set up RabbitMQ async processing

AMQP/Async Integration

Set up RabbitMQ asynchronous processing for background jobs and event-driven architecture.

When to Use

Use this when:

  • Async email sending (queue email operations)
  • Event processing (handle domain events asynchronously)
  • Background jobs (report generation, data import, long-running tasks)
  • Webhooks (process incoming webhooks asynchronously)
  • Notifications (push notifications, SMS in background)
  • Data sync (synchronize data between services)

Workflow

Interactive Discovery

Ask these questions:

  1. Use case? (Async Email, Event Processing, Background Jobs, Webhooks, Notifications, Data Sync, Other)
  2. Publisher feature? (Which feature produces messages?)
  3. Consumer feature?
    • Same feature consumes own messages
    • Different feature (specify)
    • New dedicated worker feature (specify name)
  4. Operations to queue? (operation name, trigger event, payload structure)
  5. Failure handling? (retry count, dead letter queue, custom handling)
  6. Queue configuration? (one queue or multiple queues per operation/priority)
  7. Performance requirements? (messages per minute, max delay, concurrent consumers)
  8. RabbitMQ already configured? (Check pkg/queue/, .env, docker-compose.yml)

Present complete plan before proceeding.

Implementation Phases

Execute these 5 phases in order:

  1. Infrastructure Setup - phases/amqp/01-infrastructure.md
  2. Publisher Implementation - phases/amqp/02-publisher.md
  3. Consumer Implementation - phases/amqp/03-consumer.md
  4. Application Integration - phases/amqp/04-integration.md
  5. Code Quality - phases/shared/code-quality.md

Components Created

Infrastructure:

  • pkg/queue/rabbitmq.go - Queue service package
  • internal/handler/amqp/consumers.go - AMQP handler
  • docker-compose.yml - RabbitMQ container
  • .env - RabbitMQ configuration

Publisher:

  • features/{feature}/domain/dto/event.{entity}.go - Event structs
  • Service method integration for publishing

Consumer:

  • features/{consumer_feature}/amqp/consumer.{entity}.go - Message consumer
  • features/{consumer_feature}/service/service.{entity}.go - Processing logic
  • features/{consumer_feature}/main.{consumer_feature}.go - Consumer module

Common Queue Patterns

One Queue Per Operation:

Queue: user_welcome_email
Queue: order_confirmation_email

Use when: Different processing logic per operation

One Queue With Event Types:

Queue: email_queue
Events: user.welcome, order.confirmation

Use when: Shared processing infrastructure

Priority Queues:

Queue: high_priority_emails
Queue: low_priority_emails

Use when: Different SLAs for operations

Fanout Pattern:

Exchange: user_events (fanout)
Queues: email_queue, analytics_queue, audit_queue

Use when: Multiple consumers need same event

Example

User: /venturo-go:add-amqp

Claude: What use case? (Async Email, Events, Jobs, Webhooks, Notifications...)

User: Async Email Sending

Claude: Which feature publishes messages?

User: user_management

Claude: Which feature consumes?
A) Same (user_management)
B) Different (specify)
C) New worker (specify name)

User: C) email_sender

Claude: What operations to queue?

User: Welcome email, password reset email, verification email

[Presents plan with queue names, events, error handling]

Claude: Checking RabbitMQ infrastructure... Not found.

I'll set up:
1. pkg/queue/ (RabbitMQ service)
2. Docker container
3. Publisher in user_management
4. Consumer in email_sender feature

Proceed?

[Executes phases]

Tips

  • Start simple (one queue, one consumer)
  • Test locally with Docker
  • Monitor queues via RabbitMQ Management UI (http://localhost:15672)
  • Implement dead letter queue for failed messages
  • Make consumers idempotent (handle duplicates)
  • Log message processing
  • Graceful shutdown (finish processing before exit)
  • Keep messages small (reference data by ID)
  • Version message schemas
  • Use TLS in production