You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

317 lines
12 KiB

  1. # -*- coding: utf-8 -*-
  2. # © 2016 Antonio Espinosa - <antonio.espinosa@tecnativa.com>
  3. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
  4. import mock
  5. from odoo.tools import mute_logger
  6. import base64
  7. import time
  8. from odoo import http
  9. from odoo.tests.common import TransactionCase
  10. from ..controllers.main import MailTrackingController, BLANK
  11. mock_send_email = ('odoo.addons.base.ir.ir_mail_server.'
  12. 'IrMailServer.send_email')
  13. class FakeUserAgent(object):
  14. browser = 'Test browser'
  15. platform = 'Test platform'
  16. def __str__(self):
  17. """Return name"""
  18. return 'Test suite'
  19. class TestMailTracking(TransactionCase):
  20. def setUp(self, *args, **kwargs):
  21. super(TestMailTracking, self).setUp(*args, **kwargs)
  22. self.sender = self.env['res.partner'].create({
  23. 'name': 'Test sender',
  24. 'email': 'sender@example.com',
  25. 'notify_email': 'always',
  26. })
  27. self.recipient = self.env['res.partner'].create({
  28. 'name': 'Test recipient',
  29. 'email': 'recipient@example.com',
  30. 'notify_email': 'always',
  31. })
  32. self.last_request = http.request
  33. http.request = type('obj', (object,), {
  34. 'db': self.env.cr.dbname,
  35. 'env': self.env,
  36. 'endpoint': type('obj', (object,), {
  37. 'routing': [],
  38. }),
  39. 'httprequest': type('obj', (object,), {
  40. 'remote_addr': '123.123.123.123',
  41. 'user_agent': FakeUserAgent(),
  42. }),
  43. })
  44. def tearDown(self, *args, **kwargs):
  45. http.request = self.last_request
  46. return super(TestMailTracking, self).tearDown(*args, **kwargs)
  47. def test_email_lower(self):
  48. self.recipient.write({'email': 'UPPER@example.com'})
  49. self.assertEqual('upper@example.com', self.recipient.email)
  50. def test_empty_email(self):
  51. self.recipient.write({'email_bounced': True})
  52. self.recipient.write({'email': False})
  53. self.assertEqual(False, self.recipient.email)
  54. self.assertEqual(False, self.recipient.email_bounced)
  55. self.recipient.write({'email_bounced': True})
  56. self.recipient.write({'email': ''})
  57. self.assertEqual(False, self.recipient.email)
  58. self.assertEqual(False, self.recipient.email_bounced)
  59. self.assertEqual(
  60. False,
  61. self.env['mail.tracking.email'].email_is_bounced(False))
  62. self.assertEqual(
  63. 0.,
  64. self.env['mail.tracking.email'].email_score_from_email(False))
  65. def test_recipient_address_compute(self):
  66. mail, tracking = self.mail_send(self.recipient.email)
  67. tracking.write({'recipient': False})
  68. self.assertEqual(False, tracking.recipient_address)
  69. def test_message_post(self):
  70. # This message will generate a notification for recipient
  71. message = self.env['mail.message'].create({
  72. 'subject': 'Message test',
  73. 'author_id': self.sender.id,
  74. 'email_from': self.sender.email,
  75. 'message_type': 'comment',
  76. 'model': 'res.partner',
  77. 'res_id': self.recipient.id,
  78. 'partner_ids': [(4, self.recipient.id)],
  79. 'body': '<p>This is a test message</p>',
  80. })
  81. # Search tracking created
  82. tracking_email = self.env['mail.tracking.email'].search([
  83. ('mail_message_id', '=', message.id),
  84. ('partner_id', '=', self.recipient.id),
  85. ])
  86. # The tracking email must be sent
  87. self.assertTrue(tracking_email)
  88. self.assertEqual(tracking_email.state, 'sent')
  89. # message_dict read by web interface
  90. message_dict = message.message_format()[0]
  91. self.assertTrue(len(message_dict['partner_ids']) > 0)
  92. # First partner is recipient
  93. partner_id = message_dict['partner_ids'][0][0]
  94. self.assertEqual(partner_id, self.recipient.id)
  95. status = message_dict['partner_trackings'][0]
  96. # Tracking status must be sent and
  97. # mail tracking must be the one search before
  98. self.assertEqual(status[0], 'sent')
  99. self.assertEqual(status[1], tracking_email.id)
  100. self.assertEqual(status[2], self.recipient.display_name)
  101. self.assertEqual(status[3], self.recipient.id)
  102. # And now open the email
  103. metadata = {
  104. 'ip': '127.0.0.1',
  105. 'user_agent': 'Odoo Test/1.0',
  106. 'os_family': 'linux',
  107. 'ua_family': 'odoo',
  108. }
  109. tracking_email.event_create('open', metadata)
  110. self.assertEqual(tracking_email.state, 'opened')
  111. def mail_send(self, recipient):
  112. mail = self.env['mail.mail'].create({
  113. 'subject': 'Test subject',
  114. 'email_from': 'from@domain.com',
  115. 'email_to': recipient,
  116. 'body_html': '<p>This is a test message</p>',
  117. })
  118. mail.send()
  119. # Search tracking created
  120. tracking_email = self.env['mail.tracking.email'].search([
  121. ('mail_id', '=', mail.id),
  122. ])
  123. return mail, tracking_email
  124. def test_mail_send(self):
  125. controller = MailTrackingController()
  126. db = self.env.cr.dbname
  127. image = base64.decodestring(BLANK)
  128. mail, tracking = self.mail_send(self.recipient.email)
  129. self.assertEqual(mail.email_to, tracking.recipient)
  130. self.assertEqual(mail.email_from, tracking.sender)
  131. res = controller.mail_tracking_open(db, tracking.id)
  132. self.assertEqual(image, res.response[0])
  133. # Two events: sent and open
  134. self.assertEqual(2, len(tracking.tracking_event_ids))
  135. # Fake event: tracking_email_id = False
  136. res = controller.mail_tracking_open(db, False)
  137. self.assertEqual(image, res.response[0])
  138. # Two events again because no tracking_email_id found for False
  139. self.assertEqual(2, len(tracking.tracking_event_ids))
  140. def test_concurrent_open(self):
  141. mail, tracking = self.mail_send(self.recipient.email)
  142. ts = time.time()
  143. metadata = {
  144. 'ip': '127.0.0.1',
  145. 'user_agent': 'Odoo Test/1.0',
  146. 'os_family': 'linux',
  147. 'ua_family': 'odoo',
  148. 'timestamp': ts,
  149. }
  150. # First open event
  151. tracking.event_create('open', metadata)
  152. opens = tracking.tracking_event_ids.filtered(
  153. lambda r: r.event_type == 'open'
  154. )
  155. self.assertEqual(len(opens), 1)
  156. # Concurrent open event
  157. metadata['timestamp'] = ts + 2
  158. tracking.event_create('open', metadata)
  159. opens = tracking.tracking_event_ids.filtered(
  160. lambda r: r.event_type == 'open'
  161. )
  162. self.assertEqual(len(opens), 1)
  163. # Second open event
  164. metadata['timestamp'] = ts + 350
  165. tracking.event_create('open', metadata)
  166. opens = tracking.tracking_event_ids.filtered(
  167. lambda r: r.event_type == 'open'
  168. )
  169. self.assertEqual(len(opens), 2)
  170. def test_concurrent_click(self):
  171. mail, tracking = self.mail_send(self.recipient.email)
  172. ts = time.time()
  173. metadata = {
  174. 'ip': '127.0.0.1',
  175. 'user_agent': 'Odoo Test/1.0',
  176. 'os_family': 'linux',
  177. 'ua_family': 'odoo',
  178. 'timestamp': ts,
  179. 'url': 'https://www.example.com/route/1',
  180. }
  181. # First click event (URL 1)
  182. tracking.event_create('click', metadata)
  183. opens = tracking.tracking_event_ids.filtered(
  184. lambda r: r.event_type == 'click'
  185. )
  186. self.assertEqual(len(opens), 1)
  187. # Concurrent click event (URL 1)
  188. metadata['timestamp'] = ts + 2
  189. tracking.event_create('click', metadata)
  190. opens = tracking.tracking_event_ids.filtered(
  191. lambda r: r.event_type == 'click'
  192. )
  193. self.assertEqual(len(opens), 1)
  194. # Second click event (URL 1)
  195. metadata['timestamp'] = ts + 350
  196. tracking.event_create('click', metadata)
  197. opens = tracking.tracking_event_ids.filtered(
  198. lambda r: r.event_type == 'click'
  199. )
  200. self.assertEqual(len(opens), 2)
  201. # Concurrent click event (URL 2)
  202. metadata['timestamp'] = ts + 2
  203. metadata['url'] = 'https://www.example.com/route/2'
  204. tracking.event_create('click', metadata)
  205. opens = tracking.tracking_event_ids.filtered(
  206. lambda r: r.event_type == 'click'
  207. )
  208. self.assertEqual(len(opens), 3)
  209. @mute_logger('odoo.addons.mail.models.mail_mail')
  210. def test_smtp_error(self):
  211. with mock.patch(mock_send_email) as mock_func:
  212. mock_func.side_effect = Warning('Test error')
  213. mail, tracking = self.mail_send(self.recipient.email)
  214. self.assertEqual('error', tracking.state)
  215. self.assertEqual('Warning', tracking.error_type)
  216. self.assertEqual('Test error', tracking.error_description)
  217. self.assertTrue(self.recipient.email_bounced)
  218. def test_partner_email_change(self):
  219. mail, tracking = self.mail_send(self.recipient.email)
  220. tracking.event_create('open', {})
  221. orig_score = self.recipient.email_score
  222. orig_count = self.recipient.tracking_emails_count
  223. orig_email = self.recipient.email
  224. self.recipient.email = orig_email + '2'
  225. self.assertEqual(50.0, self.recipient.email_score)
  226. self.assertEqual(0, self.recipient.tracking_emails_count)
  227. self.recipient.email = orig_email
  228. self.assertEqual(orig_score, self.recipient.email_score)
  229. self.assertEqual(orig_count, self.recipient.tracking_emails_count)
  230. def test_process_hard_bounce(self):
  231. mail, tracking = self.mail_send(self.recipient.email)
  232. tracking.event_create('hard_bounce', {})
  233. self.assertEqual('bounced', tracking.state)
  234. self.assertTrue(self.recipient.email_score < 50.0)
  235. def test_process_soft_bounce(self):
  236. mail, tracking = self.mail_send(self.recipient.email)
  237. tracking.event_create('soft_bounce', {})
  238. self.assertEqual('soft-bounced', tracking.state)
  239. self.assertTrue(self.recipient.email_score < 50.0)
  240. def test_process_delivered(self):
  241. mail, tracking = self.mail_send(self.recipient.email)
  242. tracking.event_create('delivered', {})
  243. self.assertEqual('delivered', tracking.state)
  244. self.assertTrue(self.recipient.email_score > 50.0)
  245. def test_process_deferral(self):
  246. mail, tracking = self.mail_send(self.recipient.email)
  247. tracking.event_create('deferral', {})
  248. self.assertEqual('deferred', tracking.state)
  249. def test_process_spam(self):
  250. mail, tracking = self.mail_send(self.recipient.email)
  251. tracking.event_create('spam', {})
  252. self.assertEqual('spam', tracking.state)
  253. self.assertTrue(self.recipient.email_score < 50.0)
  254. def test_process_unsub(self):
  255. mail, tracking = self.mail_send(self.recipient.email)
  256. tracking.event_create('unsub', {})
  257. self.assertEqual('unsub', tracking.state)
  258. self.assertTrue(self.recipient.email_score < 50.0)
  259. def test_process_reject(self):
  260. mail, tracking = self.mail_send(self.recipient.email)
  261. tracking.event_create('reject', {})
  262. self.assertEqual('rejected', tracking.state)
  263. self.assertTrue(self.recipient.email_score < 50.0)
  264. def test_process_open(self):
  265. mail, tracking = self.mail_send(self.recipient.email)
  266. tracking.event_create('open', {})
  267. self.assertEqual('opened', tracking.state)
  268. self.assertTrue(self.recipient.email_score > 50.0)
  269. def test_process_click(self):
  270. mail, tracking = self.mail_send(self.recipient.email)
  271. tracking.event_create('click', {})
  272. self.assertEqual('opened', tracking.state)
  273. self.assertTrue(self.recipient.email_score > 50.0)
  274. def test_process_several_bounce(self):
  275. for i in range(1, 10):
  276. mail, tracking = self.mail_send(self.recipient.email)
  277. tracking.event_create('hard_bounce', {})
  278. self.assertEqual('bounced', tracking.state)
  279. self.assertEqual(0.0, self.recipient.email_score)
  280. def test_db(self):
  281. db = self.env.cr.dbname
  282. controller = MailTrackingController()
  283. not_found = controller.mail_tracking_all('not_found_db')
  284. self.assertEqual('NOT FOUND', not_found.response[0])
  285. none = controller.mail_tracking_all(db)
  286. self.assertEqual('NONE', none.response[0])
  287. none = controller.mail_tracking_event(db, 'open')
  288. self.assertEqual('NONE', none.response[0])