# Authentication Patterns for Odoo Connectors

## 1. API Key Authentication

**Usage**: Simple, static authentication

**Backend Fields**:

```python
class MyBackend(models.Model):
    api_key = fields.Char(string='API Key', required=True)
    api_secret = fields.Char(string='API Secret')  # Optional
```

**Adapter Implementation**:

```python
class MyAdapter(GenericAdapter):

    def get_api_headers(self):
        """Return the inherited headers with the backend's API key added."""
        api_headers = super().get_api_headers()
        api_headers.update({'X-API-Key': self.backend_record.api_key})
        return api_headers
```

**Variants**:

- Header-based: `Authorization: ApiKey YOUR_KEY`
- Query parameter: `?api_key=YOUR_KEY`
- Custom header: `X-API-Key: YOUR_KEY`

## 2. Bearer Token Authentication

**Usage**: Token-based auth (common in modern APIs)

**Backend Fields**:

```python
class MyBackend(models.Model):
    access_token = fields.Char(string='Access Token')
```

**Adapter Implementation**:

```python
class MyAdapter(GenericAdapter):

    def get_api_headers(self):
        """Return the inherited headers with a Bearer ``Authorization`` entry."""
        api_headers = super().get_api_headers()
        bearer_token = self.backend_record.access_token
        api_headers.update({'Authorization': f'Bearer {bearer_token}'})
        return api_headers
```

## 3. OAuth 2.0 Authentication

**Usage**: Delegated authorization (Shopify, Google, etc.)
### Authorization Code Flow

**Backend Fields**:

```python
import hmac

import requests

# Seconds to wait for the provider's token endpoint, so that a stalled
# provider cannot block an Odoo worker indefinitely.
OAUTH_REQUEST_TIMEOUT = 30


class MyBackend(models.Model):
    oauth_client_id = fields.Char(string='Client ID', required=True)
    oauth_client_secret = fields.Char(string='Client Secret', required=True)
    oauth_redirect_uri = fields.Char(string='Redirect URI', compute='_compute_redirect_uri')
    access_token = fields.Char(string='Access Token', readonly=True)
    refresh_token = fields.Char(string='Refresh Token', readonly=True)
    token_expires_at = fields.Datetime(string='Token Expires At', readonly=True)
    token_type = fields.Char(string='Token Type', readonly=True, default='Bearer')

    @api.depends()
    def _compute_redirect_uri(self):
        """Compute OAuth redirect URI from the `web.base.url` system parameter."""
        for backend in self:
            base_url = backend.env['ir.config_parameter'].sudo().get_param('web.base.url')
            backend.oauth_redirect_uri = f'{base_url}/myconnector/oauth/callback'

    def action_start_oauth_flow(self):
        """Start OAuth authorization flow.

        Remembers this backend in the HTTP session (the callback controller
        reads it back as `oauth_backend_id`) and returns an `ir.actions.act_url`
        action that opens the provider's authorization page in a new window.
        """
        from odoo.http import request

        self.ensure_one()
        # `request` is falsy outside an HTTP context (shell, cron).
        if request:
            request.session['oauth_backend_id'] = self.id
        auth_url = self._build_authorization_url()
        return {
            'type': 'ir.actions.act_url',
            'url': auth_url,
            'target': 'new',
        }

    def _build_authorization_url(self):
        """Build OAuth authorization URL."""
        from urllib.parse import urlencode

        params = {
            'client_id': self.oauth_client_id,
            'redirect_uri': self.oauth_redirect_uri,
            'response_type': 'code',
            'scope': 'read_products write_orders',  # Adjust scopes
            'state': self._generate_oauth_state(),
        }
        return f'{self.api_url}/oauth/authorize?{urlencode(params)}'

    def _generate_oauth_state(self):
        """Generate OAuth state parameter for CSRF protection."""
        import secrets

        state = secrets.token_urlsafe(32)
        # Store state in session or database for validation
        self.env['ir.config_parameter'].sudo().set_param(
            f'oauth_state_{self.id}', state
        )
        return state

    def exchange_code_for_token(self, code, state):
        """Exchange authorization code for access token.

        Args:
            code: authorization code received on the callback.
            state: `state` value received on the callback.

        Raises:
            ValueError: if `state` is missing or does not match the stored one.
            requests.RequestException: if the token request fails or times out.
        """
        self.ensure_one()

        # Validate state
        config = self.env['ir.config_parameter'].sudo()
        state_key = f'oauth_state_{self.id}'
        stored_state = config.get_param(state_key)
        # Constant-time comparison on bytes (str input must be ASCII for
        # compare_digest); a missing value on either side is a mismatch.
        if not state or not stored_state or not hmac.compare_digest(
            state.encode('utf-8'), stored_state.encode('utf-8')
        ):
            raise ValueError('Invalid OAuth state')
        # The state is single-use: remove it so the callback cannot be replayed.
        config.set_param(state_key, False)

        # Exchange code for token
        token_url = f'{self.api_url}/oauth/token'
        data = {
            'client_id': self.oauth_client_id,
            'client_secret': self.oauth_client_secret,
            'code': code,
            'redirect_uri': self.oauth_redirect_uri,
            'grant_type': 'authorization_code',
        }
        response = requests.post(token_url, data=data, timeout=OAUTH_REQUEST_TIMEOUT)
        response.raise_for_status()
        token_data = response.json()
        self._save_token_data(token_data)

    def _save_token_data(self, token_data):
        """Save OAuth token data.

        Args:
            token_data: decoded JSON body of the token endpoint response. Must
                contain `access_token`; `refresh_token`, `expires_in` and
                `token_type` are optional.
        """
        from datetime import timedelta

        # Tolerate a null or string `expires_in`; fall back to one hour.
        expires_in = int(token_data.get('expires_in') or 3600)
        # fields.Datetime.now() yields the naive UTC value Odoo stores.
        expires_at = fields.Datetime.now() + timedelta(seconds=expires_in)
        self.write({
            'access_token': token_data['access_token'],
            # A refresh response may omit `refresh_token`; keep the current one
            # in that case instead of erasing it.
            'refresh_token': token_data.get('refresh_token') or self.refresh_token,
            'token_expires_at': expires_at,
            'token_type': token_data.get('token_type', 'Bearer'),
        })

    def refresh_access_token(self):
        """Refresh expired access token.

        Raises:
            ValueError: if the backend holds no refresh token.
            requests.RequestException: if the token request fails or times out.
        """
        self.ensure_one()
        if not self.refresh_token:
            raise ValueError('No refresh token available')
        token_url = f'{self.api_url}/oauth/token'
        data = {
            'client_id': self.oauth_client_id,
            'client_secret': self.oauth_client_secret,
            'refresh_token': self.refresh_token,
            'grant_type': 'refresh_token',
        }
        response = requests.post(token_url, data=data, timeout=OAUTH_REQUEST_TIMEOUT)
        response.raise_for_status()
        token_data = response.json()
        self._save_token_data(token_data)
```

**OAuth Callback Controller**:

```python
from odoo import http
from odoo.http import request


class MyConnectorOAuthController(http.Controller):

    @http.route('/myconnector/oauth/callback', type='http', auth='user', csrf=False)
    def oauth_callback(self, code=None, state=None, error=None, **kwargs):
        """Handle OAuth callback.

        `**kwargs` absorbs any extra query parameters the provider appends to
        the redirect.
        """
        if error:
            return request.render('myconnector.oauth_error', {'error': error})
        if not code or not state:
            return request.render('myconnector.oauth_error', {'error': 'Missing code or state'})

        # Find backend by state or use session
        backend_id = request.session.pop('oauth_backend_id', None)
        if not backend_id:
            return request.render('myconnector.oauth_error', {'error': 'Invalid session'})
        backend = request.env['myconnector.backend'].sudo().browse(backend_id)
        # The record may have been deleted since the flow was started.
        if not backend.exists():
            return request.render('myconnector.oauth_error', {'error': 'Invalid session'})

        try:
            backend.exchange_code_for_token(code, state)
            return request.render('myconnector.oauth_success')
        except Exception as e:
            return request.render('myconnector.oauth_error', {'error': str(e)})
```

**Adapter with Token Refresh**:

```python
class MyAdapter(GenericAdapter):

    def make_request(self, method, endpoint, **kwargs):
        """Make request with automatic token refresh."""
        # Check if token is expired
        if self._is_token_expired():
            self.backend_record.refresh_access_token()
        return super().make_request(method, endpoint, **kwargs)

    def _is_token_expired(self):
        """Check if access token is expired.

        Returns False when no expiry is recorded on the backend.
        """
        from datetime import timedelta

        from odoo import fields

        expires_at = self.backend_record.token_expires_at
        if not expires_at:
            return False
        # Refresh 5 minutes before expiry
        buffer = timedelta(minutes=5)
        # Same clock as the one used when the expiry was stored (naive UTC).
        return fields.Datetime.now() + buffer >= expires_at

    def get_api_headers(self):
        """Return the inherited headers with the stored token as `Authorization`."""
        headers = super().get_api_headers()
        headers['Authorization'] = (
            f'{self.backend_record.token_type} {self.backend_record.access_token}'
        )
        return headers
```

## 4. Basic Authentication

**Usage**: Username/password (less common, less secure)

**Backend Fields**:

```python
class MyBackend(models.Model):
    api_username = fields.Char(string='Username', required=True)
    api_password = fields.Char(string='Password', required=True)
```

**Adapter Implementation**:

```python
class MyAdapter(GenericAdapter):

    def get_api_auth(self):
        """Return (username, password) tuple for requests."""
        return (
            self.backend_record.api_username,
            self.backend_record.api_password
        )

    def make_request(self, method, endpoint, **kwargs):
        """Add basic auth to requests."""
        kwargs['auth'] = self.get_api_auth()
        return super().make_request(method, endpoint, **kwargs)
```

## 5.
HMAC Signature Authentication

**Usage**: Signed requests (high security)

**Backend Fields**:

```python
class MyBackend(models.Model):
    api_key = fields.Char(string='API Key', required=True)
    api_secret = fields.Char(string='API Secret', required=True)
```

**Adapter Implementation**:

```python
import base64
import hashlib
import hmac
import json
from datetime import datetime


class MyAdapter(GenericAdapter):

    def make_request(self, method, endpoint, **kwargs):
        """Add HMAC signature to request.

        Adds `X-API-Key`, `X-Signature` and `X-Timestamp` headers, then
        delegates to the parent implementation.
        """
        # Generate signature
        timestamp = str(int(datetime.now().timestamp()))
        signature = self._generate_signature(method, endpoint, timestamp, kwargs.get('data'))

        # Add to headers. Work on a copy so the caller's dict is never mutated,
        # and tolerate an explicit `headers=None`.
        headers = dict(kwargs.get('headers') or {})
        headers.update({
            'X-API-Key': self.backend_record.api_key,
            'X-Signature': signature,
            'X-Timestamp': timestamp,
        })
        kwargs['headers'] = headers
        return super().make_request(method, endpoint, **kwargs)

    def _generate_signature(self, method, endpoint, timestamp, data=None):
        """Generate HMAC signature.

        The signed message is the upper-cased method, the endpoint and the
        timestamp joined by newlines, followed by the key-sorted JSON dump of
        `data` when `data` is truthy.

        Returns:
            The base64-encoded HMAC-SHA256 digest as a str.
        """
        # Build signature string
        message_parts = [
            method.upper(),
            endpoint,
            timestamp,
        ]
        if data:
            # NOTE(review): the body is signed in its `sort_keys=True` JSON
            # form; confirm the transport sends the same serialization.
            message_parts.append(json.dumps(data, sort_keys=True))
        message = '\n'.join(message_parts)

        # Generate HMAC
        secret = self.backend_record.api_secret.encode('utf-8')
        signature = hmac.new(
            secret,
            message.encode('utf-8'),
            hashlib.sha256
        ).digest()

        # Return base64-encoded signature
        return base64.b64encode(signature).decode('utf-8')
```

## 6.
JWT Authentication

**Usage**: JSON Web Tokens (stateless auth)

**Backend Fields**:

```python
class MyBackend(models.Model):
    # Sent as the `iss` (issuer) claim by the adapter below.
    api_key = fields.Char(string='API Key', required=True)
    jwt_secret = fields.Char(string='JWT Secret', required=True)
    jwt_algorithm = fields.Selection([
        ('HS256', 'HMAC SHA-256'),
        ('RS256', 'RSA SHA-256'),
    ], default='HS256', string='Algorithm')
    jwt_expiration = fields.Integer(string='Token Expiration (seconds)', default=3600)
```

**Adapter Implementation**:

```python
import jwt
from datetime import datetime, timedelta, timezone


class MyAdapter(GenericAdapter):

    def get_api_headers(self):
        """Return the inherited headers with a freshly signed JWT as Bearer token."""
        headers = super().get_api_headers()
        token = self._generate_jwt()
        headers['Authorization'] = f'Bearer {token}'
        return headers

    def _generate_jwt(self):
        """Generate JWT token.

        Returns:
            The encoded token as a str, signed with the backend's
            `jwt_secret` using `jwt_algorithm`.
        """
        # Take the clock once so `iat` and `exp` share the same instant;
        # timezone-aware replaces the deprecated `datetime.utcnow()`.
        issued_at = datetime.now(timezone.utc)
        payload = {
            'iss': self.backend_record.api_key,  # Issuer
            'iat': issued_at,  # Issued at
            'exp': issued_at + timedelta(
                seconds=self.backend_record.jwt_expiration
            ),
        }
        token = jwt.encode(
            payload,
            self.backend_record.jwt_secret,
            algorithm=self.backend_record.jwt_algorithm
        )
        # PyJWT < 2.0 returns bytes; normalise so the header never contains
        # a "b'...'" repr.
        if isinstance(token, bytes):
            token = token.decode('ascii')
        return token
```

## 7.
Store-Specific Headers (ZID Pattern)

**Usage**: Multi-tenant systems requiring store identification

**Backend Fields**:

```python
class MyBackend(models.Model):
    store_id = fields.Char(string='Store ID', required=True)
    api_key = fields.Char(string='API Key', required=True)
```

**Adapter Implementation**:

```python
class MyAdapter(GenericAdapter):

    def get_api_headers(self):
        """Return the inherited headers plus the store identification headers."""
        headers = super().get_api_headers()
        headers.update({
            'X-Manager-Token': self.backend_record.api_key,
            'X-Store-Id': self.backend_record.store_id,
            'Accept': 'application/json',
        })
        return headers
```

## Webhook Signature Verification

### HMAC-SHA256 Verification

```python
import hashlib
import hmac


class GenericWebhook(models.Model):
    _name = 'generic.webhook'  # looked up by the controller below

    def verify_signature(self, payload, signature, secret):
        """Verify webhook signature.

        Args:
            payload: raw request body (str or bytes).
            signature: hex HMAC-SHA256 digest sent by the caller; may be
                None when the header is absent.
            secret: shared webhook secret.

        Returns:
            True only if `signature` matches the digest of `payload`.
        """
        # A missing header or an unconfigured secret is a failed check, not
        # a server error (compare_digest raises TypeError on None).
        if not signature or not secret:
            return False
        if isinstance(payload, str):
            payload = payload.encode('utf-8')
        expected_signature = hmac.new(
            secret.encode('utf-8'),
            payload,
            hashlib.sha256
        ).hexdigest()
        # Compare as bytes: compare_digest rejects non-ASCII str input.
        return hmac.compare_digest(
            signature.encode('utf-8'),
            expected_signature.encode('utf-8'),
        )


# In controller
class WebhookController(http.Controller):

    @http.route('/myconnector/webhook', type='json', auth='none', csrf=False)
    def webhook(self):
        payload = request.httprequest.get_data(as_text=True)
        signature = request.httprequest.headers.get('X-Signature')
        backend = self._find_backend()
        webhook_model = request.env['generic.webhook'].sudo()
        if not webhook_model.verify_signature(payload, signature, backend.webhook_secret):
            # NOTE(review): a `type='json'` route serialises the return value
            # into the response body and does not interpret a Flask-style
            # `(body, status)` tuple, so only the error payload is returned.
            # Confirm against the Odoo version in use if a real HTTP 401 is
            # required.
            return {'error': 'Invalid signature'}
        # Process webhook
        ...
```

## Security Best Practices

1. **Never log credentials** - Mask API keys/secrets in logs
2. **Use password fields** - Set `password="True"` on sensitive fields in form views so values are masked
3. **Rotate tokens** - Implement token refresh before expiry
4. **Validate signatures** - Always verify webhook signatures
5. **Use HTTPS** - Never send credentials over HTTP
6. **Store securely** - Consider using `ir.config_parameter` for secrets
7. **Limit scopes** - Request minimum required OAuth scopes
8. **Handle expiry** - Implement token refresh logic
9.
**CSRF protection** - Use state parameter in OAuth
10. **Rate limit** - Implement rate limiting to prevent abuse