You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

327 lines
13 KiB

  1. # Copyright 2016 Antonio Espinosa - <antonio.espinosa@tecnativa.com>
  2. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
  3. import mock
  4. from odoo.tools import mute_logger
  5. import time
  6. from odoo import http
  7. from odoo.tests.common import TransactionCase
  8. from ..controllers.main import MailTrackingController, BLANK
  9. mock_send_email = ('odoo.addons.base.models.ir_mail_server.'
  10. 'IrMailServer.send_email')
  11. class FakeUserAgent(object):
  12. browser = 'Test browser'
  13. platform = 'Test platform'
  14. def __str__(self):
  15. """Return name"""
  16. return 'Test suite'
  17. class TestMailTracking(TransactionCase):
  18. def setUp(self, *args, **kwargs):
  19. super(TestMailTracking, self).setUp(*args, **kwargs)
  20. self.sender = self.env['res.partner'].create({
  21. 'name': 'Test sender',
  22. 'email': 'sender@example.com',
  23. })
  24. self.recipient = self.env['res.partner'].create({
  25. 'name': 'Test recipient',
  26. 'email': 'recipient@example.com',
  27. })
  28. self.last_request = http.request
  29. http.request = type('obj', (object,), {
  30. 'db': self.env.cr.dbname,
  31. 'env': self.env,
  32. 'endpoint': type('obj', (object,), {
  33. 'routing': [],
  34. }),
  35. 'httprequest': type('obj', (object,), {
  36. 'remote_addr': '123.123.123.123',
  37. 'user_agent': FakeUserAgent(),
  38. }),
  39. })
  40. def tearDown(self, *args, **kwargs):
  41. http.request = self.last_request
  42. return super(TestMailTracking, self).tearDown(*args, **kwargs)
  43. def test_empty_email(self):
  44. self.recipient.write({'email_bounced': True})
  45. self.recipient.write({'email': False})
  46. self.assertEqual(False, self.recipient.email)
  47. self.assertEqual(False, self.recipient.email_bounced)
  48. self.recipient.write({'email_bounced': True})
  49. self.recipient.write({'email': ''})
  50. self.assertEqual(False, self.recipient.email_bounced)
  51. self.assertEqual(
  52. False,
  53. self.env['mail.tracking.email'].email_is_bounced(False))
  54. self.assertEqual(
  55. 0.,
  56. self.env['mail.tracking.email'].email_score_from_email(False))
  57. def test_recipient_address_compute(self):
  58. mail, tracking = self.mail_send(self.recipient.email)
  59. tracking.write({'recipient': False})
  60. self.assertEqual(False, tracking.recipient_address)
  61. def test_message_post(self):
  62. # This message will generate a notification for recipient
  63. message = self.env['mail.message'].create({
  64. 'subject': 'Message test',
  65. 'author_id': self.sender.id,
  66. 'email_from': self.sender.email,
  67. 'message_type': 'comment',
  68. 'model': 'res.partner',
  69. 'res_id': self.recipient.id,
  70. 'partner_ids': [(4, self.recipient.id)],
  71. 'body': '<p>This is a test message</p>',
  72. })
  73. message._notify(message, {}, force_send=True)
  74. # Search tracking created
  75. tracking_email = self.env['mail.tracking.email'].search([
  76. ('mail_message_id', '=', message.id),
  77. ('partner_id', '=', self.recipient.id),
  78. ])
  79. # The tracking email must be sent
  80. self.assertTrue(tracking_email)
  81. self.assertEqual(tracking_email.state, 'sent')
  82. # message_dict read by web interface
  83. message_dict = message.message_format()[0]
  84. self.assertTrue(len(message_dict['partner_ids']) > 0)
  85. # First partner is recipient
  86. partner_id = message_dict['partner_ids'][0][0]
  87. self.assertEqual(partner_id, self.recipient.id)
  88. status = message_dict['partner_trackings'][0]
  89. # Tracking status must be sent and
  90. # mail tracking must be the one search before
  91. self.assertEqual(status[0], 'sent')
  92. self.assertEqual(status[1], tracking_email.id)
  93. self.assertEqual(status[2], self.recipient.display_name)
  94. self.assertEqual(status[3], self.recipient.id)
  95. # And now open the email
  96. metadata = {
  97. 'ip': '127.0.0.1',
  98. 'user_agent': 'Odoo Test/1.0',
  99. 'os_family': 'linux',
  100. 'ua_family': 'odoo',
  101. }
  102. tracking_email.event_create('open', metadata)
  103. self.assertEqual(tracking_email.state, 'opened')
  104. def mail_send(self, recipient):
  105. mail = self.env['mail.mail'].create({
  106. 'subject': 'Test subject',
  107. 'email_from': 'from@domain.com',
  108. 'email_to': recipient,
  109. 'body_html': '<p>This is a test message</p>',
  110. })
  111. mail.send()
  112. # Search tracking created
  113. tracking_email = self.env['mail.tracking.email'].search([
  114. ('mail_id', '=', mail.id),
  115. ])
  116. return mail, tracking_email
  117. def test_mail_send(self):
  118. controller = MailTrackingController()
  119. db = self.env.cr.dbname
  120. image = BLANK
  121. mail, tracking = self.mail_send(self.recipient.email)
  122. self.assertEqual(mail.email_to, tracking.recipient)
  123. self.assertEqual(mail.email_from, tracking.sender)
  124. res = controller.mail_tracking_open(db, tracking.id)
  125. self.assertEqual(image, res.response[0])
  126. # Two events: sent and open
  127. self.assertEqual(2, len(tracking.tracking_event_ids))
  128. # Fake event: tracking_email_id = False
  129. res = controller.mail_tracking_open(db, False)
  130. self.assertEqual(image, res.response[0])
  131. # Two events again because no tracking_email_id found for False
  132. self.assertEqual(2, len(tracking.tracking_event_ids))
  133. def test_concurrent_open(self):
  134. mail, tracking = self.mail_send(self.recipient.email)
  135. ts = time.time()
  136. metadata = {
  137. 'ip': '127.0.0.1',
  138. 'user_agent': 'Odoo Test/1.0',
  139. 'os_family': 'linux',
  140. 'ua_family': 'odoo',
  141. 'timestamp': ts,
  142. }
  143. # First open event
  144. tracking.event_create('open', metadata)
  145. opens = tracking.tracking_event_ids.filtered(
  146. lambda r: r.event_type == 'open'
  147. )
  148. self.assertEqual(len(opens), 1)
  149. # Concurrent open event
  150. metadata['timestamp'] = ts + 2
  151. tracking.event_create('open', metadata)
  152. opens = tracking.tracking_event_ids.filtered(
  153. lambda r: r.event_type == 'open'
  154. )
  155. self.assertEqual(len(opens), 1)
  156. # Second open event
  157. metadata['timestamp'] = ts + 350
  158. tracking.event_create('open', metadata)
  159. opens = tracking.tracking_event_ids.filtered(
  160. lambda r: r.event_type == 'open'
  161. )
  162. self.assertEqual(len(opens), 2)
  163. def test_concurrent_click(self):
  164. mail, tracking = self.mail_send(self.recipient.email)
  165. ts = time.time()
  166. metadata = {
  167. 'ip': '127.0.0.1',
  168. 'user_agent': 'Odoo Test/1.0',
  169. 'os_family': 'linux',
  170. 'ua_family': 'odoo',
  171. 'timestamp': ts,
  172. 'url': 'https://www.example.com/route/1',
  173. }
  174. # First click event (URL 1)
  175. tracking.event_create('click', metadata)
  176. opens = tracking.tracking_event_ids.filtered(
  177. lambda r: r.event_type == 'click'
  178. )
  179. self.assertEqual(len(opens), 1)
  180. # Concurrent click event (URL 1)
  181. metadata['timestamp'] = ts + 2
  182. tracking.event_create('click', metadata)
  183. opens = tracking.tracking_event_ids.filtered(
  184. lambda r: r.event_type == 'click'
  185. )
  186. self.assertEqual(len(opens), 1)
  187. # Second click event (URL 1)
  188. metadata['timestamp'] = ts + 350
  189. tracking.event_create('click', metadata)
  190. opens = tracking.tracking_event_ids.filtered(
  191. lambda r: r.event_type == 'click'
  192. )
  193. self.assertEqual(len(opens), 2)
  194. # Concurrent click event (URL 2)
  195. metadata['timestamp'] = ts + 2
  196. metadata['url'] = 'https://www.example.com/route/2'
  197. tracking.event_create('click', metadata)
  198. opens = tracking.tracking_event_ids.filtered(
  199. lambda r: r.event_type == 'click'
  200. )
  201. self.assertEqual(len(opens), 3)
  202. @mute_logger('odoo.addons.mail.models.mail_mail')
  203. def test_smtp_error(self):
  204. with mock.patch(mock_send_email) as mock_func:
  205. mock_func.side_effect = Warning('Test error')
  206. mail, tracking = self.mail_send(self.recipient.email)
  207. self.assertEqual('error', tracking.state)
  208. self.assertEqual('Warning', tracking.error_type)
  209. self.assertEqual('Test error', tracking.error_description)
  210. self.assertTrue(self.recipient.email_bounced)
  211. def test_partner_email_change(self):
  212. mail, tracking = self.mail_send(self.recipient.email)
  213. tracking.event_create('open', {})
  214. orig_score = self.recipient.email_score
  215. orig_count = self.recipient.tracking_emails_count
  216. orig_email = self.recipient.email
  217. self.recipient.email = orig_email + '2'
  218. self.assertEqual(50.0, self.recipient.email_score)
  219. self.assertEqual(0, self.recipient.tracking_emails_count)
  220. self.recipient.email = orig_email
  221. self.assertEqual(orig_score, self.recipient.email_score)
  222. self.assertEqual(orig_count, self.recipient.tracking_emails_count)
  223. def test_process_hard_bounce(self):
  224. mail, tracking = self.mail_send(self.recipient.email)
  225. tracking.event_create('hard_bounce', {})
  226. self.assertEqual('bounced', tracking.state)
  227. self.assertTrue(self.recipient.email_score < 50.0)
  228. def test_process_soft_bounce(self):
  229. mail, tracking = self.mail_send(self.recipient.email)
  230. tracking.event_create('soft_bounce', {})
  231. self.assertEqual('soft-bounced', tracking.state)
  232. self.assertTrue(self.recipient.email_score < 50.0)
  233. def test_process_delivered(self):
  234. mail, tracking = self.mail_send(self.recipient.email)
  235. tracking.event_create('delivered', {})
  236. self.assertEqual('delivered', tracking.state)
  237. self.assertTrue(self.recipient.email_score > 50.0)
  238. def test_process_deferral(self):
  239. mail, tracking = self.mail_send(self.recipient.email)
  240. tracking.event_create('deferral', {})
  241. self.assertEqual('deferred', tracking.state)
  242. def test_process_spam(self):
  243. mail, tracking = self.mail_send(self.recipient.email)
  244. tracking.event_create('spam', {})
  245. self.assertEqual('spam', tracking.state)
  246. self.assertTrue(self.recipient.email_score < 50.0)
  247. def test_process_unsub(self):
  248. mail, tracking = self.mail_send(self.recipient.email)
  249. tracking.event_create('unsub', {})
  250. self.assertEqual('unsub', tracking.state)
  251. self.assertTrue(self.recipient.email_score < 50.0)
  252. def test_process_reject(self):
  253. mail, tracking = self.mail_send(self.recipient.email)
  254. tracking.event_create('reject', {})
  255. self.assertEqual('rejected', tracking.state)
  256. self.assertTrue(self.recipient.email_score < 50.0)
  257. def test_process_open(self):
  258. mail, tracking = self.mail_send(self.recipient.email)
  259. tracking.event_create('open', {})
  260. self.assertEqual('opened', tracking.state)
  261. self.assertTrue(self.recipient.email_score > 50.0)
  262. def test_process_click(self):
  263. mail, tracking = self.mail_send(self.recipient.email)
  264. tracking.event_create('click', {})
  265. self.assertEqual('opened', tracking.state)
  266. self.assertTrue(self.recipient.email_score > 50.0)
  267. def test_process_several_bounce(self):
  268. for i in range(1, 10):
  269. mail, tracking = self.mail_send(self.recipient.email)
  270. tracking.event_create('hard_bounce', {})
  271. self.assertEqual('bounced', tracking.state)
  272. self.assertEqual(0.0, self.recipient.email_score)
  273. def test_bounce_new_partner(self):
  274. mail, tracking = self.mail_send(self.recipient.email)
  275. tracking.event_create('hard_bounce', {})
  276. new_partner = self.env['res.partner'].create({
  277. 'name': 'Test New Partner',
  278. })
  279. new_partner.email = self.recipient.email
  280. self.assertTrue(new_partner.email_bounced)
  281. def test_recordset_email_score(self):
  282. """For backwords compatibility sake"""
  283. trackings = self.env['mail.tracking.email']
  284. for i in range(11):
  285. mail, tracking = self.mail_send(self.recipient.email)
  286. tracking.event_create('click', {})
  287. trackings |= tracking
  288. self.assertEqual(100.0, trackings.email_score())
  289. def test_db(self):
  290. db = self.env.cr.dbname
  291. controller = MailTrackingController()
  292. not_found = controller.mail_tracking_all('not_found_db')
  293. self.assertEqual(b'NOT FOUND', not_found.response[0])
  294. none = controller.mail_tracking_all(db)
  295. self.assertEqual(b'NONE', none.response[0])
  296. none = controller.mail_tracking_event(db, 'open')
  297. self.assertEqual(b'NONE', none.response[0])