You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

77 lines
2.5 KiB

  1. # Copyright 2019 Trobz <https://trobz.com>
  2. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
  3. import os
  4. import json
  5. import logging
  6. import select
  7. import psycopg2
  8. from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
  9. import odoo
  10. from odoo.tools import config
  11. from odoo.addons.bus.models.bus import hashable, TIMEOUT
  12. import odoo.addons.bus.models.bus
  13. _logger = logging.getLogger(__name__)
  def _connection_info_for(db_name):
      """Return the connection parameters the bus dispatcher should use.

      Starts from the connection info that
      ``odoo.sql_db.connection_info_for`` produces for ``db_name`` and then
      overrides ``host`` and ``port`` when a dispatcher-specific value is
      set. For each of the two keys the lookup order is:

      1. the ``ODOO_IMDISPATCHER_DB_HOST`` / ``ODOO_IMDISPATCHER_DB_PORT``
         environment variable;
      2. the ``imdispatcher_db_host`` / ``imdispatcher_db_port`` option of
         the Odoo configuration.

      Unset or empty values leave the standard setting untouched.

      :param db_name: database name handed to
          ``odoo.sql_db.connection_info_for``
      :return: the connection info mapping with the overrides applied
      """
      # Only the second element (the connection parameters) is needed here.
      _, params = odoo.sql_db.connection_info_for(db_name)
      for key in ('host', 'port'):
          override = os.environ.get('ODOO_IMDISPATCHER_DB_%s' % key.upper())
          if not override:
              # Fall back to the Odoo configuration when the environment
              # variable is missing or empty.
              override = config.get('imdispatcher_db_' + key)
          if override:
              params[key] = override
      return params
  class ImDispatch(odoo.addons.bus.models.bus.ImDispatch):
      """Bus dispatcher that listens for notifications on its own connection.

      The connection parameters come from ``_connection_info_for``, so the
      host and port used for ``LISTEN imbus`` can differ from the ones Odoo
      uses for its regular database connections.
      """

      def loop(self):
          """ Dispatch postgres notifications to the relevant
              polling threads/greenlets.

          Opens a dedicated autocommit psycopg2 connection to the
          ``postgres`` database, issues ``listen imbus`` and then waits
          forever for notifications. Each notification payload is decoded
          as a JSON list of channels; every event registered in
          ``self.channels`` for one of those channels is removed from the
          registry and set.

          This method only returns by raising. The connection is closed on
          the way out so that a caller which retries ``loop()`` after a
          failure does not accumulate open connections.
          """
          connection_info = _connection_info_for('postgres')
          # ``host``/``port`` are only present when configured, so look them
          # up with ``.get``: a missing key would otherwise make the logging
          # module fail to format the record and drop the message.
          _logger.info("Bus.loop listen imbus on db postgres "
                       "(via %(host)s:%(port)s)",
                       {'host': connection_info.get('host'),
                        'port': connection_info.get('port')})
          conn = psycopg2.connect(**connection_info)
          try:
              conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
              with conn.cursor() as cr:
                  cr.execute("listen imbus")
                  conn.commit()
                  while True:
                      ready = select.select([conn], [], [], TIMEOUT)
                      if ready == ([], [], []):
                          # Timed out without anything to read: wait again.
                          continue
                      conn.poll()
                      channels = []
                      while conn.notifies:
                          channels.extend(
                              json.loads(conn.notifies.pop().payload))
                      # dispatch to local threads/greenlets
                      events = set()
                      for channel in channels:
                          events.update(
                              self.channels.pop(hashable(channel), set()))
                      for event in events:
                          event.set()
          finally:
              # Reached only when the loop is interrupted by an exception;
              # release the server-side connection instead of leaking it.
              conn.close()
  # Make the bus module expose the dispatcher class defined above.
  odoo.addons.bus.models.bus.ImDispatch = ImDispatch

  # Swapping the already-created dispatcher instance is safe because its
  # thread has not been started yet: since a2ed3d it only starts on the
  # first /poll request, see
  # https://github.com/odoo/odoo/commit/a2ed3d3d5bdb6025a1ba14ad557a115a86413e65
  if odoo.evented or not odoo.multi_process:
      dispatch = ImDispatch()
      # Both modules keep their own reference to the dispatcher, so each
      # of them has to be pointed at the new instance.
      odoo.addons.bus.models.bus.dispatch = dispatch
      odoo.addons.bus.controllers.main.dispatch = dispatch