You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

261 lines
9.9 KiB

  1. # -*- coding: utf-8 -*-
  2. # © 2016 Antonio Espinosa - <antonio.espinosa@tecnativa.com>
  3. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
  4. import mock
  5. import base64
  6. import time
  7. from openerp.tests.common import TransactionCase
  8. from ..controllers.main import MailTrackingController, BLANK
  9. mock_request = 'openerp.http.request'
  10. mock_send_email = ('openerp.addons.base.ir.ir_mail_server.'
  11. 'ir_mail_server.send_email')
  12. class FakeUserAgent(object):
  13. browser = 'Test browser'
  14. platform = 'Test platform'
  15. def __str__(self):
  16. """Return name"""
  17. return 'Test suite'
  18. # One test case per method
  19. class TestMailTracking(TransactionCase):
  20. # Use case : Prepare some data for current test case
  21. def setUp(self):
  22. super(TestMailTracking, self).setUp()
  23. self.sender = self.env['res.partner'].create({
  24. 'name': 'Test sender',
  25. 'email': 'sender@example.com',
  26. 'notify_email': 'always',
  27. })
  28. self.recipient = self.env['res.partner'].create({
  29. 'name': 'Test recipient',
  30. 'email': 'recipient@example.com',
  31. 'notify_email': 'always',
  32. })
  33. self.request = {
  34. 'httprequest': type('obj', (object,), {
  35. 'remote_addr': '123.123.123.123',
  36. 'user_agent': FakeUserAgent(),
  37. }),
  38. }
  39. def test_message_post(self):
  40. # This message will generate a notification for recipient
  41. message = self.env['mail.message'].create({
  42. 'subject': 'Message test',
  43. 'author_id': self.sender.id,
  44. 'email_from': self.sender.email,
  45. 'type': 'comment',
  46. 'model': 'res.partner',
  47. 'res_id': self.recipient.id,
  48. 'partner_ids': [(4, self.recipient.id)],
  49. 'body': '<p>This is a test message</p>',
  50. })
  51. # Search tracking created
  52. tracking_email = self.env['mail.tracking.email'].search([
  53. ('mail_message_id', '=', message.id),
  54. ('partner_id', '=', self.recipient.id),
  55. ])
  56. # The tracking email must be sent
  57. self.assertTrue(tracking_email)
  58. self.assertEqual(tracking_email.state, 'sent')
  59. # message_dict read by web interface
  60. message_dict = self.env['mail.message'].message_read(message.id)
  61. # First item is message content
  62. self.assertTrue(len(message_dict) > 0)
  63. message_dict = message_dict[0]
  64. self.assertTrue(len(message_dict['partner_ids']) > 0)
  65. # First partner is recipient
  66. partner_id = message_dict['partner_ids'][0][0]
  67. self.assertEqual(partner_id, self.recipient.id)
  68. status = message_dict['partner_trackings'][str(partner_id)]
  69. # Tracking status must be sent and
  70. # mail tracking must be the one search before
  71. self.assertEqual(status[0], 'sent')
  72. self.assertEqual(status[1], tracking_email.id)
  73. # And now open the email
  74. metadata = {
  75. 'ip': '127.0.0.1',
  76. 'user_agent': 'Odoo Test/1.0',
  77. 'os_family': 'linux',
  78. 'ua_family': 'odoo',
  79. }
  80. tracking_email.event_create('open', metadata)
  81. self.assertEqual(tracking_email.state, 'opened')
  82. def mail_send(self, recipient):
  83. mail = self.env['mail.mail'].create({
  84. 'subject': 'Test subject',
  85. 'email_from': 'from@domain.com',
  86. 'email_to': recipient,
  87. 'body_html': '<p>This is a test message</p>',
  88. })
  89. mail.send()
  90. # Search tracking created
  91. tracking_email = self.env['mail.tracking.email'].search([
  92. ('mail_id', '=', mail.id),
  93. ])
  94. return mail, tracking_email
  95. def test_mail_send(self):
  96. controller = MailTrackingController()
  97. db = self.env.cr.dbname
  98. image = base64.decodestring(BLANK)
  99. mail, tracking = self.mail_send(self.recipient.email)
  100. self.assertEqual(mail.email_to, tracking.recipient)
  101. self.assertEqual(mail.email_from, tracking.sender)
  102. with mock.patch(mock_request) as mock_func:
  103. mock_func.return_value = type('obj', (object,), self.request)
  104. res = controller.mail_tracking_open(db, tracking.id)
  105. self.assertEqual(image, res.response[0])
  106. def test_concurrent_open(self):
  107. mail, tracking = self.mail_send(self.recipient.email)
  108. ts = time.time()
  109. metadata = {
  110. 'ip': '127.0.0.1',
  111. 'user_agent': 'Odoo Test/1.0',
  112. 'os_family': 'linux',
  113. 'ua_family': 'odoo',
  114. 'timestamp': ts,
  115. }
  116. # First open event
  117. tracking.event_create('open', metadata)
  118. opens = tracking.tracking_event_ids.filtered(
  119. lambda r: r.event_type == 'open'
  120. )
  121. self.assertEqual(len(opens), 1)
  122. # Concurrent open event
  123. metadata['timestamp'] = ts + 2
  124. tracking.event_create('open', metadata)
  125. opens = tracking.tracking_event_ids.filtered(
  126. lambda r: r.event_type == 'open'
  127. )
  128. self.assertEqual(len(opens), 1)
  129. # Second open event
  130. metadata['timestamp'] = ts + 350
  131. tracking.event_create('open', metadata)
  132. opens = tracking.tracking_event_ids.filtered(
  133. lambda r: r.event_type == 'open'
  134. )
  135. self.assertEqual(len(opens), 2)
  136. def test_concurrent_click(self):
  137. mail, tracking = self.mail_send(self.recipient.email)
  138. ts = time.time()
  139. metadata = {
  140. 'ip': '127.0.0.1',
  141. 'user_agent': 'Odoo Test/1.0',
  142. 'os_family': 'linux',
  143. 'ua_family': 'odoo',
  144. 'timestamp': ts,
  145. 'url': 'https://www.example.com/route/1',
  146. }
  147. # First click event (URL 1)
  148. tracking.event_create('click', metadata)
  149. opens = tracking.tracking_event_ids.filtered(
  150. lambda r: r.event_type == 'click'
  151. )
  152. self.assertEqual(len(opens), 1)
  153. # Concurrent click event (URL 1)
  154. metadata['timestamp'] = ts + 2
  155. tracking.event_create('click', metadata)
  156. opens = tracking.tracking_event_ids.filtered(
  157. lambda r: r.event_type == 'click'
  158. )
  159. self.assertEqual(len(opens), 1)
  160. # Second click event (URL 1)
  161. metadata['timestamp'] = ts + 350
  162. tracking.event_create('click', metadata)
  163. opens = tracking.tracking_event_ids.filtered(
  164. lambda r: r.event_type == 'click'
  165. )
  166. self.assertEqual(len(opens), 2)
  167. # Concurrent click event (URL 2)
  168. metadata['timestamp'] = ts + 2
  169. metadata['url'] = 'https://www.example.com/route/2'
  170. tracking.event_create('click', metadata)
  171. opens = tracking.tracking_event_ids.filtered(
  172. lambda r: r.event_type == 'click'
  173. )
  174. self.assertEqual(len(opens), 3)
  175. def test_smtp_error(self):
  176. with mock.patch(mock_send_email) as mock_func:
  177. mock_func.side_effect = Warning('Test error')
  178. mail, tracking = self.mail_send(self.recipient.email)
  179. self.assertEqual('error', tracking.state)
  180. self.assertEqual('Warning', tracking.error_type)
  181. self.assertEqual('Test error', tracking.error_description)
  182. def test_partner_email_change(self):
  183. mail, tracking = self.mail_send(self.recipient.email)
  184. tracking.event_create('open', {})
  185. orig_score = self.recipient.email_score
  186. orig_email = self.recipient.email
  187. self.recipient.email = orig_email + '2'
  188. self.assertEqual(50.0, self.recipient.email_score)
  189. self.recipient.email = orig_email
  190. self.assertEqual(orig_score, self.recipient.email_score)
  191. def test_process_hard_bounce(self):
  192. mail, tracking = self.mail_send(self.recipient.email)
  193. tracking.event_create('hard_bounce', {})
  194. self.assertEqual('bounced', tracking.state)
  195. def test_process_soft_bounce(self):
  196. mail, tracking = self.mail_send(self.recipient.email)
  197. tracking.event_create('soft_bounce', {})
  198. self.assertEqual('soft-bounced', tracking.state)
  199. def test_process_delivered(self):
  200. mail, tracking = self.mail_send(self.recipient.email)
  201. tracking.event_create('delivered', {})
  202. self.assertEqual('delivered', tracking.state)
  203. def test_process_deferral(self):
  204. mail, tracking = self.mail_send(self.recipient.email)
  205. tracking.event_create('deferral', {})
  206. self.assertEqual('deferred', tracking.state)
  207. def test_process_spam(self):
  208. mail, tracking = self.mail_send(self.recipient.email)
  209. tracking.event_create('spam', {})
  210. self.assertEqual('spam', tracking.state)
  211. def test_process_unsub(self):
  212. mail, tracking = self.mail_send(self.recipient.email)
  213. tracking.event_create('unsub', {})
  214. self.assertEqual('unsub', tracking.state)
  215. def test_process_reject(self):
  216. mail, tracking = self.mail_send(self.recipient.email)
  217. tracking.event_create('reject', {})
  218. self.assertEqual('rejected', tracking.state)
  219. def test_process_open(self):
  220. mail, tracking = self.mail_send(self.recipient.email)
  221. tracking.event_create('open', {})
  222. self.assertEqual('opened', tracking.state)
  223. def test_process_click(self):
  224. mail, tracking = self.mail_send(self.recipient.email)
  225. tracking.event_create('click', {})
  226. self.assertEqual('opened', tracking.state)
  227. def test_db(self):
  228. db = self.env.cr.dbname
  229. controller = MailTrackingController()
  230. with mock.patch(mock_request) as mock_func:
  231. mock_func.return_value = type('obj', (object,), self.request)
  232. not_found = controller.mail_tracking_all('not_found_db')
  233. self.assertEqual('NOT FOUND', not_found.response[0])
  234. none = controller.mail_tracking_all(db)
  235. self.assertEqual('NONE', none.response[0])
  236. none = controller.mail_tracking_event(db, 'open')
  237. self.assertEqual('NONE', none.response[0])