2025-11-29 18:50:04 +08:00


Design Patterns in Odoo Connectors

1. Template Method Pattern

Usage: Backend model orchestration

Implementation:

# generic_connector provides template methods
from odoo import models

class GenericBackend(models.Model):
    _name = 'generic.backend'

    def sync_orders(self):
        """Template method."""
        self._pre_sync_validation()
        result = self._sync_orders_implementation()  # Hook
        self._post_sync_actions()
        return result

    def _sync_orders_implementation(self):
        """Override this in concrete implementations."""
        raise NotImplementedError()

# Concrete connector overrides the hook
class ShopifyBackend(models.Model):
    _inherit = 'generic.backend'

    def _sync_orders_implementation(self):
        """Shopify-specific implementation."""
        # Actual sync logic here
        pass

Benefits:

  • Enforces consistent workflow
  • Allows customization at specific points
  • Reduces code duplication

2. Adapter Pattern

Usage: API communication abstraction

Implementation:

class ShopifyAdapter(GenericAdapter):
    """Adapts Shopify API to generic interface."""

    def get_orders(self, filters=None):
        """Translate to Shopify API call."""
        # Shopify uses /admin/api/2024-01/orders.json
        endpoint = '/admin/api/2024-01/orders.json'
        response = self.get(endpoint, params=self._build_params(filters))

        # Shopify wraps response in 'orders' key
        return response.get('orders', [])

    def _build_params(self, filters):
        """Transform generic filters to Shopify params."""
        params = {}
        if filters and 'created_after' in filters:
            params['created_at_min'] = filters['created_after']
        return params

Benefits:

  • Hides API differences
  • Provides consistent interface
  • Simplifies testing (mock adapter)
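
The testing benefit can be sketched with a hand-written test double. This is a minimal illustration, not part of any connector: `FakeOrderAdapter` and `count_new_orders` are hypothetical names, and the fake only mirrors the `get_orders(filters)` interface shown above.

```python
class FakeOrderAdapter:
    """Test double exposing the same get_orders() interface (hypothetical)."""

    def __init__(self, orders):
        self._orders = list(orders)
        self.calls = []  # filters received, kept for assertions

    def get_orders(self, filters=None):
        self.calls.append(filters)
        created_after = (filters or {}).get('created_after')
        if created_after is None:
            return list(self._orders)
        # ISO 8601 strings in the same format compare correctly as text
        return [o for o in self._orders if o['created_at'] >= created_after]


def count_new_orders(adapter, since):
    """Example caller that depends only on the generic interface."""
    return len(adapter.get_orders({'created_after': since}))


fake = FakeOrderAdapter([
    {'id': 1, 'created_at': '2024-01-01T00:00:00Z'},
    {'id': 2, 'created_at': '2024-02-01T00:00:00Z'},
])
new_count = count_new_orders(fake, '2024-01-15T00:00:00Z')
```

Because the caller never touches HTTP, the same test runs without network access or Shopify credentials.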

3. Strategy Pattern

Usage: Different sync strategies per backend

Implementation:

class GenericImporter(Component):
    _name = 'generic.importer'

    def run(self, external_id, force=False):
        """Import strategy can vary."""
        if self._should_skip_import(external_id, force):
            return None

        # Different strategies:
        # - Direct import
        # - Delayed import
        # - Batch import
        return self._import_record(external_id)

class RealtimeImporter(GenericImporter):
    """Strategy: Import immediately."""
    _name = 'shopify.realtime.importer'

    def _import_record(self, external_id):
        # Import synchronously
        pass

class BatchImporter(GenericImporter):
    """Strategy: Queue for batch processing."""
    _name = 'shopify.batch.importer'

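    def _import_record(self, external_id):
        # Queue for later. with_delay() is provided by queue_job on models,
        # not on components, so the job is delayed on the binding model.
        # import_record() is assumed to be a job method on that model.
        self.model.with_delay().import_record(self.backend_record, external_id)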

4. Factory Pattern

Usage: Component selection based on context

Implementation:

# Component framework acts as factory
with backend.work_on('shopify.product.template') as work:
    # Factory automatically selects appropriate components
    adapter = work.component(usage='backend.adapter')
    # Returns ShopifyProductAdapter

    importer = work.component(usage='record.importer')
    # Returns ShopifyProductImporter

    mapper = work.component(usage='import.mapper')
    # Returns ShopifyProductImportMapper

Benefits:

  • Automatic component selection
  • Decoupled component creation
  • Easy to extend with new components
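
To make the lookup idea concrete, here is a toy registry keyed on model name and usage. It is a simplified sketch, not the component framework's actual implementation; `ComponentRegistry` and its methods are hypothetical.

```python
class ComponentRegistry:
    """Toy registry keyed on (model name, usage)."""

    def __init__(self):
        self._components = {}

    def register(self, model_name, usage):
        """Class decorator that records a component for a model and usage."""
        def decorator(cls):
            self._components[(model_name, usage)] = cls
            return cls
        return decorator

    def component(self, model_name, usage):
        """Instantiate the component registered for this model and usage."""
        try:
            cls = self._components[(model_name, usage)]
        except KeyError:
            raise LookupError(
                f"No component for {model_name!r} with usage {usage!r}"
            ) from None
        return cls()


registry = ComponentRegistry()


@registry.register('shopify.product.template', 'backend.adapter')
class ShopifyProductAdapter:
    pass


@registry.register('shopify.product.template', 'record.importer')
class ShopifyProductImporter:
    pass


adapter = registry.component('shopify.product.template', 'backend.adapter')
```

The caller names what it needs (a usage) rather than which class to build, which is what lets a new connector register its own components without changing calling code.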

5. Observer Pattern

Usage: Webhook event handling

Implementation:

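import json

from odoo import http
from odoo.http import request

# Event source (webhook controller)
class WebhookController(http.Controller):
    @http.route('/shopify/webhook', type='json', auth='none')
    def webhook_handler(self):
        # request.jsonrequest is available up to Odoo 15;
        # Odoo 16+ uses request.get_json_data() instead
        payload = request.jsonrequest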
        event_type = request.httprequest.headers.get('X-Shopify-Topic')

        # Notify observers
        webhook = request.env['generic.webhook'].sudo().create({
            'event_type': event_type,
            'payload': json.dumps(payload),
            'processing_status': 'pending'
        })
        webhook.with_delay().process_webhook()

# Observer (backend model)
class ShopifyBackend(models.Model):
    _inherit = 'generic.backend'

    def process_webhook(self, webhook):
        """Observe and handle webhook events."""
        handlers = {
            'orders/create': self._handle_order_created,
            'products/update': self._handle_product_updated,
        }

        handler = handlers.get(webhook.event_type)
        if handler:
            handler(webhook)
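
Since the route above uses `auth='none'`, the controller would normally check the request signature before creating a webhook record. Shopify sends a base64-encoded HMAC-SHA256 of the raw request body in the `X-Shopify-Hmac-Sha256` header. The helper below is a minimal sketch; the function name and the way the shared secret is obtained are assumptions.

```python
import base64
import hashlib
import hmac


def is_valid_shopify_hmac(raw_body, header_value, shared_secret):
    """Return True if header_value matches the HMAC-SHA256 of raw_body.

    raw_body is the unparsed request body as bytes, header_value is the
    X-Shopify-Hmac-Sha256 header (may be None), and shared_secret is the
    app's secret as a string (hypothetical parameter names).
    """
    digest = hmac.new(
        shared_secret.encode('utf-8'), raw_body, hashlib.sha256
    ).digest()
    expected = base64.b64encode(digest)
    received = (header_value or '').encode('utf-8')
    # Constant-time comparison avoids leaking how many bytes matched
    return hmac.compare_digest(expected, received)
```

The check has to run on the raw bytes, before JSON parsing, because re-serializing the payload can change whitespace or key order and invalidate the signature.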

6. Delegation Pattern

Usage: Multi-level model inheritance

Implementation:

class GenericBackend(models.Model):
    _name = 'generic.backend'
    _inherits = {'connector.base.backend': 'connector_backend_id'}

    connector_backend_id = fields.Many2one(
        'connector.base.backend',
        required=True,
        ondelete='cascade'
    )

    # Delegates fields: name, version, etc.
    # Accessing backend.name actually accesses backend.connector_backend_id.name

Benefits:

  • Reuse existing model functionality
  • Avoid deep inheritance hierarchies
  • Maintain database normalization

7. Mapper Pattern (Data Transfer Object)

Usage: Data transformation between systems

Implementation:

class ProductImportMapper(GenericImportMapper):
    _name = 'shopify.product.import.mapper'

    # Simple mappings
    direct = [
        ('title', 'name'),
        ('vendor', 'manufacturer'),
    ]

    # Complex mapping
    @mapping
    def description(self, record):
        """Transform HTML description to plain text."""
        html_desc = record.get('body_html', '')
        return {'description': self._strip_html(html_desc)}

    @mapping
    def price(self, record):
        """Extract price from variants."""
        variants = record.get('variants', [])
        if variants:
            return {'list_price': float(variants[0].get('price', 0))}
        return {}

    # only_create only flags the method; @mapping is still required
    @only_create
    @mapping
    def default_code(self, record):
        """Set SKU only when creating."""
        return {'default_code': record.get('sku')}

Benefits:

  • Centralized data transformation
  • Declarative mapping definitions
  • Reusable transformations
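
The declarative idea can be illustrated without the connector framework. In this sketch `map_record` is a hypothetical helper that applies a list of direct (source, target) pairs and then a list of mapping functions, each returning a dict of target values; the real mapper component does considerably more.

```python
def map_record(record, direct, mappings):
    """Build target values from direct pairs plus mapping functions."""
    values = {}
    for source_key, target_field in direct:
        if source_key in record:
            values[target_field] = record[source_key]
    for func in mappings:
        # Each mapping returns a dict that is merged into the result
        values.update(func(record))
    return values


def price(record):
    """Extract the price from the first variant, if any."""
    variants = record.get('variants', [])
    if variants:
        return {'list_price': float(variants[0].get('price', 0))}
    return {}


shopify_product = {
    'title': 'T-Shirt',
    'vendor': 'Acme',
    'variants': [{'price': '19.90'}],
}
values = map_record(
    shopify_product,
    direct=[('title', 'name'), ('vendor', 'manufacturer')],
    mappings=[price],
)
```

Keeping the mapping as data (the `direct` list) and small functions makes each transformation testable on its own.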

8. Retry Pattern

Usage: Handling transient failures

Implementation:

class GenericBinding(models.AbstractModel):
    _name = 'generic.binding'  # assumed model name

    retry_count = fields.Integer(default=0)
    max_retries = fields.Integer(default=3)
    last_error = fields.Text()

    def can_retry_sync(self):
        """Check if retry is allowed."""
        return self.retry_count < self.max_retries

    def export_with_retry(self):
        """Export with automatic retry."""
        self.ensure_one()
        try:
            self.export_record()
            self.mark_sync_success()
        except Exception as e:
            self.retry_count += 1
            self.last_error = str(e)

            if self.can_retry_sync():
                # Retry with exponential backoff: 120 s, then 240 s, ...
                # (queue_job reads an integer eta as seconds from now)
                delay = 60 * (2 ** self.retry_count)
                self.with_delay(eta=delay).export_with_retry()
            else:
                self.mark_sync_failed(str(e))

Benefits:

  • Resilient to temporary failures
  • Configurable retry behavior
  • Exponential backoff prevents API overload
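
The delay formula used above, `60 * (2 ** retry_count)`, grows without bound. A common refinement is to cap the delay and add random jitter so that many failed records do not all retry at the same moment. The helper below is a sketch under those assumptions; `backoff_delay`, `cap`, and `jitter` are hypothetical names that do not appear in the connector.

```python
import random


def backoff_delay(attempt, base=60, cap=3600, jitter=0.0, rng=random):
    """Return the delay in seconds before retry number `attempt`.

    The delay doubles with each attempt, is limited to `cap`, and is
    increased by a random fraction of up to `jitter` of its value.
    """
    delay = min(cap, base * (2 ** attempt))
    if jitter:
        delay += rng.uniform(0, jitter * delay)
    return delay


# Delays without jitter for attempts 1 to 3
schedule = [backoff_delay(n) for n in range(1, 4)]
```

With the defaults the schedule matches the formula in the binding model until the cap of one hour is reached.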

9. Rate Limiting Pattern

Usage: Respect API rate limits

Implementation:

from datetime import datetime, timedelta
from cachetools import TTLCache

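class RateLimitExceeded(Exception):
    """Raised when the request budget for the current window is spent."""


class RateLimitedAdapter(GenericAdapter):
    # Class-level cache: shared across instances within one worker process,
    # not across Odoo workers
    _rate_limit_cache = TTLCache(maxsize=100, ttl=60)

    def make_request(self, method, endpoint, **kwargs):
        """Make request with rate limiting."""
        cache_key = f"{self.backend_record.id}:requests"

        # The counter is a one-item list mutated in place. Reassigning the
        # key would restart the 60 s TTL on every request, so the window
        # would never expire under steady traffic.
        counter = self._rate_limit_cache.get(cache_key)
        if counter is None:
            counter = [0]
            self._rate_limit_cache[cache_key] = counter
        max_requests = self.backend_record.rate_limit_calls or 100

        if counter[0] >= max_requests:
            # Refuse the call; the caller can retry once the window expires
            raise RateLimitExceeded(f"Rate limit of {max_requests}/min exceeded")

        # Count the attempt before sending so failed requests use budget too
        counter[0] += 1
        return super().make_request(method, endpoint, **kwargs)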

Variants:

  • Token Bucket: For bursty traffic
  • Leaky Bucket: For steady rate
  • Sliding Window: For precise limits
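
As an illustration of the first variant, a token bucket refills at a fixed rate and allows bursts up to its capacity. The class below is a minimal single-process sketch; `TokenBucket` and its parameters are hypothetical names, and the injectable `clock` exists only to make the behavior easy to test.

```python
import time


class TokenBucket:
    """Allow bursts up to `capacity`, refilled at `rate` tokens per second."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = float(rate)
        self.capacity = float(capacity)
        self._clock = clock
        self._tokens = float(capacity)  # start with a full bucket
        self._updated = clock()

    def try_acquire(self, tokens=1):
        """Take `tokens` if available and return True, else return False."""
        now = self._clock()
        elapsed = now - self._updated
        self._updated = now
        # Refill for the elapsed time, never above capacity
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        if self._tokens >= tokens:
            self._tokens -= tokens
            return True
        return False


# Demonstration with a fake clock so the result does not depend on timing
fake_now = [0.0]
bucket = TokenBucket(rate=2, capacity=2, clock=lambda: fake_now[0])
burst = [bucket.try_acquire() for _ in range(3)]  # third call is refused
fake_now[0] += 0.5                                # refills one token
after_wait = bucket.try_acquire()
```

Unlike the fixed-window counter above, the bucket never resets abruptly, so a client cannot send a full budget at the end of one window and another at the start of the next.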

10. Circuit Breaker Pattern

Usage: Prevent cascading failures

Implementation:

from datetime import datetime, timedelta


class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                # Timeout elapsed: let one trial call through
                self.state = 'HALF_OPEN'
            else:
                raise CircuitBreakerOpen("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.last_failure_time = datetime.now()
            # A failed trial call in HALF_OPEN reopens the circuit at once
            if self.state == 'HALF_OPEN' or self.failures >= self.failure_threshold:
                self.state = 'OPEN'
            raise

        # Any success closes the circuit and clears the count, so only
        # consecutive failures can trip the breaker
        self.state = 'CLOSED'
        self.failures = 0
        return result

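class ResilientAdapter(GenericAdapter):
    # One breaker per backend, kept at class level. Components are
    # instantiated for each lookup, so a breaker stored on the instance
    # would lose its failure count between calls. The state is still
    # local to one worker process.
    _circuit_breakers = {}

    def make_request(self, *args, **kwargs):
        breaker = self._circuit_breakers.setdefault(
            self.backend_record.id, CircuitBreaker()
        )
        return breaker.call(super().make_request, *args, **kwargs)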

11. Saga Pattern

Usage: Distributed transaction management

Implementation:

class OrderImportSaga:
    """Multi-step order import with compensation."""

    def __init__(self, backend, external_order_id):
        self.backend = backend
        self.external_order_id = external_order_id
        self.steps_completed = []

    def execute(self):
        """Execute saga steps."""
        try:
            # Step 1: Import customer
            customer = self._import_customer()
            self.steps_completed.append(('customer', customer.id))

            # Step 2: Import order
            order = self._import_order(customer)
            self.steps_completed.append(('order', order.id))

            # Step 3: Import order lines
            lines = self._import_order_lines(order)
            self.steps_completed.append(('lines', [l.id for l in lines]))

            # Step 4: Confirm order
            order.action_confirm()

            return order

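        except Exception:
            # Compensate (undo completed steps in reverse order), then re-raise
            self._compensate()
            raise

    def _compensate(self):
        """Undo completed steps.

        If the caller lets the exception roll back the database transaction,
        these writes are rolled back with it. Compensation matters for steps
        that were already committed or had effects outside the database.
        """
        env = self.backend.env
        for step_type, record_ids in reversed(self.steps_completed):
            if step_type == 'order':
                # Cancelling the order also covers its lines
                env['sale.order'].browse(record_ids).action_cancel()
            elif step_type == 'customer':
                # Archive the partner instead of deleting it
                partner = env['res.partner'].browse(record_ids)
                partner.write({'active': False})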
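
The same idea can be expressed generically by registering a compensation next to each step, which avoids the `if`/`elif` chain. The sketch below is framework-free; `Saga`, `step`, and `compensate` are hypothetical names, and the string results stand in for created records.

```python
class Saga:
    """Run steps in order and remember how to undo each completed one."""

    def __init__(self):
        self._compensations = []

    def step(self, action, compensation):
        """Run `action`; on success, register `compensation(result)`."""
        result = action()
        self._compensations.append(lambda: compensation(result))
        return result

    def compensate(self):
        """Undo completed steps, most recent first."""
        while self._compensations:
            self._compensations.pop()()


log = []


def failing_step():
    raise RuntimeError("order lines failed")


saga = Saga()
try:
    saga.step(lambda: 'customer-1', lambda ref: log.append(f'undo {ref}'))
    saga.step(lambda: 'order-7', lambda ref: log.append(f'undo {ref}'))
    saga.step(failing_step, lambda ref: log.append(f'undo {ref}'))
except RuntimeError:
    saga.compensate()
```

Only steps that completed are undone: the failing third step never registers a compensation, so the log contains the order first and the customer second.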

12. Repository Pattern

Usage: Centralize data access logic

Implementation:

class ProductBindingRepository:
    """Repository for product bindings."""

    def __init__(self, env, backend):
        self.env = env
        self.backend = backend
        self.model = env['shopify.product.template']

    def find_by_external_id(self, external_id):
        """Find binding by external ID."""
        return self.model.search([
            ('backend_id', '=', self.backend.id),
            ('external_id', '=', str(external_id))
        ], limit=1)

    def find_by_sku(self, sku):
        """Find binding by SKU."""
        return self.model.search([
            ('backend_id', '=', self.backend.id),
            ('default_code', '=', sku)
        ], limit=1)

    def find_or_create(self, external_id, defaults=None):
        """Find existing or create new binding."""
        binding = self.find_by_external_id(external_id)
        if not binding:
            values = {'backend_id': self.backend.id, 'external_id': str(external_id)}
            if defaults:
                values.update(defaults)
            binding = self.model.create(values)
        return binding

    def find_pending_export(self, limit=100):
        """Find bindings pending export."""
        return self.model.search([
            ('backend_id', '=', self.backend.id),
            ('sync_status', '=', 'pending'),
            ('external_id', '=', False)
        ], limit=limit)

Pattern Selection Guidelines

| Use Case | Pattern | Reason |
|---|---|---|
| Define sync workflow | Template Method | Consistent process, extensible hooks |
| API communication | Adapter | Abstract API differences |
| Different import modes | Strategy | Pluggable algorithms |
| Select components | Factory | Automatic selection based on context |
| Handle webhooks | Observer | Event-driven architecture |
| Extend core models | Delegation | Reuse without deep inheritance |
| Transform data | Mapper | Declarative transformations |
| Handle failures | Retry + Circuit Breaker | Resilient operations |
| Respect API limits | Rate Limiting | Prevent API throttling |
| Multi-step operations | Saga | Rollback on failure |
| Data access | Repository | Centralized queries |

Anti-Patterns to Avoid

Direct Odoo Record Modification

# BAD: Directly modify product
product = self.env['product.template'].browse(product_id)
product.write({'name': external_data['title']})

# GOOD: Use binding
binding = self.env['shopify.product.template'].search([
    ('odoo_id', '=', product_id)
])
binding.write({'name': external_data['title']})

Synchronous Long Operations

# BAD: Block user while importing 1000 products
def import_all_products(self):
    for product_id in range(1, 1001):
        self.import_product(product_id)  # Takes 30 minutes!

# GOOD: Queue async job
def import_all_products(self):
    self.with_delay().import_products_batch()
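
A single queued job that imports everything can still run for a long time. One option is to split the work into fixed-size batches and queue one job per batch. The helper below sketches only the splitting; `chunked` is a hypothetical name, and passing each batch to `import_products_batch` assumes that method accepts a list of IDs.

```python
def chunked(ids, size):
    """Yield consecutive slices of `ids` with at most `size` items each."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(ids), size):
        yield ids[start:start + size]


# Ten product IDs split into batches of four
batches = list(chunked(list(range(1, 11)), 4))
```

Smaller jobs fail and retry independently, so one bad record does not force the whole import to start over.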

No Error Handling

# BAD: Unhandled API errors crash sync
def sync_orders(self):
    response = adapter.get_orders()  # What if API is down?
    for order in response:
        self.import_order(order)

# GOOD: Graceful error handling
def sync_orders(self):
    try:
        response = adapter.get_orders()
    except requests.RequestException as e:
        # RequestException also covers connection errors and timeouts,
        # which HTTPError alone would miss
        _logger.error("Failed to fetch orders: %s", e)
        return False

    for order in response:
        try:
            # Savepoint: a failed order is rolled back on its own
            with self.env.cr.savepoint():
                self.import_order(order)
        except Exception as e:
            _logger.error("Failed to import order %s: %s", order['id'], e)
            continue  # Continue with next order
    return True

Hardcoded Configuration

# BAD: Hardcoded values
API_URL = 'https://api.shopify.com'
API_KEY = 'hardcoded-key-123'

# GOOD: Backend configuration
api_url = self.backend_record.api_url
api_key = self.backend_record.api_key

God Object

# BAD: Backend does everything
class ShopifyBackend(models.Model):
    def sync_orders(self):
        # 500 lines of code doing:
        # - API calls
        # - Data transformation
        # - Validation
        # - Record creation
        # - Email notifications
        # etc.
        pass

# GOOD: Separated concerns
class ShopifyBackend(models.Model):
    def sync_orders(self):
        # Orchestration only
        with self.work_on('shopify.sale.order') as work:
            importer = work.component(usage='batch.importer')
            return importer.run()

# Adapter handles API
# Mapper handles transformation
# Importer handles record creation