You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

326 lines
13 KiB

  1. # Copyright 2016 Antonio Espinosa - <antonio.espinosa@tecnativa.com>
  2. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
  3. import mock
  4. from odoo.tools import mute_logger
  5. import time
  6. from odoo import http
  7. from odoo.tests.common import TransactionCase
  8. from ..controllers.main import MailTrackingController, BLANK
  9. mock_send_email = ('odoo.addons.base.ir.ir_mail_server.'
  10. 'IrMailServer.send_email')
  11. class FakeUserAgent(object):
  12. browser = 'Test browser'
  13. platform = 'Test platform'
  14. def __str__(self):
  15. """Return name"""
  16. return 'Test suite'
  17. class TestMailTracking(TransactionCase):
  18. def setUp(self, *args, **kwargs):
  19. super(TestMailTracking, self).setUp(*args, **kwargs)
  20. self.sender = self.env['res.partner'].create({
  21. 'name': 'Test sender',
  22. 'email': 'sender@example.com',
  23. })
  24. self.recipient = self.env['res.partner'].create({
  25. 'name': 'Test recipient',
  26. 'email': 'recipient@example.com',
  27. })
  28. self.last_request = http.request
  29. http.request = type('obj', (object,), {
  30. 'db': self.env.cr.dbname,
  31. 'env': self.env,
  32. 'endpoint': type('obj', (object,), {
  33. 'routing': [],
  34. }),
  35. 'httprequest': type('obj', (object,), {
  36. 'remote_addr': '123.123.123.123',
  37. 'user_agent': FakeUserAgent(),
  38. }),
  39. })
  40. def tearDown(self, *args, **kwargs):
  41. http.request = self.last_request
  42. return super(TestMailTracking, self).tearDown(*args, **kwargs)
  43. def test_empty_email(self):
  44. self.recipient.write({'email_bounced': True})
  45. self.recipient.write({'email': False})
  46. self.assertEqual(False, self.recipient.email)
  47. self.assertEqual(False, self.recipient.email_bounced)
  48. self.recipient.write({'email_bounced': True})
  49. self.recipient.write({'email': ''})
  50. self.assertEqual(False, self.recipient.email_bounced)
  51. self.assertEqual(
  52. False,
  53. self.env['mail.tracking.email'].email_is_bounced(False))
  54. self.assertEqual(
  55. 0.,
  56. self.env['mail.tracking.email'].email_score_from_email(False))
  57. def test_recipient_address_compute(self):
  58. mail, tracking = self.mail_send(self.recipient.email)
  59. tracking.write({'recipient': False})
  60. self.assertEqual(False, tracking.recipient_address)
  61. def test_message_post(self):
  62. # This message will generate a notification for recipient
  63. message = self.env['mail.message'].create({
  64. 'subject': 'Message test',
  65. 'author_id': self.sender.id,
  66. 'email_from': self.sender.email,
  67. 'message_type': 'comment',
  68. 'model': 'res.partner',
  69. 'res_id': self.recipient.id,
  70. 'partner_ids': [(4, self.recipient.id)],
  71. 'body': '<p>This is a test message</p>',
  72. })
  73. # Search tracking created
  74. tracking_email = self.env['mail.tracking.email'].search([
  75. ('mail_message_id', '=', message.id),
  76. ('partner_id', '=', self.recipient.id),
  77. ])
  78. # The tracking email must be sent
  79. self.assertTrue(tracking_email)
  80. self.assertEqual(tracking_email.state, 'sent')
  81. # message_dict read by web interface
  82. message_dict = message.message_format()[0]
  83. self.assertTrue(len(message_dict['partner_ids']) > 0)
  84. # First partner is recipient
  85. partner_id = message_dict['partner_ids'][0][0]
  86. self.assertEqual(partner_id, self.recipient.id)
  87. status = message_dict['partner_trackings'][0]
  88. # Tracking status must be sent and
  89. # mail tracking must be the one search before
  90. self.assertEqual(status[0], 'sent')
  91. self.assertEqual(status[1], tracking_email.id)
  92. self.assertEqual(status[2], self.recipient.display_name)
  93. self.assertEqual(status[3], self.recipient.id)
  94. # And now open the email
  95. metadata = {
  96. 'ip': '127.0.0.1',
  97. 'user_agent': 'Odoo Test/1.0',
  98. 'os_family': 'linux',
  99. 'ua_family': 'odoo',
  100. }
  101. tracking_email.event_create('open', metadata)
  102. self.assertEqual(tracking_email.state, 'opened')
  103. def mail_send(self, recipient):
  104. mail = self.env['mail.mail'].create({
  105. 'subject': 'Test subject',
  106. 'email_from': 'from@domain.com',
  107. 'email_to': recipient,
  108. 'body_html': '<p>This is a test message</p>',
  109. })
  110. mail.send()
  111. # Search tracking created
  112. tracking_email = self.env['mail.tracking.email'].search([
  113. ('mail_id', '=', mail.id),
  114. ])
  115. return mail, tracking_email
  116. def test_mail_send(self):
  117. controller = MailTrackingController()
  118. db = self.env.cr.dbname
  119. image = BLANK
  120. mail, tracking = self.mail_send(self.recipient.email)
  121. self.assertEqual(mail.email_to, tracking.recipient)
  122. self.assertEqual(mail.email_from, tracking.sender)
  123. res = controller.mail_tracking_open(db, tracking.id)
  124. self.assertEqual(image, res.response[0])
  125. # Two events: sent and open
  126. self.assertEqual(2, len(tracking.tracking_event_ids))
  127. # Fake event: tracking_email_id = False
  128. res = controller.mail_tracking_open(db, False)
  129. self.assertEqual(image, res.response[0])
  130. # Two events again because no tracking_email_id found for False
  131. self.assertEqual(2, len(tracking.tracking_event_ids))
  132. def test_concurrent_open(self):
  133. mail, tracking = self.mail_send(self.recipient.email)
  134. ts = time.time()
  135. metadata = {
  136. 'ip': '127.0.0.1',
  137. 'user_agent': 'Odoo Test/1.0',
  138. 'os_family': 'linux',
  139. 'ua_family': 'odoo',
  140. 'timestamp': ts,
  141. }
  142. # First open event
  143. tracking.event_create('open', metadata)
  144. opens = tracking.tracking_event_ids.filtered(
  145. lambda r: r.event_type == 'open'
  146. )
  147. self.assertEqual(len(opens), 1)
  148. # Concurrent open event
  149. metadata['timestamp'] = ts + 2
  150. tracking.event_create('open', metadata)
  151. opens = tracking.tracking_event_ids.filtered(
  152. lambda r: r.event_type == 'open'
  153. )
  154. self.assertEqual(len(opens), 1)
  155. # Second open event
  156. metadata['timestamp'] = ts + 350
  157. tracking.event_create('open', metadata)
  158. opens = tracking.tracking_event_ids.filtered(
  159. lambda r: r.event_type == 'open'
  160. )
  161. self.assertEqual(len(opens), 2)
  162. def test_concurrent_click(self):
  163. mail, tracking = self.mail_send(self.recipient.email)
  164. ts = time.time()
  165. metadata = {
  166. 'ip': '127.0.0.1',
  167. 'user_agent': 'Odoo Test/1.0',
  168. 'os_family': 'linux',
  169. 'ua_family': 'odoo',
  170. 'timestamp': ts,
  171. 'url': 'https://www.example.com/route/1',
  172. }
  173. # First click event (URL 1)
  174. tracking.event_create('click', metadata)
  175. opens = tracking.tracking_event_ids.filtered(
  176. lambda r: r.event_type == 'click'
  177. )
  178. self.assertEqual(len(opens), 1)
  179. # Concurrent click event (URL 1)
  180. metadata['timestamp'] = ts + 2
  181. tracking.event_create('click', metadata)
  182. opens = tracking.tracking_event_ids.filtered(
  183. lambda r: r.event_type == 'click'
  184. )
  185. self.assertEqual(len(opens), 1)
  186. # Second click event (URL 1)
  187. metadata['timestamp'] = ts + 350
  188. tracking.event_create('click', metadata)
  189. opens = tracking.tracking_event_ids.filtered(
  190. lambda r: r.event_type == 'click'
  191. )
  192. self.assertEqual(len(opens), 2)
  193. # Concurrent click event (URL 2)
  194. metadata['timestamp'] = ts + 2
  195. metadata['url'] = 'https://www.example.com/route/2'
  196. tracking.event_create('click', metadata)
  197. opens = tracking.tracking_event_ids.filtered(
  198. lambda r: r.event_type == 'click'
  199. )
  200. self.assertEqual(len(opens), 3)
  201. @mute_logger('odoo.addons.mail.models.mail_mail')
  202. def test_smtp_error(self):
  203. with mock.patch(mock_send_email) as mock_func:
  204. mock_func.side_effect = Warning('Test error')
  205. mail, tracking = self.mail_send(self.recipient.email)
  206. self.assertEqual('error', tracking.state)
  207. self.assertEqual('Warning', tracking.error_type)
  208. self.assertEqual('Test error', tracking.error_description)
  209. self.assertTrue(self.recipient.email_bounced)
  210. def test_partner_email_change(self):
  211. mail, tracking = self.mail_send(self.recipient.email)
  212. tracking.event_create('open', {})
  213. orig_score = self.recipient.email_score
  214. orig_count = self.recipient.tracking_emails_count
  215. orig_email = self.recipient.email
  216. self.recipient.email = orig_email + '2'
  217. self.assertEqual(50.0, self.recipient.email_score)
  218. self.assertEqual(0, self.recipient.tracking_emails_count)
  219. self.recipient.email = orig_email
  220. self.assertEqual(orig_score, self.recipient.email_score)
  221. self.assertEqual(orig_count, self.recipient.tracking_emails_count)
  222. def test_process_hard_bounce(self):
  223. mail, tracking = self.mail_send(self.recipient.email)
  224. tracking.event_create('hard_bounce', {})
  225. self.assertEqual('bounced', tracking.state)
  226. self.assertTrue(self.recipient.email_score < 50.0)
  227. def test_process_soft_bounce(self):
  228. mail, tracking = self.mail_send(self.recipient.email)
  229. tracking.event_create('soft_bounce', {})
  230. self.assertEqual('soft-bounced', tracking.state)
  231. self.assertTrue(self.recipient.email_score < 50.0)
  232. def test_process_delivered(self):
  233. mail, tracking = self.mail_send(self.recipient.email)
  234. tracking.event_create('delivered', {})
  235. self.assertEqual('delivered', tracking.state)
  236. self.assertTrue(self.recipient.email_score > 50.0)
  237. def test_process_deferral(self):
  238. mail, tracking = self.mail_send(self.recipient.email)
  239. tracking.event_create('deferral', {})
  240. self.assertEqual('deferred', tracking.state)
  241. def test_process_spam(self):
  242. mail, tracking = self.mail_send(self.recipient.email)
  243. tracking.event_create('spam', {})
  244. self.assertEqual('spam', tracking.state)
  245. self.assertTrue(self.recipient.email_score < 50.0)
  246. def test_process_unsub(self):
  247. mail, tracking = self.mail_send(self.recipient.email)
  248. tracking.event_create('unsub', {})
  249. self.assertEqual('unsub', tracking.state)
  250. self.assertTrue(self.recipient.email_score < 50.0)
  251. def test_process_reject(self):
  252. mail, tracking = self.mail_send(self.recipient.email)
  253. tracking.event_create('reject', {})
  254. self.assertEqual('rejected', tracking.state)
  255. self.assertTrue(self.recipient.email_score < 50.0)
  256. def test_process_open(self):
  257. mail, tracking = self.mail_send(self.recipient.email)
  258. tracking.event_create('open', {})
  259. self.assertEqual('opened', tracking.state)
  260. self.assertTrue(self.recipient.email_score > 50.0)
  261. def test_process_click(self):
  262. mail, tracking = self.mail_send(self.recipient.email)
  263. tracking.event_create('click', {})
  264. self.assertEqual('opened', tracking.state)
  265. self.assertTrue(self.recipient.email_score > 50.0)
  266. def test_process_several_bounce(self):
  267. for i in range(1, 10):
  268. mail, tracking = self.mail_send(self.recipient.email)
  269. tracking.event_create('hard_bounce', {})
  270. self.assertEqual('bounced', tracking.state)
  271. self.assertEqual(0.0, self.recipient.email_score)
  272. def test_bounce_new_partner(self):
  273. mail, tracking = self.mail_send(self.recipient.email)
  274. tracking.event_create('hard_bounce', {})
  275. new_partner = self.env['res.partner'].create({
  276. 'name': 'Test New Partner',
  277. })
  278. new_partner.email = self.recipient.email
  279. self.assertTrue(new_partner.email_bounced)
  280. def test_recordset_email_score(self):
  281. """For backwords compatibility sake"""
  282. trackings = self.env['mail.tracking.email']
  283. for i in range(11):
  284. mail, tracking = self.mail_send(self.recipient.email)
  285. tracking.event_create('click', {})
  286. trackings |= tracking
  287. self.assertEqual(100.0, trackings.email_score())
  288. def test_db(self):
  289. db = self.env.cr.dbname
  290. controller = MailTrackingController()
  291. not_found = controller.mail_tracking_all('not_found_db')
  292. self.assertEqual(b'NOT FOUND', not_found.response[0])
  293. none = controller.mail_tracking_all(db)
  294. self.assertEqual(b'NONE', none.response[0])
  295. none = controller.mail_tracking_event(db, 'open')
  296. self.assertEqual(b'NONE', none.response[0])