You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

262 lines
9.9 KiB

  1. # -*- coding: utf-8 -*-
  2. # © 2016 Antonio Espinosa - <antonio.espinosa@tecnativa.com>
  3. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
  4. import mock
  5. import base64
  6. import time
  7. from openerp.tests.common import TransactionCase
  8. from ..controllers.main import MailTrackingController, BLANK
  9. mock_request = 'openerp.http.request'
  10. mock_send_email = ('openerp.addons.base.ir.ir_mail_server.'
  11. 'ir_mail_server.send_email')
  12. class FakeUserAgent(object):
  13. browser = 'Test browser'
  14. platform = 'Test platform'
  15. def __str__(self):
  16. """Return name"""
  17. return 'Test suite'
  18. # One test case per method
  19. class TestMailTracking(TransactionCase):
  20. # Use case : Prepare some data for current test case
  21. def setUp(self):
  22. super(TestMailTracking, self).setUp()
  23. self.sender = self.env['res.partner'].create({
  24. 'name': 'Test sender',
  25. 'email': 'sender@example.com',
  26. 'notify_email': 'always',
  27. })
  28. self.recipient = self.env['res.partner'].create({
  29. 'name': 'Test recipient',
  30. 'email': 'recipient@example.com',
  31. 'notify_email': 'always',
  32. })
  33. self.request = {
  34. 'httprequest': type('obj', (object,), {
  35. 'remote_addr': '123.123.123.123',
  36. 'user_agent': FakeUserAgent(),
  37. }),
  38. }
  39. def test_message_post(self):
  40. # This message will generate a notification for recipient
  41. message = self.env['mail.message'].create({
  42. 'subject': 'Message test',
  43. 'author_id': self.sender.id,
  44. 'email_from': self.sender.email,
  45. 'type': 'comment',
  46. 'model': 'res.partner',
  47. 'res_id': self.recipient.id,
  48. 'partner_ids': [(4, self.recipient.id)],
  49. 'body': '<p>This is a test message</p>',
  50. })
  51. # Search tracking created
  52. tracking_email = self.env['mail.tracking.email'].search([
  53. ('mail_message_id', '=', message.id),
  54. ('partner_id', '=', self.recipient.id),
  55. ])
  56. # The tracking email must be sent
  57. self.assertTrue(tracking_email)
  58. self.assertEqual(tracking_email.state, 'sent')
  59. # message_dict read by web interface
  60. message_dict = message.message_read()
  61. # First item in threads is message content
  62. message_dict = message_dict['threads'][0][0]
  63. self.assertTrue(len(message_dict['partner_ids']) > 0)
  64. # First partner is recipient
  65. partner_id = message_dict['partner_ids'][0][0]
  66. self.assertEqual(partner_id, self.recipient.id)
  67. status = message_dict['partner_trackings'][0]
  68. # Tracking status must be sent and
  69. # mail tracking must be the one search before
  70. self.assertEqual(status[0], 'sent')
  71. self.assertEqual(status[1], tracking_email.id)
  72. self.assertEqual(status[2], self.recipient.display_name)
  73. self.assertEqual(status[3], self.recipient.id)
  74. # And now open the email
  75. metadata = {
  76. 'ip': '127.0.0.1',
  77. 'user_agent': 'Odoo Test/1.0',
  78. 'os_family': 'linux',
  79. 'ua_family': 'odoo',
  80. }
  81. tracking_email.event_create('open', metadata)
  82. self.assertEqual(tracking_email.state, 'opened')
  83. def mail_send(self, recipient):
  84. mail = self.env['mail.mail'].create({
  85. 'subject': 'Test subject',
  86. 'email_from': 'from@domain.com',
  87. 'email_to': recipient,
  88. 'body_html': '<p>This is a test message</p>',
  89. })
  90. mail.send()
  91. # Search tracking created
  92. tracking_email = self.env['mail.tracking.email'].search([
  93. ('mail_id', '=', mail.id),
  94. ])
  95. return mail, tracking_email
  96. def test_mail_send(self):
  97. controller = MailTrackingController()
  98. db = self.env.cr.dbname
  99. image = base64.decodestring(BLANK)
  100. mail, tracking = self.mail_send(self.recipient.email)
  101. self.assertEqual(mail.email_to, tracking.recipient)
  102. self.assertEqual(mail.email_from, tracking.sender)
  103. with mock.patch(mock_request) as mock_func:
  104. mock_func.return_value = type('obj', (object,), self.request)
  105. res = controller.mail_tracking_open(db, tracking.id)
  106. self.assertEqual(image, res.response[0])
  107. def test_concurrent_open(self):
  108. mail, tracking = self.mail_send(self.recipient.email)
  109. ts = time.time()
  110. metadata = {
  111. 'ip': '127.0.0.1',
  112. 'user_agent': 'Odoo Test/1.0',
  113. 'os_family': 'linux',
  114. 'ua_family': 'odoo',
  115. 'timestamp': ts,
  116. }
  117. # First open event
  118. tracking.event_create('open', metadata)
  119. opens = tracking.tracking_event_ids.filtered(
  120. lambda r: r.event_type == 'open'
  121. )
  122. self.assertEqual(len(opens), 1)
  123. # Concurrent open event
  124. metadata['timestamp'] = ts + 2
  125. tracking.event_create('open', metadata)
  126. opens = tracking.tracking_event_ids.filtered(
  127. lambda r: r.event_type == 'open'
  128. )
  129. self.assertEqual(len(opens), 1)
  130. # Second open event
  131. metadata['timestamp'] = ts + 350
  132. tracking.event_create('open', metadata)
  133. opens = tracking.tracking_event_ids.filtered(
  134. lambda r: r.event_type == 'open'
  135. )
  136. self.assertEqual(len(opens), 2)
  137. def test_concurrent_click(self):
  138. mail, tracking = self.mail_send(self.recipient.email)
  139. ts = time.time()
  140. metadata = {
  141. 'ip': '127.0.0.1',
  142. 'user_agent': 'Odoo Test/1.0',
  143. 'os_family': 'linux',
  144. 'ua_family': 'odoo',
  145. 'timestamp': ts,
  146. 'url': 'https://www.example.com/route/1',
  147. }
  148. # First click event (URL 1)
  149. tracking.event_create('click', metadata)
  150. opens = tracking.tracking_event_ids.filtered(
  151. lambda r: r.event_type == 'click'
  152. )
  153. self.assertEqual(len(opens), 1)
  154. # Concurrent click event (URL 1)
  155. metadata['timestamp'] = ts + 2
  156. tracking.event_create('click', metadata)
  157. opens = tracking.tracking_event_ids.filtered(
  158. lambda r: r.event_type == 'click'
  159. )
  160. self.assertEqual(len(opens), 1)
  161. # Second click event (URL 1)
  162. metadata['timestamp'] = ts + 350
  163. tracking.event_create('click', metadata)
  164. opens = tracking.tracking_event_ids.filtered(
  165. lambda r: r.event_type == 'click'
  166. )
  167. self.assertEqual(len(opens), 2)
  168. # Concurrent click event (URL 2)
  169. metadata['timestamp'] = ts + 2
  170. metadata['url'] = 'https://www.example.com/route/2'
  171. tracking.event_create('click', metadata)
  172. opens = tracking.tracking_event_ids.filtered(
  173. lambda r: r.event_type == 'click'
  174. )
  175. self.assertEqual(len(opens), 3)
  176. def test_smtp_error(self):
  177. with mock.patch(mock_send_email) as mock_func:
  178. mock_func.side_effect = Warning('Test error')
  179. mail, tracking = self.mail_send(self.recipient.email)
  180. self.assertEqual('error', tracking.state)
  181. self.assertEqual('Warning', tracking.error_type)
  182. self.assertEqual('Test error', tracking.error_description)
  183. def test_partner_email_change(self):
  184. mail, tracking = self.mail_send(self.recipient.email)
  185. tracking.event_create('open', {})
  186. orig_score = self.recipient.email_score
  187. orig_email = self.recipient.email
  188. self.recipient.email = orig_email + '2'
  189. self.assertEqual(50.0, self.recipient.email_score)
  190. self.recipient.email = orig_email
  191. self.assertEqual(orig_score, self.recipient.email_score)
  192. def test_process_hard_bounce(self):
  193. mail, tracking = self.mail_send(self.recipient.email)
  194. tracking.event_create('hard_bounce', {})
  195. self.assertEqual('bounced', tracking.state)
  196. def test_process_soft_bounce(self):
  197. mail, tracking = self.mail_send(self.recipient.email)
  198. tracking.event_create('soft_bounce', {})
  199. self.assertEqual('soft-bounced', tracking.state)
  200. def test_process_delivered(self):
  201. mail, tracking = self.mail_send(self.recipient.email)
  202. tracking.event_create('delivered', {})
  203. self.assertEqual('delivered', tracking.state)
  204. def test_process_deferral(self):
  205. mail, tracking = self.mail_send(self.recipient.email)
  206. tracking.event_create('deferral', {})
  207. self.assertEqual('deferred', tracking.state)
  208. def test_process_spam(self):
  209. mail, tracking = self.mail_send(self.recipient.email)
  210. tracking.event_create('spam', {})
  211. self.assertEqual('spam', tracking.state)
  212. def test_process_unsub(self):
  213. mail, tracking = self.mail_send(self.recipient.email)
  214. tracking.event_create('unsub', {})
  215. self.assertEqual('unsub', tracking.state)
  216. def test_process_reject(self):
  217. mail, tracking = self.mail_send(self.recipient.email)
  218. tracking.event_create('reject', {})
  219. self.assertEqual('rejected', tracking.state)
  220. def test_process_open(self):
  221. mail, tracking = self.mail_send(self.recipient.email)
  222. tracking.event_create('open', {})
  223. self.assertEqual('opened', tracking.state)
  224. def test_process_click(self):
  225. mail, tracking = self.mail_send(self.recipient.email)
  226. tracking.event_create('click', {})
  227. self.assertEqual('opened', tracking.state)
  228. def test_db(self):
  229. db = self.env.cr.dbname
  230. controller = MailTrackingController()
  231. with mock.patch(mock_request) as mock_func:
  232. mock_func.return_value = type('obj', (object,), self.request)
  233. not_found = controller.mail_tracking_all('not_found_db')
  234. self.assertEqual('NOT FOUND', not_found.response[0])
  235. none = controller.mail_tracking_all(db)
  236. self.assertEqual('NONE', none.response[0])
  237. none = controller.mail_tracking_event(db, 'open')
  238. self.assertEqual('NONE', none.response[0])