Files
2025-11-29 18:50:04 +08:00

15 KiB

API Integration Guide for Odoo Connectors

REST API Integration

Standard REST Pattern

Adapter Structure:

class RESTAdapter(GenericAdapter):
    def get_resource(self, resource_id):
        """GET /resources/{id}"""
        return self.get(f'/{self.resource_name}/{resource_id}')

    def list_resources(self, filters=None):
        """GET /resources"""
        return self.get(f'/{self.resource_name}', params=filters)

    def create_resource(self, data):
        """POST /resources"""
        return self.post(f'/{self.resource_name}', data=data)

    def update_resource(self, resource_id, data):
        """PUT /resources/{id}"""
        return self.put(f'/{self.resource_name}/{resource_id}', data=data)

    def delete_resource(self, resource_id):
        """DELETE /resources/{id}"""
        return self.delete(f'/{self.resource_name}/{resource_id}')

Pagination Handling

Offset-Based Pagination:

def get_all_resources(self, filters=None):
    """Fetch all resources with pagination."""
    all_resources = []
    page = 1
    per_page = 100

    while True:
        params = filters.copy() if filters else {}
        params.update({'page': page, 'per_page': per_page})

        response = self.get('/resources', params=params)
        resources = response.get('data', [])

        if not resources:
            break

        all_resources.extend(resources)

        # Check if more pages exist
        total = response.get('total', 0)
        if len(all_resources) >= total:
            break

        page += 1

    return all_resources

Cursor-Based Pagination:

def get_all_resources(self, filters=None):
    """Fetch all resources with cursor pagination."""
    all_resources = []
    cursor = None

    while True:
        params = filters.copy() if filters else {}
        if cursor:
            params['cursor'] = cursor

        response = self.get('/resources', params=params)
        resources = response.get('data', [])

        if not resources:
            break

        all_resources.extend(resources)

        # Get next cursor
        cursor = response.get('next_cursor')
        if not cursor:
            break

    return all_resources

Link Header Pagination:

def get_all_resources(self):
    """Follow Link headers for pagination."""
    all_resources = []
    url = '/resources'

    while url:
        response = requests.get(self.build_url(url), headers=self.get_api_headers())
        response.raise_for_status()

        all_resources.extend(response.json())

        # Parse Link header
        link_header = response.headers.get('Link', '')
        url = self._extract_next_url(link_header)

    return all_resources

def _extract_next_url(self, link_header):
    """Extract next URL from Link header."""
    import re
    match = re.search(r'<([^>]+)>; rel="next"', link_header)
    return match.group(1) if match else None

Response Envelope Handling

Wrapped Response:

def get_products(self):
    """Handle wrapped API response."""
    response = self.get('/products')

    # Response: {"status": "success", "data": {"products": [...]}}
    if response.get('status') == 'success':
        return response.get('data', {}).get('products', [])

    raise ValueError(f"API error: {response.get('message')}")

Nested Data:

def extract_data(self, response):
    """Extract data from nested structure."""
    # Response: {"response": {"result": {"items": [...]}}}
    return response.get('response', {}).get('result', {}).get('items', [])

GraphQL API Integration

GraphQL Adapter:

class GraphQLAdapter(GenericAdapter):
    def query(self, query, variables=None):
        """Execute GraphQL query."""
        payload = {'query': query}
        if variables:
            payload['variables'] = variables

        response = self.post('/graphql', data=payload)

        if 'errors' in response:
            raise ValueError(f"GraphQL errors: {response['errors']}")

        return response.get('data')

    def get_products(self, first=100, after=None):
        """Fetch products using GraphQL."""
        query = """
        query GetProducts($first: Int!, $after: String) {
            products(first: $first, after: $after) {
                edges {
                    node {
                        id
                        title
                        description
                        variants {
                            edges {
                                node {
                                    id
                                    price
                                    sku
                                }
                            }
                        }
                    }
                    cursor
                }
                pageInfo {
                    hasNextPage
                    endCursor
                }
            }
        }
        """

        variables = {'first': first}
        if after:
            variables['after'] = after

        return self.query(query, variables)

    def get_all_products(self):
        """Fetch all products with pagination."""
        all_products = []
        has_next_page = True
        cursor = None

        while has_next_page:
            data = self.get_products(after=cursor)
            products_data = data.get('products', {})

            edges = products_data.get('edges', [])
            all_products.extend([edge['node'] for edge in edges])

            page_info = products_data.get('pageInfo', {})
            has_next_page = page_info.get('hasNextPage', False)
            cursor = page_info.get('endCursor')

        return all_products

SOAP API Integration

SOAP Adapter:

from zeep import Client
from zeep.transports import Transport

class SOAPAdapter(GenericAdapter):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = self._create_client()

    def _create_client(self):
        """Create SOAP client."""
        wsdl = f'{self.backend_record.api_url}?wsdl'

        # Configure transport
        session = requests.Session()
        session.auth = (
            self.backend_record.api_username,
            self.backend_record.api_password
        )
        transport = Transport(session=session)

        return Client(wsdl, transport=transport)

    def get_products(self):
        """Call SOAP method."""
        try:
            response = self.client.service.GetProducts()
            return response
        except Exception as e:
            _logger.error("SOAP call failed: %s", str(e))
            raise

Webhook Integration

Webhook Controller

from odoo import http
from odoo.http import request
import json
import hmac
import hashlib

class MyConnectorWebhookController(http.Controller):

    @http.route('/myconnector/webhook', type='json', auth='none', csrf=False)
    def webhook(self):
        """Handle incoming webhooks."""
        try:
            # Get raw payload
            payload = request.httprequest.get_data(as_text=True)

            # Get headers
            signature = request.httprequest.headers.get('X-Signature')
            event_type = request.httprequest.headers.get('X-Event-Type')

            # Find backend (by API key or other identifier)
            api_key = request.httprequest.headers.get('X-API-Key')
            backend = request.env['myconnector.backend'].sudo().search([
                ('api_key', '=', api_key)
            ], limit=1)

            if not backend:
                return {'error': 'Invalid API key'}, 401

            # Verify signature
            if not self._verify_signature(payload, signature, backend.webhook_secret):
                return {'error': 'Invalid signature'}, 401

            # Create webhook record
            webhook = request.env['generic.webhook'].sudo().create({
                'backend_id': backend.id,
                'event_type': event_type,
                'payload': payload,
                'signature': signature,
                'processing_status': 'pending',
            })

            # Process asynchronously
            webhook.with_delay().process_webhook()

            return {'status': 'accepted', 'webhook_id': webhook.id}

        except Exception as e:
            _logger.exception("Webhook processing failed")
            return {'error': str(e)}, 500

    def _verify_signature(self, payload, signature, secret):
        """Verify HMAC signature."""
        expected = hmac.new(
            secret.encode('utf-8'),
            payload.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()

        return hmac.compare_digest(signature, expected)

Webhook Processing

class MyBackend(models.Model):
    def process_webhook(self, webhook):
        """Process webhook by event type."""
        handlers = {
            'order.created': self._handle_order_created,
            'order.updated': self._handle_order_updated,
            'product.updated': self._handle_product_updated,
            'inventory.updated': self._handle_inventory_updated,
        }

        handler = handlers.get(webhook.event_type)
        if handler:
            try:
                handler(webhook)
                webhook.mark_as_processed()
            except Exception as e:
                _logger.exception("Webhook handler failed")
                webhook.mark_as_failed(str(e))
        else:
            webhook.mark_as_ignored(f"No handler for {webhook.event_type}")

    def _handle_order_created(self, webhook):
        """Handle order.created event."""
        payload = json.loads(webhook.payload)
        order_id = payload['order']['id']

        # Import the order
        self.env['myconnector.sale.order'].import_record(
            backend=self,
            external_id=str(order_id)
        )

Rate Limiting

Token Bucket Implementation

from datetime import datetime, timedelta
from collections import defaultdict

class RateLimiter:
    def __init__(self, rate_limit=100, window=60):
        """
        Args:
            rate_limit: Number of requests allowed
            window: Time window in seconds
        """
        self.rate_limit = rate_limit
        self.window = window
        self.buckets = defaultdict(list)

    def allow_request(self, key):
        """Check if request is allowed."""
        now = datetime.now()
        window_start = now - timedelta(seconds=self.window)

        # Clean old requests
        self.buckets[key] = [
            req_time for req_time in self.buckets[key]
            if req_time > window_start
        ]

        # Check limit
        if len(self.buckets[key]) >= self.rate_limit:
            return False

        # Add current request
        self.buckets[key].append(now)
        return True

class RateLimitedAdapter(GenericAdapter):
    _rate_limiter = None

    @classmethod
    def get_rate_limiter(cls):
        if cls._rate_limiter is None:
            cls._rate_limiter = RateLimiter(rate_limit=100, window=60)
        return cls._rate_limiter

    def make_request(self, method, endpoint, **kwargs):
        """Make request with rate limiting."""
        limiter = self.get_rate_limiter()
        key = f"{self.backend_record.id}"

        if not limiter.allow_request(key):
            # Wait and retry
            import time
            time.sleep(1)
            return self.make_request(method, endpoint, **kwargs)

        return super().make_request(method, endpoint, **kwargs)

Response Header Rate Limiting

def make_request(self, method, endpoint, **kwargs):
    """Check rate limit from response headers."""
    response = super().make_request(method, endpoint, **kwargs)

    # Check rate limit headers
    remaining = response.headers.get('X-RateLimit-Remaining')
    reset_time = response.headers.get('X-RateLimit-Reset')

    if remaining and int(remaining) < 10:
        _logger.warning(
            "Rate limit nearly exceeded. Remaining: %s, Resets at: %s",
            remaining,
            reset_time
        )

        # Optionally delay next request
        if int(remaining) == 0:
            import time
            reset_timestamp = int(reset_time)
            wait_time = reset_timestamp - time.time()
            if wait_time > 0:
                time.sleep(wait_time)

    return response

Error Handling

Retry with Exponential Backoff

import time
from requests.exceptions import RequestException

class ResilientAdapter(GenericAdapter):
    def make_request(self, method, endpoint, max_retries=3, **kwargs):
        """Make request with retry logic."""
        for attempt in range(max_retries):
            try:
                return super().make_request(method, endpoint, **kwargs)

            except RequestException as e:
                if attempt == max_retries - 1:
                    # Last attempt, re-raise
                    raise

                # Calculate backoff
                wait_time = (2 ** attempt) + (random.random() * 0.1)

                _logger.warning(
                    "Request failed (attempt %d/%d): %s. Retrying in %.2fs",
                    attempt + 1,
                    max_retries,
                    str(e),
                    wait_time
                )

                time.sleep(wait_time)

Status Code Handling

def make_request(self, method, endpoint, **kwargs):
    """Handle different HTTP status codes."""
    response = requests.request(
        method=method,
        url=self.build_url(endpoint),
        headers=self.get_api_headers(),
        **kwargs
    )

    if response.status_code == 200:
        return response.json()

    elif response.status_code == 201:
        return response.json()

    elif response.status_code == 204:
        return None  # No content

    elif response.status_code == 400:
        raise ValueError(f"Bad request: {response.text}")

    elif response.status_code == 401:
        raise PermissionError("Unauthorized. Check API credentials.")

    elif response.status_code == 403:
        raise PermissionError("Forbidden. Insufficient permissions.")

    elif response.status_code == 404:
        return None  # Resource not found

    elif response.status_code == 429:
        # Rate limited
        retry_after = response.headers.get('Retry-After', 60)
        raise RateLimitExceeded(f"Rate limited. Retry after {retry_after}s")

    elif response.status_code >= 500:
        raise ServerError(f"Server error: {response.status_code}")

    else:
        response.raise_for_status()

Testing APIs

Mock Adapter for Testing

class MockAdapter(GenericAdapter):
    """Mock adapter for testing."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.mock_data = {}

    def set_mock_response(self, endpoint, data):
        """Set mock response for endpoint."""
        self.mock_data[endpoint] = data

    def get(self, endpoint, **kwargs):
        """Return mock data instead of making real request."""
        return self.mock_data.get(endpoint, {})

# In tests
def test_product_import(self):
    backend = self.env['myconnector.backend'].create({...})

    # Use mock adapter
    adapter = MockAdapter(self.env, backend)
    adapter.set_mock_response('/products/123', {
        'id': 123,
        'title': 'Test Product',
        'price': 99.99
    })

    # Test import
    importer = ProductImporter(...)
    result = importer.run(external_id='123')

    self.assertEqual(result.name, 'Test Product')