# API Integration Guide for Odoo Connectors

## REST API Integration

### Standard REST Pattern

**Adapter Structure**:

```python
class RESTAdapter(GenericAdapter):
    """CRUD helpers for a REST resource addressed by ``self.resource_name``."""

    def get_resource(self, resource_id):
        """GET /resources/{id}"""
        return self.get(f'/{self.resource_name}/{resource_id}')

    def list_resources(self, filters=None):
        """GET /resources"""
        return self.get(f'/{self.resource_name}', params=filters)

    def create_resource(self, data):
        """POST /resources"""
        return self.post(f'/{self.resource_name}', data=data)

    def update_resource(self, resource_id, data):
        """PUT /resources/{id}"""
        return self.put(f'/{self.resource_name}/{resource_id}', data=data)

    def delete_resource(self, resource_id):
        """DELETE /resources/{id}"""
        return self.delete(f'/{self.resource_name}/{resource_id}')
```

### Pagination Handling

**Offset-Based Pagination**:

```python
def get_all_resources(self, filters=None):
    """Fetch all resources with pagination.

    Pages are requested until the API returns an empty page or, when the
    response carries a ``total`` count, until that many records were read.
    """
    all_resources = []
    page = 1
    per_page = 100

    while True:
        # Copy so the caller's filter dict is never mutated.
        params = filters.copy() if filters else {}
        params.update({'page': page, 'per_page': per_page})

        response = self.get('/resources', params=params)
        resources = response.get('data', [])

        if not resources:
            break

        all_resources.extend(resources)

        # Check if more pages exist. Only trust 'total' when the API actually
        # sends it: defaulting it to 0 would end the loop after page one.
        total = response.get('total')
        if total is not None and len(all_resources) >= total:
            break

        page += 1

    return all_resources
```

**Cursor-Based Pagination**:

```python
def get_all_resources(self, filters=None):
    """Fetch all resources with cursor pagination."""
    all_resources = []
    cursor = None

    while True:
        params = filters.copy() if filters else {}
        if cursor:
            params['cursor'] = cursor

        response = self.get('/resources', params=params)
        resources = response.get('data', [])

        if not resources:
            break

        all_resources.extend(resources)

        # Get next cursor
        cursor = response.get('next_cursor')
        if not cursor:
            break

    return all_resources
```

**Link Header Pagination**:

```python
import re

import requests


def get_all_resources(self):
    """Follow Link headers for pagination."""
    all_resources = []
    url = '/resources'

    while url:
        # The first URL is a relative endpoint; "next" links taken from the
        # Link header are normally absolute and must be used as-is.
        if url.startswith(('http://', 'https://')):
            full_url = url
        else:
            full_url = self.build_url(url)

        response = requests.get(full_url, headers=self.get_api_headers())
        response.raise_for_status()

        all_resources.extend(response.json())

        # Parse Link header
        link_header = response.headers.get('Link', '')
        url = self._extract_next_url(link_header)

    return all_resources


def _extract_next_url(self, link_header):
    """Extract next URL from Link header.

    Returns the target of the ``rel="next"`` link, or ``None`` when the
    header has no such link (which ends pagination).
    """
    # Tolerate optional whitespace around ';' and an unquoted rel value.
    match = re.search(r'<([^>]+)>\s*;\s*rel="?next"?', link_header or '')
    return match.group(1) if match else None
```

### Response Envelope Handling

**Wrapped Response**:

```python
def get_products(self):
    """Handle wrapped API response."""
    response = self.get('/products')

    # Response: {"status": "success", "data": {"products": [...]}}
    if response.get('status') == 'success':
        return response.get('data', {}).get('products', [])

    raise ValueError(f"API error: {response.get('message')}")
```

**Nested Data**:

```python
def extract_data(self, response):
    """Extract data from nested structure."""
    # Response: {"response": {"result": {"items": [...]}}}
    return response.get('response', {}).get('result', {}).get('items', [])
```

## GraphQL API Integration

**GraphQL Adapter**:

```python
class GraphQLAdapter(GenericAdapter):

    def query(self, query, variables=None):
        """Execute GraphQL query.

        Raises:
            ValueError: if the response contains an ``errors`` entry.
        """
        payload = {'query': query}
        if variables:
            payload['variables'] = variables

        response = self.post('/graphql', data=payload)

        if 'errors' in response:
            raise ValueError(f"GraphQL errors: {response['errors']}")

        return response.get('data')

    def get_products(self, first=100, after=None):
        """Fetch products using GraphQL."""
        query = """
        query GetProducts($first: Int!, $after: String) {
            products(first: $first, after: $after) {
                edges {
                    node {
                        id
                        title
                        description
                        variants {
                            edges {
                                node {
                                    id
                                    price
                                    sku
                                }
                            }
                        }
                    }
                    cursor
                }
                pageInfo {
                    hasNextPage
                    endCursor
                }
            }
        }
        """

        variables = {'first': first}
        if after:
            variables['after'] = after

        return self.query(query, variables)

    def get_all_products(self):
        """Fetch all products with pagination."""
        all_products = []
        cursor = None

        while True:
            # query() returns response.get('data'), which may be None.
            data = self.get_products(after=cursor) or {}
            products_data = data.get('products') or {}
            edges = products_data.get('edges') or []

            all_products.extend(edge['node'] for edge in edges)

            page_info = products_data.get('pageInfo') or {}
            cursor = page_info.get('endCursor')

            # Stop when there is no further page, or when no cursor was given
            # to continue from (otherwise page one would be refetched forever).
            if not page_info.get('hasNextPage') or not cursor:
                break

        return all_products
```

## SOAP API Integration

**SOAP Adapter**:

```python
import logging

import requests
from zeep import Client
from zeep.transports import Transport

_logger = logging.getLogger(__name__)


class SOAPAdapter(GenericAdapter):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = self._create_client()

    def _create_client(self):
        """Create SOAP client."""
        wsdl = f'{self.backend_record.api_url}?wsdl'

        # Configure transport
        session = requests.Session()
        session.auth = (
            self.backend_record.api_username,
            self.backend_record.api_password
        )

        transport = Transport(session=session)
        return Client(wsdl, transport=transport)

    def get_products(self):
        """Call SOAP method."""
        try:
            response = self.client.service.GetProducts()
            return response
        except Exception as e:
            # Log for diagnosis, then let the caller decide how to recover.
            _logger.error("SOAP call failed: %s", str(e))
            raise
```

## Webhook Integration

### Webhook Controller

```python
import hashlib
import hmac
import json
import logging

from odoo import http
from odoo.http import request

_logger = logging.getLogger(__name__)


class MyConnectorWebhookController(http.Controller):

    # NOTE(review): with type='json' the return value is presumably wrapped
    # in a JSON-RPC envelope, so the (body, status) tuples below may not set
    # the HTTP status code - confirm against the Odoo version in use.
    @http.route('/myconnector/webhook', type='json', auth='none', csrf=False)
    def webhook(self):
        """Handle incoming webhooks.

        Authenticates the caller by API key and HMAC signature, stores the
        raw payload as a ``generic.webhook`` record and queues it for
        asynchronous processing.
        """
        try:
            # Get raw payload
            payload = request.httprequest.get_data(as_text=True)

            # Get headers
            signature = request.httprequest.headers.get('X-Signature')
            event_type = request.httprequest.headers.get('X-Event-Type')

            # Find backend (by API key or other identifier)
            api_key = request.httprequest.headers.get('X-API-Key')
            if not api_key:
                # A missing header would otherwise search for api_key = None,
                # which can match a backend that has no key configured.
                return {'error': 'Invalid API key'}, 401

            backend = request.env['myconnector.backend'].sudo().search([
                ('api_key', '=', api_key)
            ], limit=1)

            if not backend:
                return {'error': 'Invalid API key'}, 401

            # Verify signature
            if not self._verify_signature(payload, signature, backend.webhook_secret):
                return {'error': 'Invalid signature'}, 401

            # Create webhook record
            webhook = request.env['generic.webhook'].sudo().create({
                'backend_id': backend.id,
                'event_type': event_type,
                'payload': payload,
                'signature': signature,
                'processing_status': 'pending',
            })

            # Process asynchronously
            webhook.with_delay().process_webhook()

            return {'status': 'accepted', 'webhook_id': webhook.id}

        except Exception:
            _logger.exception("Webhook processing failed")
            # Do not echo exception details to an unauthenticated caller;
            # the full traceback is in the server log.
            return {'error': 'Internal server error'}, 500

    def _verify_signature(self, payload, signature, secret):
        """Verify HMAC signature.

        Returns ``False`` (rather than raising) when the signature header or
        the configured secret is missing.
        """
        if not signature or not secret:
            return False

        expected = hmac.new(
            secret.encode('utf-8'),
            payload.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()

        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(
            signature.encode('utf-8'),
            expected.encode('utf-8')
        )
```

### Webhook Processing

```python
class MyBackend(models.Model):

    def process_webhook(self, webhook):
        """Process webhook by event type."""
        handlers = {
            'order.created': self._handle_order_created,
            'order.updated': self._handle_order_updated,
            'product.updated': self._handle_product_updated,
            'inventory.updated': self._handle_inventory_updated,
        }

        handler = handlers.get(webhook.event_type)
        if handler:
            try:
                handler(webhook)
                webhook.mark_as_processed()
            except Exception as e:
                _logger.exception("Webhook handler failed")
                webhook.mark_as_failed(str(e))
        else:
            webhook.mark_as_ignored(f"No handler for {webhook.event_type}")

    def _handle_order_created(self, webhook):
        """Handle order.created event."""
        payload = json.loads(webhook.payload)
        order_id = payload['order']['id']

        # Import the order
        self.env['myconnector.sale.order'].import_record(
            backend=self,
            external_id=str(order_id)
        )
```

## Rate Limiting

### Token Bucket Implementation

Note: the limiter below is implemented as a sliding-window log (it keeps the
timestamps of recent requests per key) rather than a classic token bucket; the
effect is the same cap of `rate_limit` requests per `window` seconds.

```python
import time
from collections import defaultdict


class RateLimiter:

    def __init__(self, rate_limit=100, window=60):
        """
        Args:
            rate_limit: Number of requests allowed
            window: Time window in seconds
        """
        self.rate_limit = rate_limit
        self.window = window
        # key -> list of monotonic timestamps of requests inside the window
        self.buckets = defaultdict(list)

    def allow_request(self, key):
        """Check if request is allowed.

        Returns ``True`` and records the request when fewer than
        ``rate_limit`` requests were made for ``key`` within the last
        ``window`` seconds; otherwise returns ``False``.
        """
        # Monotonic clock: unaffected by wall-clock adjustments (NTP, DST).
        now = time.monotonic()
        window_start = now - self.window

        # Clean old requests
        self.buckets[key] = [
            req_time for req_time in self.buckets[key]
            if req_time > window_start
        ]

        # Check limit
        if len(self.buckets[key]) >= self.rate_limit:
            return False

        # Add current request
        self.buckets[key].append(now)
        return True


class RateLimitedAdapter(GenericAdapter):

    # Shared by all adapter instances; created lazily by get_rate_limiter().
    _rate_limiter = None

    @classmethod
    def get_rate_limiter(cls):
        if cls._rate_limiter is None:
            cls._rate_limiter = RateLimiter(rate_limit=100, window=60)
        return cls._rate_limiter

    def make_request(self, method, endpoint, **kwargs):
        """Make request with rate limiting."""
        limiter = self.get_rate_limiter()
        key = f"{self.backend_record.id}"

        # Wait and retry. A loop is used instead of recursion: recursing once
        # per second while throttled would eventually raise RecursionError.
        while not limiter.allow_request(key):
            time.sleep(1)

        return super().make_request(method, endpoint, **kwargs)
```

### Response Header Rate Limiting

```python
import time


def make_request(self, method, endpoint, **kwargs):
    """Check rate limit from response headers."""
    response = super().make_request(method, endpoint, **kwargs)

    # Check rate limit headers
    remaining = response.headers.get('X-RateLimit-Remaining')
    reset_time = response.headers.get('X-RateLimit-Reset')

    if remaining and int(remaining) < 10:
        _logger.warning(
            "Rate limit nearly exceeded. Remaining: %s, Resets at: %s",
            remaining, reset_time
        )

        # Optionally delay next request. The reset header may be absent, in
        # which case there is nothing to wait for.
        if int(remaining) == 0 and reset_time:
            reset_timestamp = int(reset_time)
            wait_time = reset_timestamp - time.time()
            if wait_time > 0:
                time.sleep(wait_time)

    return response
```

## Error Handling

### Retry with Exponential Backoff

```python
import random
import time

from requests.exceptions import RequestException


class ResilientAdapter(GenericAdapter):

    def make_request(self, method, endpoint, max_retries=3, **kwargs):
        """Make request with retry logic.

        Retries on ``RequestException`` with exponential backoff plus a small
        random jitter; the exception from the final attempt is re-raised.
        """
        # Always make at least one attempt, even if max_retries <= 0;
        # otherwise the loop would not run and None would be returned.
        attempts = max(1, max_retries)

        for attempt in range(attempts):
            try:
                return super().make_request(method, endpoint, **kwargs)

            except RequestException as e:
                if attempt == attempts - 1:
                    # Last attempt, re-raise
                    raise

                # Calculate backoff
                wait_time = (2 ** attempt) + (random.random() * 0.1)
                _logger.warning(
                    "Request failed (attempt %d/%d): %s. Retrying in %.2fs",
                    attempt + 1, attempts, str(e), wait_time
                )
                time.sleep(wait_time)
```

### Status Code Handling

```python
def make_request(self, method, endpoint, **kwargs):
    """Handle different HTTP status codes.

    Returns the decoded JSON body for 200/201 and ``None`` for 204 and 404;
    raises for the other handled error statuses.
    """
    response = requests.request(
        method=method,
        url=self.build_url(endpoint),
        headers=self.get_api_headers(),
        **kwargs
    )

    if response.status_code in (200, 201):
        return response.json()

    elif response.status_code == 204:
        return None  # No content

    elif response.status_code == 400:
        raise ValueError(f"Bad request: {response.text}")

    elif response.status_code == 401:
        raise PermissionError("Unauthorized. Check API credentials.")

    elif response.status_code == 403:
        raise PermissionError("Forbidden. Insufficient permissions.")

    elif response.status_code == 404:
        return None  # Resource not found

    elif response.status_code == 429:
        # Rate limited
        retry_after = response.headers.get('Retry-After', 60)
        raise RateLimitExceeded(f"Rate limited. Retry after {retry_after}s")

    elif response.status_code >= 500:
        raise ServerError(f"Server error: {response.status_code}")

    else:
        # Remaining 4xx codes raise HTTPError here; other codes fall through
        # and the function returns None.
        response.raise_for_status()
```

## Testing APIs

### Mock Adapter for Testing

```python
class MockAdapter(GenericAdapter):
    """Mock adapter for testing."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.mock_data = {}

    def set_mock_response(self, endpoint, data):
        """Set mock response for endpoint."""
        self.mock_data[endpoint] = data

    def get(self, endpoint, **kwargs):
        """Return mock data instead of making real request."""
        return self.mock_data.get(endpoint, {})


# In tests
def test_product_import(self):
    backend = self.env['myconnector.backend'].create({...})

    # Use mock adapter
    adapter = MockAdapter(self.env, backend)
    adapter.set_mock_response('/products/123', {
        'id': 123,
        'title': 'Test Product',
        'price': 99.99
    })

    # Test import
    importer = ProductImporter(...)
    result = importer.run(external_id='123')

    self.assertEqual(result.name, 'Test Product')
```