Files
2025-11-29 18:50:04 +08:00

13 KiB

Authentication Patterns for Odoo Connectors

1. API Key Authentication

Usage: Simple, static authentication

Backend Fields:

class MyBackend(models.Model):
    api_key = fields.Char(string='API Key', required=True)
    api_secret = fields.Char(string='API Secret')  # Optional

Adapter Implementation:

class MyAdapter(GenericAdapter):
    def get_api_headers(self):
        headers = super().get_api_headers()
        headers['X-API-Key'] = self.backend_record.api_key
        return headers

Variants:

  • Header-based: Authorization: ApiKey YOUR_KEY
  • Query parameter: ?api_key=YOUR_KEY
  • Custom header: X-API-Key: YOUR_KEY

2. Bearer Token Authentication

Usage: Token-based auth (common in modern APIs)

Backend Fields:

class MyBackend(models.Model):
    access_token = fields.Char(string='Access Token')

Adapter Implementation:

class MyAdapter(GenericAdapter):
    def get_api_headers(self):
        headers = super().get_api_headers()
        headers['Authorization'] = f'Bearer {self.backend_record.access_token}'
        return headers

3. OAuth 2.0 Authentication

Usage: Delegated authorization (Shopify, Google, etc.)

Authorization Code Flow

Backend Fields:

class MyBackend(models.Model):
    oauth_client_id = fields.Char(string='Client ID', required=True)
    oauth_client_secret = fields.Char(string='Client Secret', required=True)
    oauth_redirect_uri = fields.Char(string='Redirect URI', compute='_compute_redirect_uri')

    access_token = fields.Char(string='Access Token', readonly=True)
    refresh_token = fields.Char(string='Refresh Token', readonly=True)
    token_expires_at = fields.Datetime(string='Token Expires At', readonly=True)
    token_type = fields.Char(string='Token Type', readonly=True, default='Bearer')

    @api.depends()
    def _compute_redirect_uri(self):
        """Compute OAuth redirect URI."""
        for backend in self:
            base_url = backend.env['ir.config_parameter'].sudo().get_param('web.base.url')
            backend.oauth_redirect_uri = f'{base_url}/myconnector/oauth/callback'

    def action_start_oauth_flow(self):
        """Start OAuth authorization flow."""
        self.ensure_one()

        auth_url = self._build_authorization_url()

        return {
            'type': 'ir.actions.act_url',
            'url': auth_url,
            'target': 'new',
        }

    def _build_authorization_url(self):
        """Build OAuth authorization URL."""
        from urllib.parse import urlencode

        params = {
            'client_id': self.oauth_client_id,
            'redirect_uri': self.oauth_redirect_uri,
            'response_type': 'code',
            'scope': 'read_products write_orders',  # Adjust scopes
            'state': self._generate_oauth_state(),
        }

        return f'{self.api_url}/oauth/authorize?{urlencode(params)}'

    def _generate_oauth_state(self):
        """Generate OAuth state parameter for CSRF protection."""
        import secrets
        state = secrets.token_urlsafe(32)
        # Store state in session or database for validation
        self.env['ir.config_parameter'].sudo().set_param(
            f'oauth_state_{self.id}',
            state
        )
        return state

    def exchange_code_for_token(self, code, state):
        """Exchange authorization code for access token."""
        self.ensure_one()

        # Validate state
        stored_state = self.env['ir.config_parameter'].sudo().get_param(
            f'oauth_state_{self.id}'
        )
        if state != stored_state:
            raise ValueError('Invalid OAuth state')

        # Exchange code for token
        token_url = f'{self.api_url}/oauth/token'

        data = {
            'client_id': self.oauth_client_id,
            'client_secret': self.oauth_client_secret,
            'code': code,
            'redirect_uri': self.oauth_redirect_uri,
            'grant_type': 'authorization_code',
        }

        response = requests.post(token_url, data=data)
        response.raise_for_status()

        token_data = response.json()
        self._save_token_data(token_data)

    def _save_token_data(self, token_data):
        """Save OAuth token data."""
        from datetime import datetime, timedelta

        expires_in = token_data.get('expires_in', 3600)
        expires_at = datetime.now() + timedelta(seconds=expires_in)

        self.write({
            'access_token': token_data['access_token'],
            'refresh_token': token_data.get('refresh_token'),
            'token_expires_at': expires_at,
            'token_type': token_data.get('token_type', 'Bearer'),
        })

    def refresh_access_token(self):
        """Refresh expired access token."""
        self.ensure_one()

        if not self.refresh_token:
            raise ValueError('No refresh token available')

        token_url = f'{self.api_url}/oauth/token'

        data = {
            'client_id': self.oauth_client_id,
            'client_secret': self.oauth_client_secret,
            'refresh_token': self.refresh_token,
            'grant_type': 'refresh_token',
        }

        response = requests.post(token_url, data=data)
        response.raise_for_status()

        token_data = response.json()
        self._save_token_data(token_data)

OAuth Callback Controller:

from odoo import http
from odoo.http import request

class MyConnectorOAuthController(http.Controller):

    @http.route('/myconnector/oauth/callback', type='http', auth='user', csrf=False)
    def oauth_callback(self, code=None, state=None, error=None):
        """Handle OAuth callback."""
        if error:
            return request.render('myconnector.oauth_error', {'error': error})

        if not code or not state:
            return request.render('myconnector.oauth_error',
                                {'error': 'Missing code or state'})

        # Find backend by state or use session
        backend_id = request.session.get('oauth_backend_id')
        if not backend_id:
            return request.render('myconnector.oauth_error',
                                {'error': 'Invalid session'})

        backend = request.env['myconnector.backend'].sudo().browse(backend_id)

        try:
            backend.exchange_code_for_token(code, state)
            return request.render('myconnector.oauth_success')
        except Exception as e:
            return request.render('myconnector.oauth_error', {'error': str(e)})

Adapter with Token Refresh:

class MyAdapter(GenericAdapter):
    def make_request(self, method, endpoint, **kwargs):
        """Make request with automatic token refresh."""
        # Check if token is expired
        if self._is_token_expired():
            self.backend_record.refresh_access_token()

        return super().make_request(method, endpoint, **kwargs)

    def _is_token_expired(self):
        """Check if access token is expired."""
        from datetime import datetime, timedelta

        if not self.backend_record.token_expires_at:
            return False

        # Refresh 5 minutes before expiry
        buffer = timedelta(minutes=5)
        return datetime.now() + buffer >= self.backend_record.token_expires_at

    def get_api_headers(self):
        headers = super().get_api_headers()
        headers['Authorization'] = (
            f'{self.backend_record.token_type} {self.backend_record.access_token}'
        )
        return headers

4. Basic Authentication

Usage: Username/password (less common, less secure)

Backend Fields:

class MyBackend(models.Model):
    api_username = fields.Char(string='Username', required=True)
    api_password = fields.Char(string='Password', required=True)

Adapter Implementation:

class MyAdapter(GenericAdapter):
    def get_api_auth(self):
        """Return (username, password) tuple for requests."""
        return (
            self.backend_record.api_username,
            self.backend_record.api_password
        )

    def make_request(self, method, endpoint, **kwargs):
        """Add basic auth to requests."""
        kwargs['auth'] = self.get_api_auth()
        return super().make_request(method, endpoint, **kwargs)

5. HMAC Signature Authentication

Usage: Signed requests (high security)

Backend Fields:

class MyBackend(models.Model):
    api_key = fields.Char(string='API Key', required=True)
    api_secret = fields.Char(string='API Secret', required=True)

Adapter Implementation:

import hmac
import hashlib
import base64
from datetime import datetime

class MyAdapter(GenericAdapter):
    def make_request(self, method, endpoint, **kwargs):
        """Add HMAC signature to request."""
        # Generate signature
        timestamp = str(int(datetime.now().timestamp()))
        signature = self._generate_signature(method, endpoint, timestamp, kwargs.get('data'))

        # Add to headers
        headers = kwargs.get('headers', {})
        headers.update({
            'X-API-Key': self.backend_record.api_key,
            'X-Signature': signature,
            'X-Timestamp': timestamp,
        })
        kwargs['headers'] = headers

        return super().make_request(method, endpoint, **kwargs)

    def _generate_signature(self, method, endpoint, timestamp, data=None):
        """Generate HMAC signature."""
        # Build signature string
        message_parts = [
            method.upper(),
            endpoint,
            timestamp,
        ]

        if data:
            import json
            message_parts.append(json.dumps(data, sort_keys=True))

        message = '\n'.join(message_parts)

        # Generate HMAC
        secret = self.backend_record.api_secret.encode('utf-8')
        signature = hmac.new(
            secret,
            message.encode('utf-8'),
            hashlib.sha256
        ).digest()

        # Return base64-encoded signature
        return base64.b64encode(signature).decode('utf-8')

6. JWT Authentication

Usage: JSON Web Tokens (stateless auth)

Backend Fields:

class MyBackend(models.Model):
    jwt_secret = fields.Char(string='JWT Secret', required=True)
    jwt_algorithm = fields.Selection([
        ('HS256', 'HMAC SHA-256'),
        ('RS256', 'RSA SHA-256'),
    ], default='HS256', string='Algorithm')
    jwt_expiration = fields.Integer(string='Token Expiration (seconds)', default=3600)

Adapter Implementation:

import jwt
from datetime import datetime, timedelta

class MyAdapter(GenericAdapter):
    def get_api_headers(self):
        headers = super().get_api_headers()
        token = self._generate_jwt()
        headers['Authorization'] = f'Bearer {token}'
        return headers

    def _generate_jwt(self):
        """Generate JWT token."""
        payload = {
            'iss': self.backend_record.api_key,  # Issuer
            'iat': datetime.utcnow(),  # Issued at
            'exp': datetime.utcnow() + timedelta(
                seconds=self.backend_record.jwt_expiration
            ),
        }

        return jwt.encode(
            payload,
            self.backend_record.jwt_secret,
            algorithm=self.backend_record.jwt_algorithm
        )

7. Store-Specific Headers (ZID Pattern)

Usage: Multi-tenant systems requiring store identification

Backend Fields:

class MyBackend(models.Model):
    store_id = fields.Char(string='Store ID', required=True)
    api_key = fields.Char(string='API Key', required=True)

Adapter Implementation:

class MyAdapter(GenericAdapter):
    def get_api_headers(self):
        headers = super().get_api_headers()
        headers.update({
            'X-Manager-Token': self.backend_record.api_key,
            'X-Store-Id': self.backend_record.store_id,
            'Accept': 'application/json',
        })
        return headers

Webhook Signature Verification

HMAC-SHA256 Verification

import hmac
import hashlib

class GenericWebhook(models.Model):
    def verify_signature(self, payload, signature, secret):
        """Verify webhook signature."""
        expected_signature = hmac.new(
            secret.encode('utf-8'),
            payload.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()

        return hmac.compare_digest(signature, expected_signature)

# In controller
class WebhookController(http.Controller):
    @http.route('/myconnector/webhook', type='json', auth='none', csrf=False)
    def webhook(self):
        payload = request.httprequest.get_data(as_text=True)
        signature = request.httprequest.headers.get('X-Signature')

        backend = self._find_backend()
        webhook_model = request.env['generic.webhook'].sudo()

        if not webhook_model.verify_signature(payload, signature, backend.webhook_secret):
            return {'error': 'Invalid signature'}, 401

        # Process webhook
        ...

Security Best Practices

  1. Never log credentials - Mask API keys/secrets in logs
  2. Use password fields - Set password=True for sensitive fields
  3. Rotate tokens - Implement token refresh before expiry
  4. Validate signatures - Always verify webhook signatures
  5. Use HTTPS - Never send credentials over HTTP
  6. Store securely - Consider using ir.config_parameter for secrets
  7. Limit scopes - Request minimum required OAuth scopes
  8. Handle expiry - Implement token refresh logic
  9. CSRF protection - Use state parameter in OAuth
  10. Rate limit - Implement rate limiting to prevent abuse