15 KiB
15 KiB
API Integration Guide for Odoo Connectors
REST API Integration
Standard REST Pattern
Adapter Structure:
class RESTAdapter(GenericAdapter):
def get_resource(self, resource_id):
"""GET /resources/{id}"""
return self.get(f'/{self.resource_name}/{resource_id}')
def list_resources(self, filters=None):
"""GET /resources"""
return self.get(f'/{self.resource_name}', params=filters)
def create_resource(self, data):
"""POST /resources"""
return self.post(f'/{self.resource_name}', data=data)
def update_resource(self, resource_id, data):
"""PUT /resources/{id}"""
return self.put(f'/{self.resource_name}/{resource_id}', data=data)
def delete_resource(self, resource_id):
"""DELETE /resources/{id}"""
return self.delete(f'/{self.resource_name}/{resource_id}')
Pagination Handling
Offset-Based Pagination:
def get_all_resources(self, filters=None):
"""Fetch all resources with pagination."""
all_resources = []
page = 1
per_page = 100
while True:
params = filters.copy() if filters else {}
params.update({'page': page, 'per_page': per_page})
response = self.get('/resources', params=params)
resources = response.get('data', [])
if not resources:
break
all_resources.extend(resources)
# Check if more pages exist
total = response.get('total', 0)
if len(all_resources) >= total:
break
page += 1
return all_resources
Cursor-Based Pagination:
def get_all_resources(self, filters=None):
"""Fetch all resources with cursor pagination."""
all_resources = []
cursor = None
while True:
params = filters.copy() if filters else {}
if cursor:
params['cursor'] = cursor
response = self.get('/resources', params=params)
resources = response.get('data', [])
if not resources:
break
all_resources.extend(resources)
# Get next cursor
cursor = response.get('next_cursor')
if not cursor:
break
return all_resources
Link Header Pagination:
def get_all_resources(self):
"""Follow Link headers for pagination."""
all_resources = []
url = '/resources'
while url:
response = requests.get(self.build_url(url), headers=self.get_api_headers())
response.raise_for_status()
all_resources.extend(response.json())
# Parse Link header
link_header = response.headers.get('Link', '')
url = self._extract_next_url(link_header)
return all_resources
def _extract_next_url(self, link_header):
"""Extract next URL from Link header."""
import re
match = re.search(r'<([^>]+)>; rel="next"', link_header)
return match.group(1) if match else None
Response Envelope Handling
Wrapped Response:
def get_products(self):
"""Handle wrapped API response."""
response = self.get('/products')
# Response: {"status": "success", "data": {"products": [...]}}
if response.get('status') == 'success':
return response.get('data', {}).get('products', [])
raise ValueError(f"API error: {response.get('message')}")
Nested Data:
def extract_data(self, response):
"""Extract data from nested structure."""
# Response: {"response": {"result": {"items": [...]}}}
return response.get('response', {}).get('result', {}).get('items', [])
GraphQL API Integration
GraphQL Adapter:
class GraphQLAdapter(GenericAdapter):
def query(self, query, variables=None):
"""Execute GraphQL query."""
payload = {'query': query}
if variables:
payload['variables'] = variables
response = self.post('/graphql', data=payload)
if 'errors' in response:
raise ValueError(f"GraphQL errors: {response['errors']}")
return response.get('data')
def get_products(self, first=100, after=None):
"""Fetch products using GraphQL."""
query = """
query GetProducts($first: Int!, $after: String) {
products(first: $first, after: $after) {
edges {
node {
id
title
description
variants {
edges {
node {
id
price
sku
}
}
}
}
cursor
}
pageInfo {
hasNextPage
endCursor
}
}
}
"""
variables = {'first': first}
if after:
variables['after'] = after
return self.query(query, variables)
def get_all_products(self):
"""Fetch all products with pagination."""
all_products = []
has_next_page = True
cursor = None
while has_next_page:
data = self.get_products(after=cursor)
products_data = data.get('products', {})
edges = products_data.get('edges', [])
all_products.extend([edge['node'] for edge in edges])
page_info = products_data.get('pageInfo', {})
has_next_page = page_info.get('hasNextPage', False)
cursor = page_info.get('endCursor')
return all_products
SOAP API Integration
SOAP Adapter:
from zeep import Client
from zeep.transports import Transport
class SOAPAdapter(GenericAdapter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.client = self._create_client()
def _create_client(self):
"""Create SOAP client."""
wsdl = f'{self.backend_record.api_url}?wsdl'
# Configure transport
session = requests.Session()
session.auth = (
self.backend_record.api_username,
self.backend_record.api_password
)
transport = Transport(session=session)
return Client(wsdl, transport=transport)
def get_products(self):
"""Call SOAP method."""
try:
response = self.client.service.GetProducts()
return response
except Exception as e:
_logger.error("SOAP call failed: %s", str(e))
raise
Webhook Integration
Webhook Controller
from odoo import http
from odoo.http import request
import json
import hmac
import hashlib
class MyConnectorWebhookController(http.Controller):
@http.route('/myconnector/webhook', type='json', auth='none', csrf=False)
def webhook(self):
"""Handle incoming webhooks."""
try:
# Get raw payload
payload = request.httprequest.get_data(as_text=True)
# Get headers
signature = request.httprequest.headers.get('X-Signature')
event_type = request.httprequest.headers.get('X-Event-Type')
# Find backend (by API key or other identifier)
api_key = request.httprequest.headers.get('X-API-Key')
backend = request.env['myconnector.backend'].sudo().search([
('api_key', '=', api_key)
], limit=1)
if not backend:
return {'error': 'Invalid API key'}, 401
# Verify signature
if not self._verify_signature(payload, signature, backend.webhook_secret):
return {'error': 'Invalid signature'}, 401
# Create webhook record
webhook = request.env['generic.webhook'].sudo().create({
'backend_id': backend.id,
'event_type': event_type,
'payload': payload,
'signature': signature,
'processing_status': 'pending',
})
# Process asynchronously
webhook.with_delay().process_webhook()
return {'status': 'accepted', 'webhook_id': webhook.id}
except Exception as e:
_logger.exception("Webhook processing failed")
return {'error': str(e)}, 500
def _verify_signature(self, payload, signature, secret):
"""Verify HMAC signature."""
expected = hmac.new(
secret.encode('utf-8'),
payload.encode('utf-8'),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected)
Webhook Processing
class MyBackend(models.Model):
def process_webhook(self, webhook):
"""Process webhook by event type."""
handlers = {
'order.created': self._handle_order_created,
'order.updated': self._handle_order_updated,
'product.updated': self._handle_product_updated,
'inventory.updated': self._handle_inventory_updated,
}
handler = handlers.get(webhook.event_type)
if handler:
try:
handler(webhook)
webhook.mark_as_processed()
except Exception as e:
_logger.exception("Webhook handler failed")
webhook.mark_as_failed(str(e))
else:
webhook.mark_as_ignored(f"No handler for {webhook.event_type}")
def _handle_order_created(self, webhook):
"""Handle order.created event."""
payload = json.loads(webhook.payload)
order_id = payload['order']['id']
# Import the order
self.env['myconnector.sale.order'].import_record(
backend=self,
external_id=str(order_id)
)
Rate Limiting
Token Bucket Implementation
from datetime import datetime, timedelta
from collections import defaultdict
class RateLimiter:
def __init__(self, rate_limit=100, window=60):
"""
Args:
rate_limit: Number of requests allowed
window: Time window in seconds
"""
self.rate_limit = rate_limit
self.window = window
self.buckets = defaultdict(list)
def allow_request(self, key):
"""Check if request is allowed."""
now = datetime.now()
window_start = now - timedelta(seconds=self.window)
# Clean old requests
self.buckets[key] = [
req_time for req_time in self.buckets[key]
if req_time > window_start
]
# Check limit
if len(self.buckets[key]) >= self.rate_limit:
return False
# Add current request
self.buckets[key].append(now)
return True
class RateLimitedAdapter(GenericAdapter):
_rate_limiter = None
@classmethod
def get_rate_limiter(cls):
if cls._rate_limiter is None:
cls._rate_limiter = RateLimiter(rate_limit=100, window=60)
return cls._rate_limiter
def make_request(self, method, endpoint, **kwargs):
"""Make request with rate limiting."""
limiter = self.get_rate_limiter()
key = f"{self.backend_record.id}"
if not limiter.allow_request(key):
# Wait and retry
import time
time.sleep(1)
return self.make_request(method, endpoint, **kwargs)
return super().make_request(method, endpoint, **kwargs)
Response Header Rate Limiting
def make_request(self, method, endpoint, **kwargs):
"""Check rate limit from response headers."""
response = super().make_request(method, endpoint, **kwargs)
# Check rate limit headers
remaining = response.headers.get('X-RateLimit-Remaining')
reset_time = response.headers.get('X-RateLimit-Reset')
if remaining and int(remaining) < 10:
_logger.warning(
"Rate limit nearly exceeded. Remaining: %s, Resets at: %s",
remaining,
reset_time
)
# Optionally delay next request
if int(remaining) == 0:
import time
reset_timestamp = int(reset_time)
wait_time = reset_timestamp - time.time()
if wait_time > 0:
time.sleep(wait_time)
return response
Error Handling
Retry with Exponential Backoff
import time
from requests.exceptions import RequestException
class ResilientAdapter(GenericAdapter):
def make_request(self, method, endpoint, max_retries=3, **kwargs):
"""Make request with retry logic."""
for attempt in range(max_retries):
try:
return super().make_request(method, endpoint, **kwargs)
except RequestException as e:
if attempt == max_retries - 1:
# Last attempt, re-raise
raise
# Calculate backoff
wait_time = (2 ** attempt) + (random.random() * 0.1)
_logger.warning(
"Request failed (attempt %d/%d): %s. Retrying in %.2fs",
attempt + 1,
max_retries,
str(e),
wait_time
)
time.sleep(wait_time)
Status Code Handling
def make_request(self, method, endpoint, **kwargs):
"""Handle different HTTP status codes."""
response = requests.request(
method=method,
url=self.build_url(endpoint),
headers=self.get_api_headers(),
**kwargs
)
if response.status_code == 200:
return response.json()
elif response.status_code == 201:
return response.json()
elif response.status_code == 204:
return None # No content
elif response.status_code == 400:
raise ValueError(f"Bad request: {response.text}")
elif response.status_code == 401:
raise PermissionError("Unauthorized. Check API credentials.")
elif response.status_code == 403:
raise PermissionError("Forbidden. Insufficient permissions.")
elif response.status_code == 404:
return None # Resource not found
elif response.status_code == 429:
# Rate limited
retry_after = response.headers.get('Retry-After', 60)
raise RateLimitExceeded(f"Rate limited. Retry after {retry_after}s")
elif response.status_code >= 500:
raise ServerError(f"Server error: {response.status_code}")
else:
response.raise_for_status()
Testing APIs
Mock Adapter for Testing
class MockAdapter(GenericAdapter):
"""Mock adapter for testing."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.mock_data = {}
def set_mock_response(self, endpoint, data):
"""Set mock response for endpoint."""
self.mock_data[endpoint] = data
def get(self, endpoint, **kwargs):
"""Return mock data instead of making real request."""
return self.mock_data.get(endpoint, {})
# In tests
def test_product_import(self):
backend = self.env['myconnector.backend'].create({...})
# Use mock adapter
adapter = MockAdapter(self.env, backend)
adapter.set_mock_response('/products/123', {
'id': 123,
'title': 'Test Product',
'price': 99.99
})
# Test import
importer = ProductImporter(...)
result = importer.run(external_id='123')
self.assertEqual(result.name, 'Test Product')