You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

291 lines
11 KiB

  1. # -*- coding: utf-8 -*-
  2. # © 2016 Antonio Espinosa - <antonio.espinosa@tecnativa.com>
  3. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
  4. import mock
  5. import base64
  6. import time
  7. from openerp import http
  8. from openerp.tests.common import TransactionCase
  9. from ..controllers.main import MailTrackingController, BLANK
  10. mock_send_email = ('openerp.addons.base.ir.ir_mail_server.'
  11. 'ir_mail_server.send_email')
  12. class FakeUserAgent(object):
  13. browser = 'Test browser'
  14. platform = 'Test platform'
  15. def __str__(self):
  16. """Return name"""
  17. return 'Test suite'
  18. class TestMailTracking(TransactionCase):
  19. def setUp(self, *args, **kwargs):
  20. super(TestMailTracking, self).setUp(*args, **kwargs)
  21. self.sender = self.env['res.partner'].create({
  22. 'name': 'Test sender',
  23. 'email': 'sender@example.com',
  24. 'notify_email': 'always',
  25. })
  26. self.recipient = self.env['res.partner'].create({
  27. 'name': 'Test recipient',
  28. 'email': 'recipient@example.com',
  29. 'notify_email': 'always',
  30. })
  31. self.last_request = http.request
  32. http.request = type('obj', (object,), {
  33. 'db': self.env.cr.dbname,
  34. 'env': self.env,
  35. 'endpoint': type('obj', (object,), {
  36. 'routing': [],
  37. }),
  38. 'httprequest': type('obj', (object,), {
  39. 'remote_addr': '123.123.123.123',
  40. 'user_agent': FakeUserAgent(),
  41. }),
  42. })
  43. def tearDown(self, *args, **kwargs):
  44. http.request = self.last_request
  45. return super(TestMailTracking, self).tearDown(*args, **kwargs)
  46. def test_message_post(self):
  47. # This message will generate a notification for recipient
  48. message = self.env['mail.message'].create({
  49. 'subject': 'Message test',
  50. 'author_id': self.sender.id,
  51. 'email_from': self.sender.email,
  52. 'type': 'comment',
  53. 'model': 'res.partner',
  54. 'res_id': self.recipient.id,
  55. 'partner_ids': [(4, self.recipient.id)],
  56. 'body': '<p>This is a test message</p>',
  57. })
  58. # Search tracking created
  59. tracking_email = self.env['mail.tracking.email'].search([
  60. ('mail_message_id', '=', message.id),
  61. ('partner_id', '=', self.recipient.id),
  62. ])
  63. # The tracking email must be sent
  64. self.assertTrue(tracking_email)
  65. self.assertEqual(tracking_email.state, 'sent')
  66. # message_dict read by web interface
  67. message_dict = self.env['mail.message'].message_read(message.id)
  68. # First item is message content
  69. self.assertTrue(len(message_dict) > 0)
  70. message_dict = message_dict[0]
  71. self.assertTrue(len(message_dict['partner_ids']) > 0)
  72. # First partner is recipient
  73. partner_id = message_dict['partner_ids'][0][0]
  74. self.assertEqual(partner_id, self.recipient.id)
  75. status = message_dict['partner_trackings'][str(partner_id)]
  76. # Tracking status must be sent and
  77. # mail tracking must be the one search before
  78. self.assertEqual(status[0], 'sent')
  79. self.assertEqual(status[1], tracking_email.id)
  80. # And now open the email
  81. metadata = {
  82. 'ip': '127.0.0.1',
  83. 'user_agent': 'Odoo Test/1.0',
  84. 'os_family': 'linux',
  85. 'ua_family': 'odoo',
  86. }
  87. tracking_email.event_create('open', metadata)
  88. self.assertEqual(tracking_email.state, 'opened')
  89. def mail_send(self, recipient):
  90. mail = self.env['mail.mail'].create({
  91. 'subject': 'Test subject',
  92. 'email_from': 'from@domain.com',
  93. 'email_to': recipient,
  94. 'body_html': '<p>This is a test message</p>',
  95. })
  96. mail.send()
  97. # Search tracking created
  98. tracking_email = self.env['mail.tracking.email'].search([
  99. ('mail_id', '=', mail.id),
  100. ])
  101. return mail, tracking_email
  102. def test_mail_send(self):
  103. controller = MailTrackingController()
  104. db = self.env.cr.dbname
  105. image = base64.decodestring(BLANK)
  106. mail, tracking = self.mail_send(self.recipient.email)
  107. self.assertEqual(mail.email_to, tracking.recipient)
  108. self.assertEqual(mail.email_from, tracking.sender)
  109. res = controller.mail_tracking_open(db, tracking.id)
  110. self.assertEqual(image, res.response[0])
  111. # Two events: sent and open
  112. self.assertEqual(2, len(tracking.tracking_event_ids))
  113. # Fake event: tracking_email_id = False
  114. res = controller.mail_tracking_open(db, False)
  115. self.assertEqual(image, res.response[0])
  116. # Two events again because no tracking_email_id found for False
  117. self.assertEqual(2, len(tracking.tracking_event_ids))
  118. def test_concurrent_open(self):
  119. mail, tracking = self.mail_send(self.recipient.email)
  120. ts = time.time()
  121. metadata = {
  122. 'ip': '127.0.0.1',
  123. 'user_agent': 'Odoo Test/1.0',
  124. 'os_family': 'linux',
  125. 'ua_family': 'odoo',
  126. 'timestamp': ts,
  127. }
  128. # First open event
  129. tracking.event_create('open', metadata)
  130. opens = tracking.tracking_event_ids.filtered(
  131. lambda r: r.event_type == 'open'
  132. )
  133. self.assertEqual(len(opens), 1)
  134. # Concurrent open event
  135. metadata['timestamp'] = ts + 2
  136. tracking.event_create('open', metadata)
  137. opens = tracking.tracking_event_ids.filtered(
  138. lambda r: r.event_type == 'open'
  139. )
  140. self.assertEqual(len(opens), 1)
  141. # Second open event
  142. metadata['timestamp'] = ts + 350
  143. tracking.event_create('open', metadata)
  144. opens = tracking.tracking_event_ids.filtered(
  145. lambda r: r.event_type == 'open'
  146. )
  147. self.assertEqual(len(opens), 2)
  148. def test_concurrent_click(self):
  149. mail, tracking = self.mail_send(self.recipient.email)
  150. ts = time.time()
  151. metadata = {
  152. 'ip': '127.0.0.1',
  153. 'user_agent': 'Odoo Test/1.0',
  154. 'os_family': 'linux',
  155. 'ua_family': 'odoo',
  156. 'timestamp': ts,
  157. 'url': 'https://www.example.com/route/1',
  158. }
  159. # First click event (URL 1)
  160. tracking.event_create('click', metadata)
  161. opens = tracking.tracking_event_ids.filtered(
  162. lambda r: r.event_type == 'click'
  163. )
  164. self.assertEqual(len(opens), 1)
  165. # Concurrent click event (URL 1)
  166. metadata['timestamp'] = ts + 2
  167. tracking.event_create('click', metadata)
  168. opens = tracking.tracking_event_ids.filtered(
  169. lambda r: r.event_type == 'click'
  170. )
  171. self.assertEqual(len(opens), 1)
  172. # Second click event (URL 1)
  173. metadata['timestamp'] = ts + 350
  174. tracking.event_create('click', metadata)
  175. opens = tracking.tracking_event_ids.filtered(
  176. lambda r: r.event_type == 'click'
  177. )
  178. self.assertEqual(len(opens), 2)
  179. # Concurrent click event (URL 2)
  180. metadata['timestamp'] = ts + 2
  181. metadata['url'] = 'https://www.example.com/route/2'
  182. tracking.event_create('click', metadata)
  183. opens = tracking.tracking_event_ids.filtered(
  184. lambda r: r.event_type == 'click'
  185. )
  186. self.assertEqual(len(opens), 3)
  187. def test_smtp_error(self):
  188. with mock.patch(mock_send_email) as mock_func:
  189. mock_func.side_effect = Warning('Test error')
  190. mail, tracking = self.mail_send(self.recipient.email)
  191. self.assertEqual('error', tracking.state)
  192. self.assertEqual('Warning', tracking.error_type)
  193. self.assertEqual('Test error', tracking.error_description)
  194. self.assertTrue(self.recipient.email_bounced)
  195. def test_partner_email_change(self):
  196. mail, tracking = self.mail_send(self.recipient.email)
  197. tracking.event_create('open', {})
  198. orig_score = self.recipient.email_score
  199. orig_count = self.recipient.tracking_emails_count
  200. orig_email = self.recipient.email
  201. self.recipient.email = orig_email + '2'
  202. self.assertEqual(50.0, self.recipient.email_score)
  203. self.assertEqual(0, self.recipient.tracking_emails_count)
  204. self.recipient.email = orig_email
  205. self.assertEqual(orig_score, self.recipient.email_score)
  206. self.assertEqual(orig_count, self.recipient.tracking_emails_count)
  207. def test_process_hard_bounce(self):
  208. mail, tracking = self.mail_send(self.recipient.email)
  209. tracking.event_create('hard_bounce', {})
  210. self.assertEqual('bounced', tracking.state)
  211. self.assertTrue(self.recipient.email_score < 50.0)
  212. def test_process_soft_bounce(self):
  213. mail, tracking = self.mail_send(self.recipient.email)
  214. tracking.event_create('soft_bounce', {})
  215. self.assertEqual('soft-bounced', tracking.state)
  216. self.assertTrue(self.recipient.email_score < 50.0)
  217. def test_process_delivered(self):
  218. mail, tracking = self.mail_send(self.recipient.email)
  219. tracking.event_create('delivered', {})
  220. self.assertEqual('delivered', tracking.state)
  221. self.assertTrue(self.recipient.email_score > 50.0)
  222. def test_process_deferral(self):
  223. mail, tracking = self.mail_send(self.recipient.email)
  224. tracking.event_create('deferral', {})
  225. self.assertEqual('deferred', tracking.state)
  226. def test_process_spam(self):
  227. mail, tracking = self.mail_send(self.recipient.email)
  228. tracking.event_create('spam', {})
  229. self.assertEqual('spam', tracking.state)
  230. self.assertTrue(self.recipient.email_score < 50.0)
  231. def test_process_unsub(self):
  232. mail, tracking = self.mail_send(self.recipient.email)
  233. tracking.event_create('unsub', {})
  234. self.assertEqual('unsub', tracking.state)
  235. self.assertTrue(self.recipient.email_score < 50.0)
  236. def test_process_reject(self):
  237. mail, tracking = self.mail_send(self.recipient.email)
  238. tracking.event_create('reject', {})
  239. self.assertEqual('rejected', tracking.state)
  240. self.assertTrue(self.recipient.email_score < 50.0)
  241. def test_process_open(self):
  242. mail, tracking = self.mail_send(self.recipient.email)
  243. tracking.event_create('open', {})
  244. self.assertEqual('opened', tracking.state)
  245. self.assertTrue(self.recipient.email_score > 50.0)
  246. def test_process_click(self):
  247. mail, tracking = self.mail_send(self.recipient.email)
  248. tracking.event_create('click', {})
  249. self.assertEqual('opened', tracking.state)
  250. self.assertTrue(self.recipient.email_score > 50.0)
  251. def test_process_several_bounce(self):
  252. for i in range(1, 10):
  253. mail, tracking = self.mail_send(self.recipient.email)
  254. tracking.event_create('hard_bounce', {})
  255. self.assertEqual('bounced', tracking.state)
  256. self.assertEqual(0.0, self.recipient.email_score)
  257. def test_db(self):
  258. db = self.env.cr.dbname
  259. controller = MailTrackingController()
  260. not_found = controller.mail_tracking_all('not_found_db')
  261. self.assertEqual('NOT FOUND', not_found.response[0])
  262. none = controller.mail_tracking_all(db)
  263. self.assertEqual('NONE', none.response[0])
  264. none = controller.mail_tracking_event(db, 'open')
  265. self.assertEqual('NONE', none.response[0])