# -*- coding: utf-8 -*- # Copyright 2016 Antonio Espinosa - # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). import hashlib import hmac import threading from datetime import datetime from openerp import models, api, fields import logging _logger = logging.getLogger(__name__) class MailTrackingEmail(models.Model): _inherit = "mail.tracking.email" def _country_search(self, country_code): country = False if country_code: country = self.env['res.country'].search([ ('code', '=ilike', country_code), ]) if country: return country.id return False def _mailgun_mandatory_fields(self): return ('event', 'timestamp', 'token', 'signature', 'tracking_email_id', 'odoo_db') def _mailgun_event_type_mapping(self): return { # Mailgun event type: tracking event type 'delivered': 'delivered', 'opened': 'open', 'clicked': 'click', 'unsubscribed': 'unsub', 'complained': 'spam', 'bounced': 'hard_bounce', 'dropped': 'reject', } def _mailgun_supported_event_types(self): return self._mailgun_event_type_mapping().keys() def _mailgun_event_type_verify(self, event): event = event or {} mailgun_event_type = event.get('event') if mailgun_event_type not in self._mailgun_supported_event_types(): _logger.info("Mailgun: event type '%s' not supported", mailgun_event_type) return False # OK, event type is valid return True def _mailgun_signature(self, api_key, timestamp, token): return hmac.new( key=str(api_key), msg='{}{}'.format(str(timestamp), str(token)), digestmod=hashlib.sha256).hexdigest() def _mailgun_signature_verify(self, event): event = event or {} api_key = self.env['ir.config_parameter'].get_param('mailgun.apikey') if not api_key: _logger.info("No Mailgun api key configured. " "Please add 'mailgun.apikey' to System parameters " "to enable Mailgun authentication webhoook requests. " "More info at: " "https://documentation.mailgun.com/user_manual.html" "#webhooks") else: timestamp = event.get('timestamp') token = event.get('token') signature = event.get('signature') event_digest = self._mailgun_signature(api_key, timestamp, token) if signature != event_digest: _logger.info("Mailgun: Invalid signature '%s' != '%s'", signature, event_digest) return False # OK, signature is valid return True def _db_verify(self, event): event = event or {} odoo_db = event.get('odoo_db') current_db = getattr(threading.currentThread(), 'dbname', None) if odoo_db != current_db: _logger.info("Mailgun: Database '%s' is not the current database", odoo_db) return False # OK, DB is current return True def _mailgun_metadata(self, mailgun_event_type, event, metadata): # Get Mailgun timestamp when found ts = event.get('timestamp', False) try: ts = float(ts) except: ts = False if ts: dt = datetime.utcfromtimestamp(ts) metadata.update({ 'timestamp': ts, 'time': fields.Datetime.to_string(dt), 'date': fields.Date.to_string(dt), }) # Common field mapping mapping = { 'recipient': 'recipient', 'ip': 'ip', 'user_agent': 'user-agent', 'os_family': 'client-os', 'ua_family': 'client-name', 'ua_type': 'client-type', 'url': 'url', } for k, v in mapping.iteritems(): if event.get(v, False): metadata[k] = event[v] # Special field mapping metadata.update({ 'mobile': event.get('device-type') in ('mobile', 'tablet'), 'user_country_id': self._country_search( event.get('country', False)), }) # Mapping for special events if mailgun_event_type == 'bounced': metadata.update({ 'error_type': event.get('code', False), 'error_description': event.get('error', False), 'error_details': event.get('notification', False), }) elif mailgun_event_type == 'dropped': metadata.update({ 'error_type': event.get('reason', False), 'error_description': event.get('code', False), 'error_details': event.get('description', False), }) elif mailgun_event_type == 'complained': metadata.update({ 'error_type': 'spam', 'error_description': "Recipient '%s' mark this email as spam" % event.get('recipient', False), }) return metadata def _mailgun_tracking_get(self, event): tracking = False tracking_email_id = event.get('tracking_email_id', False) if tracking_email_id and tracking_email_id.isdigit(): tracking = self.search([('id', '=', tracking_email_id)]) return tracking def _event_is_from_mailgun(self, event): event = event or {} return all([k in event for k in self._mailgun_mandatory_fields()]) @api.model def event_process(self, request, post, metadata, event_type=None): res = super(MailTrackingEmail, self).event_process( request, post, metadata, event_type=event_type) if res == 'NONE' and self._event_is_from_mailgun(post): if not self._mailgun_signature_verify(post): res = 'ERROR: Signature' elif not self._mailgun_event_type_verify(post): res = 'ERROR: Event type not supported' elif not self._db_verify(post): res = 'ERROR: Invalid DB' else: res = 'OK' if res == 'OK': mailgun_event_type = post.get('event') mapped_event_type = self._mailgun_event_type_mapping().get( mailgun_event_type) or event_type if not mapped_event_type: # pragma: no cover res = 'ERROR: Bad event' tracking = self._mailgun_tracking_get(post) if not tracking: res = 'ERROR: Tracking not found' if res == 'OK': # Complete metadata with mailgun event info metadata = self._mailgun_metadata( mailgun_event_type, post, metadata) # Create event tracking.event_create(mapped_event_type, metadata) if res != 'NONE': if event_type: _logger.info( "Mailgun: event '%s' process '%s'", event_type, res) else: _logger.info("Mailgun: event process '%s'", res) return res