# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

"""
The PostgreSQL connector is a connectivity layer between the OpenERP code and
the database, *not* a database abstraction toolkit. Database abstraction is what
the ORM does, in fact.
"""

from contextlib import contextmanager
from functools import wraps
import logging
import time
import urllib.parse
import uuid

import psycopg2
import psycopg2.extras
import psycopg2.extensions
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
from psycopg2.pool import PoolError

psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

_logger = logging.getLogger(__name__)

types_mapping = {
    'date': (1082,),
    'time': (1083,),
    'datetime': (1114,),
}

def unbuffer(symb, cr):
    if symb is None:
        return None
    return str(symb)

def undecimalize(symb, cr):
    if symb is None:
        return None
    return float(symb)

for name, typeoid in types_mapping.items():
    psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
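
# With the registrations above, date/time/timestamp columns reach Python as
# plain strings, and float4/float8/numeric columns as Python floats; for
# illustration, a NUMERIC value 1.50 comes back as 1.5, not Decimal('1.50').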

# package-relative imports, since this module lives inside the odoo package
from . import tools
from .tools.func import frame_codeinfo

from datetime import timedelta
import threading
from inspect import currentframe

import re
re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')

sql_counter = 0

class Cursor(object):
    """Represents an open transaction to the PostgreSQL DB backend,
    acting as a lightweight wrapper around psycopg2's
    ``cursor`` objects.

    ``Cursor`` is the object behind the ``cr`` variable used all
    over the OpenERP code.

    .. rubric:: Transaction Isolation

    One very important property of database transactions is the
    level of isolation between concurrent transactions.
    The SQL standard defines four levels of transaction isolation,
    ranging from the most strict *Serializable* level, to the least
    strict *Read Uncommitted* level. These levels are defined in
    terms of the phenomena that must not occur between concurrent
    transactions, such as *dirty read*, etc.

    In the context of a generic business data management software
    such as OpenERP, we need the best guarantees that no data
    corruption can ever be caused by simply running multiple
    transactions in parallel. Therefore, the preferred level would
    be the *serializable* level, which ensures that a set of
    transactions is guaranteed to produce the same effect as
    running them one at a time in some order.

    However, most database management systems implement a limited
    serializable isolation in the form of
    `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
    providing most of the same advantages as true serializability,
    with a fraction of the performance cost.

    With PostgreSQL up to version 9.0, this snapshot isolation was
    the implementation of both the ``REPEATABLE READ`` and
    ``SERIALIZABLE`` levels of the SQL standard.

    As of PostgreSQL 9.1, the previous snapshot isolation implementation
    was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
    level was introduced, providing some additional heuristics to
    detect a concurrent update by parallel transactions, and forcing
    one of them to rollback.

    OpenERP implements its own level of locking protection
    for transactions that are highly likely to provoke concurrent
    updates, such as stock reservations or document sequences updates.
    Therefore we mostly care about the properties of snapshot isolation,
    but we don't really need additional heuristics to trigger transaction
    rollbacks, as we are taking care of triggering instant rollbacks
    ourselves when it matters (and we can save the additional performance
    hit of these heuristics).

    As a result of the above, we have selected ``REPEATABLE READ`` as
    the default transaction isolation level for OpenERP cursors, as
    it will be mapped to the desired ``snapshot isolation`` level for
    all supported PostgreSQL versions (8.3 - 9.x).

    Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
    read level to serializable before sending it to the database, so it would
    actually select the new serializable mode on PostgreSQL 9.1. Make
    sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
    the performance hit is a concern for you.

    .. attribute:: cache

        Cache dictionary with a "request" (-ish) lifecycle, only lives as
        long as the cursor itself does and is proactively cleared when the
        cursor is closed.

        This cache should *only* be used to store repeatable reads as it
        ignores rollbacks and savepoints, it should not be used to store
        *any* data which may be modified during the life of the cursor.
    """
    IN_MAX = 1000   # decent limit on size of IN queries - guideline = Oracle limit

    def check(f):
        @wraps(f)
        def wrapper(self, *args, **kwargs):
            if self._closed:
                msg = 'Unable to use a closed cursor.'
                if self.__closer:
                    msg += ' It was closed at %s, line %s' % self.__closer
                raise psycopg2.OperationalError(msg)
            return f(self, *args, **kwargs)
        return wrapper

    def __init__(self, pool, dbname, dsn, serialized=True):
        self.sql_from_log = {}
        self.sql_into_log = {}

        # default log level determined at cursor creation, could be
        # overridden later for debugging purposes
        self.sql_log = _logger.isEnabledFor(logging.DEBUG)

        self.sql_log_count = 0

        # avoid the call of close() (by __del__) if an exception
        # is raised by any of the following initialisations
        self._closed = True

        self.__pool = pool
        self.dbname = dbname
        # Whether to enable snapshot isolation level for this cursor.
        # see also the docstring of Cursor.
        self._serialized = serialized

        self._cnx = pool.borrow(dsn)
        self._obj = self._cnx.cursor()
        if self.sql_log:
            self.__caller = frame_codeinfo(currentframe(), 2)
        else:
            self.__caller = False
        self._closed = False    # real initialisation value
        self.autocommit(False)
        self.__closer = False

        self._default_log_exceptions = True

        self.cache = {}

        # event handlers, see method after() below
        self._event_handlers = {'commit': [], 'rollback': []}

    def __build_dict(self, row):
        return {d.name: row[i] for i, d in enumerate(self._obj.description)}

    def dictfetchone(self):
        row = self._obj.fetchone()
        return row and self.__build_dict(row)

    def dictfetchmany(self, size):
        # list comprehensions rather than map(), so the result is a real
        # list under Python 3 and not a one-shot iterator
        return [self.__build_dict(row) for row in self._obj.fetchmany(size)]

    def dictfetchall(self):
        return [self.__build_dict(row) for row in self._obj.fetchall()]
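
    # Illustrative sketch (query and table are hypothetical, not from this
    # module): the dictfetch* helpers pair column names with row values:
    #   cr.execute("SELECT id, name FROM res_partner")
    #   rows = cr.dictfetchall()    # e.g. [{'id': 1, 'name': 'Acme'}, ...]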

    def __del__(self):
        if not self._closed and not self._cnx.closed:
            # Oops. 'self' has not been closed explicitly.
            # The cursor will be deleted by the garbage collector,
            # but the database connection is not put back into the connection
            # pool, preventing some operation on the database like dropping it.
            # This can also lead to a server overload.
            msg = "Cursor not closed explicitly\n"
            if self.__caller:
                msg += "Cursor was created at %s:%s" % self.__caller
            else:
                msg += "Please enable sql debugging to trace the caller."
            _logger.warning(msg)
            self._close(True)

    @check
    def execute(self, query, params=None, log_exceptions=None):
        if params and not isinstance(params, (tuple, list, dict)):
            # psycopg2's TypeError is not clear if you mess up the params
            raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))

        if self.sql_log:
            now = time.time()
            _logger.debug("query: %s", query)

        try:
            params = params or None
            res = self._obj.execute(query, params)
        except Exception:
            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                _logger.info("bad query: %s", self._obj.query or query)
            raise

        # simple query count is always computed
        self.sql_log_count += 1

        # advanced stats only if sql_log is enabled
        if self.sql_log:
            delay = (time.time() - now) * 1E6

            res_from = re_from.match(query.lower())
            if res_from:
                self.sql_from_log.setdefault(res_from.group(1), [0, 0])
                self.sql_from_log[res_from.group(1)][0] += 1
                self.sql_from_log[res_from.group(1)][1] += delay
            res_into = re_into.match(query.lower())
            if res_into:
                self.sql_into_log.setdefault(res_into.group(1), [0, 0])
                self.sql_into_log[res_into.group(1)][0] += 1
                self.sql_into_log[res_into.group(1)][1] += delay
        return res

    def split_for_in_conditions(self, ids, size=None):
        """Split a list of identifiers into one or more smaller tuples
           safe for IN conditions, after uniquifying them."""
        return tools.misc.split_every(size or self.IN_MAX, ids)
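
    # Illustrative sketch (hypothetical ids): chunking keeps "IN %s"
    # parameter lists within the IN_MAX guideline:
    #   for chunk in cr.split_for_in_conditions(ids):
    #       cr.execute("SELECT id FROM res_partner WHERE id IN %s", (chunk,))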

    def print_log(self):
        global sql_counter

        if not self.sql_log:
            return

        def process(type):
            sqllogs = {'from': self.sql_from_log, 'into': self.sql_into_log}
            sum = 0
            if sqllogs[type]:
                sqllogitems = sqllogs[type].items()
                _logger.debug("SQL LOG %s:", type)
                # sort tables by query count, then by cumulated delay
                for r in sorted(sqllogitems, key=lambda k: k[1]):
                    delay = timedelta(microseconds=r[1][1])
                    _logger.debug("table: %s: %s/%s", r[0], delay, r[1][0])
                    sum += r[1][1]
                sqllogs[type].clear()
            sum = timedelta(microseconds=sum)
            _logger.debug("SUM %s:%s/%d [%d]", type, sum, self.sql_log_count, sql_counter)
            sqllogs[type].clear()
        process('from')
        process('into')
        self.sql_log_count = 0
        self.sql_log = False

    @check
    def close(self):
        return self._close(False)

    def _close(self, leak=False):
        global sql_counter

        if not self._obj:
            return

        del self.cache

        if self.sql_log:
            self.__closer = frame_codeinfo(currentframe(), 3)

        # simple query count is always computed
        sql_counter += self.sql_log_count

        # advanced stats only if sql_log is enabled
        self.print_log()

        self._obj.close()

        # This forces the cursor to be freed, and thus, available again. It is
        # important because otherwise we can overload the server very easily
        # because of a cursor shortage (because cursors are not garbage
        # collected as fast as they should). The problem is probably due in
        # part to browse records keeping a reference to the cursor.
        del self._obj
        self._closed = True

        # Clean the underlying connection.
        self._cnx.rollback()

        if leak:
            self._cnx.leaked = True
        else:
            chosen_template = tools.config['db_template']
            templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
            keep_in_pool = self.dbname not in templates_list
            self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool)

    @check
    def autocommit(self, on):
        if on:
            isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
        else:
            # If a serializable cursor was requested, we
            # use the appropriate PostgreSQL isolation level
            # that maps to snapshot isolation.
            # For all supported PostgreSQL versions (8.3-9.x),
            # this is currently ISOLATION_LEVEL_REPEATABLE_READ.
            # See also the docstring of this class.
            # NOTE: up to psycopg2 2.4.2, repeatable read
            # is remapped to serializable before being
            # sent to the database, so it is in fact
            # unavailable for use with pg 9.1.
            isolation_level = \
                ISOLATION_LEVEL_REPEATABLE_READ \
                if self._serialized \
                else ISOLATION_LEVEL_READ_COMMITTED
        self._cnx.set_isolation_level(isolation_level)

    @check
    def after(self, event, func):
        """ Register an event handler.

            :param event: the event, either `'commit'` or `'rollback'`
            :param func: a callable object, called with no argument after the
                event occurs

            Be careful when coding an event handler, since any operation on the
            cursor that was just committed/rolled back will take place in the
            next transaction that has already begun, and may still be rolled
            back or committed independently. You may consider the use of a
            dedicated temporary cursor to do some database operation.
        """
        self._event_handlers[event].append(func)
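
    # Illustrative sketch: defer a side effect until the data is really
    # persisted, by registering it on the 'commit' event:
    #   cr.after('commit', lambda: _logger.info('transaction committed'))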

    def _pop_event_handlers(self):
        # return the current handlers, and reset them on self
        result = self._event_handlers
        self._event_handlers = {'commit': [], 'rollback': []}
        return result

    @check
    def commit(self):
        """ Perform an SQL `COMMIT` """
        result = self._cnx.commit()
        for func in self._pop_event_handlers()['commit']:
            func()
        return result

    @check
    def rollback(self):
        """ Perform an SQL `ROLLBACK` """
        result = self._cnx.rollback()
        for func in self._pop_event_handlers()['rollback']:
            func()
        return result

    def __enter__(self):
        """ Using the cursor as a contextmanager automatically commits and
            closes it::

                with cr:
                    cr.execute(...)

                # cr is committed if no failure occurred
                # cr is closed in any case
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.commit()
        self.close()

    @contextmanager
    @check
    def savepoint(self):
        """context manager entering in a new savepoint"""
        name = uuid.uuid1().hex
        self.execute('SAVEPOINT "%s"' % name)
        try:
            yield
        except Exception:
            self.execute('ROLLBACK TO SAVEPOINT "%s"' % name)
            raise
        else:
            self.execute('RELEASE SAVEPOINT "%s"' % name)
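
    # Illustrative sketch: try an optional statement without poisoning the
    # whole transaction if it fails:
    #   try:
    #       with cr.savepoint():
    #           cr.execute(...)         # rolled back alone on error
    #   except psycopg2.Error:
    #       pass                        # outer transaction is still usable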

    @check
    def __getattr__(self, name):
        return getattr(self._obj, name)

    @property
    def closed(self):
        return self._closed

class TestCursor(Cursor):
    """ A cursor to be used for tests. It keeps the transaction open across
        several requests, and simulates committing, rolling back, and closing.
    """
    def __init__(self, *args, **kwargs):
        super(TestCursor, self).__init__(*args, **kwargs)
        # in order to simulate commit and rollback, the cursor maintains a
        # savepoint at its last commit
        self.execute("SAVEPOINT test_cursor")
        # we use a lock to serialize concurrent requests
        self._lock = threading.RLock()

    def acquire(self):
        self._lock.acquire()

    def release(self):
        self._lock.release()

    def force_close(self):
        super(TestCursor, self).close()

    def close(self):
        if not self._closed:
            self.rollback()         # for stuff that has not been committed
        self.release()

    def autocommit(self, on):
        _logger.debug("TestCursor.autocommit(%r) does nothing", on)

    def commit(self):
        self.execute("RELEASE SAVEPOINT test_cursor")
        self.execute("SAVEPOINT test_cursor")

    def rollback(self):
        self.execute("ROLLBACK TO SAVEPOINT test_cursor")
        self.execute("SAVEPOINT test_cursor")
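
    # Illustrative sketch: commit() and rollback() only move the savepoint,
    # so a test teardown must call force_close() to really free the cursor:
    #   cr.commit()         # RELEASE + re-SAVEPOINT test_cursor
    #   cr.close()          # rolls back uncommitted work, releases the lock
    #   cr.force_close()    # actually closes the underlying cursor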

class LazyCursor(object):
    """ A proxy object to a cursor. The cursor itself is allocated only if it is
        needed. This class is useful for cached methods, that use the cursor
        only in the case of a cache miss.
    """
    def __init__(self, dbname=None):
        self._dbname = dbname
        self._cursor = None
        self._depth = 0

    @property
    def dbname(self):
        return self._dbname or threading.currentThread().dbname

    def __getattr__(self, name):
        cr = self._cursor
        if cr is None:
            from odoo import registry
            cr = self._cursor = registry(self.dbname).cursor()
            for _ in range(self._depth):
                cr.__enter__()
        return getattr(cr, name)

    def __enter__(self):
        self._depth += 1
        if self._cursor is not None:
            self._cursor.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._depth -= 1
        if self._cursor is not None:
            self._cursor.__exit__(exc_type, exc_value, traceback)
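
# Illustrative sketch: nothing touches the database until the proxy is first
# used, so a cached code path that never accesses the cursor stays free:
#   cr = LazyCursor()           # no cursor allocated yet
#   cr.execute("SELECT 1")      # first attribute access allocates the cursor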

class PsycoConnection(psycopg2.extensions.connection):
    pass

class ConnectionPool(object):
    """ The pool of connections to database(s)

        Keep a set of connections to pg databases open, and reuse them
        to open cursors for all transactions.

        The connections are *not* automatically closed. Only a close_db()
        can trigger that.
    """
    def locked(fun):
        @wraps(fun)
        def _locked(self, *args, **kwargs):
            self._lock.acquire()
            try:
                return fun(self, *args, **kwargs)
            finally:
                self._lock.release()
        return _locked

    def __init__(self, maxconn=64):
        self._connections = []
        self._maxconn = max(maxconn, 1)
        self._lock = threading.Lock()

    def __repr__(self):
        used = len([1 for c, u in self._connections[:] if u])
        count = len(self._connections)
        return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)

    def _debug(self, msg, *args):
        _logger.debug(('%r ' + msg), self, *args)

    @locked
    def borrow(self, connection_info):
        """
        :param dict connection_info: dict of psql connection keywords
        :rtype: PsycoConnection
        """
        # free dead and leaked connections
        for i, (cnx, _) in tools.reverse_enumerate(self._connections):
            if cnx.closed:
                self._connections.pop(i)
                self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
                continue
            if getattr(cnx, 'leaked', False):
                delattr(cnx, 'leaked')
                self._connections.pop(i)
                self._connections.append((cnx, False))
                _logger.info('%r: Free leaked connection to %r', self, cnx.dsn)

        for i, (cnx, used) in enumerate(self._connections):
            if not used and cnx._original_dsn == connection_info:
                try:
                    cnx.reset()
                except psycopg2.OperationalError:
                    self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
                    # psycopg2 2.4.4 and earlier do not allow closing a closed connection
                    if not cnx.closed:
                        cnx.close()
                    continue
                self._connections.pop(i)
                self._connections.append((cnx, True))
                self._debug('Borrow existing connection to %r at index %d', cnx.dsn, i)

                return cnx

        if len(self._connections) >= self._maxconn:
            # try to remove the oldest connection not used
            for i, (cnx, used) in enumerate(self._connections):
                if not used:
                    self._connections.pop(i)
                    if not cnx.closed:
                        cnx.close()
                    self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
                    break
            else:
                # note: this code is called only if the for loop has completed (no break)
                raise PoolError('The Connection Pool Is Full')

        try:
            result = psycopg2.connect(
                connection_factory=PsycoConnection,
                **connection_info)
        except psycopg2.Error:
            _logger.info('Connection to the database failed')
            raise
        result._original_dsn = connection_info
        self._connections.append((result, True))
        self._debug('Create new connection')
        return result

    @locked
    def give_back(self, connection, keep_in_pool=True):
        self._debug('Give back connection to %r', connection.dsn)
        for i, (cnx, used) in enumerate(self._connections):
            if cnx is connection:
                self._connections.pop(i)
                if keep_in_pool:
                    self._connections.append((cnx, False))
                    self._debug('Put connection to %r in pool', cnx.dsn)
                else:
                    self._debug('Forgot connection to %r', cnx.dsn)
                    cnx.close()
                break
        else:
            raise PoolError('This connection does not belong to the pool')

    @locked
    def close_all(self, dsn=None):
        count = 0
        last = None
        for i, (cnx, used) in tools.reverse_enumerate(self._connections):
            if dsn is None or cnx._original_dsn == dsn:
                cnx.close()
                last = self._connections.pop(i)[0]
                count += 1
        _logger.info('%r: Closed %d connections %s', self, count,
                     (dsn and last and 'to %r' % last.dsn) or '')

class Connection(object):
    """ A lightweight instance of a connection to postgres
    """
    def __init__(self, pool, dbname, dsn):
        self.dbname = dbname
        self.dsn = dsn
        self.__pool = pool

    def cursor(self, serialized=True):
        cursor_type = serialized and 'serialized ' or ''
        _logger.debug('create %scursor to %r', cursor_type, self.dsn)
        return Cursor(self.__pool, self.dbname, self.dsn, serialized=serialized)

    def test_cursor(self, serialized=True):
        cursor_type = serialized and 'serialized ' or ''
        _logger.debug('create test %scursor to %r', cursor_type, self.dsn)
        return TestCursor(self.__pool, self.dbname, self.dsn, serialized=serialized)

    # serialized_cursor is deprecated - cursors are serialized by default
    serialized_cursor = cursor

    def __bool__(self):
        """Check if connection is possible"""
        try:
            _logger.info("__bool__() is deprecated. (It is too expensive to test a connection.)")
            cr = self.cursor()
            cr.close()
            return True
        except Exception:
            return False

    # Python 2 name of __bool__, kept as an alias
    __nonzero__ = __bool__

def connection_info_for(db_or_uri):
    """ parse the given `db_or_uri` and return a 2-tuple (dbname, connection_params)

    Connection params are either a dictionary with a single key ``dsn``
    containing a connection URI, or a dictionary containing connection
    parameter keywords from which psycopg2 can build a key/value connection
    string (dsn)

    :param str db_or_uri: database name or postgres dsn
    :rtype: (str, dict)
    """
    if db_or_uri.startswith(('postgresql://', 'postgres://')):
        # extract db from uri
        us = urllib.parse.urlsplit(db_or_uri)
        if len(us.path) > 1:
            db_name = us.path[1:]
        elif us.username:
            db_name = us.username
        else:
            db_name = us.hostname
        return db_name, {'dsn': db_or_uri}

    connection_info = {'database': db_or_uri}
    for p in ('host', 'port', 'user', 'password', 'dbname'):
        cfg = tools.config['db_' + p]
        if cfg:
            connection_info[p] = cfg

    return db_or_uri, connection_info
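
# Illustrative examples (hypothetical names):
#   connection_info_for('mydb')
#       -> ('mydb', {'database': 'mydb', ...})  # plus db_host/db_port/... config
#   connection_info_for('postgresql://user@pg.example.com/mydb')
#       -> ('mydb', {'dsn': 'postgresql://user@pg.example.com/mydb'})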

_Pool = None

def db_connect(to, allow_uri=False):
    global _Pool
    if _Pool is None:
        _Pool = ConnectionPool(int(tools.config['db_maxconn']))

    db, info = connection_info_for(to)
    if not allow_uri and db != to:
        raise ValueError('URI connections not allowed')
    return Connection(_Pool, db, info)
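
# Illustrative sketch: the usual way to get a cursor outside the ORM; the
# cursor commits and closes itself when used as a context manager:
#   with db_connect('mydb').cursor() as cr:
#       cr.execute("SELECT 1")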

def close_db(db_name):
    """ You might want to call odoo.modules.registry.Registry.delete(db_name) along with this function."""
    global _Pool
    if _Pool:
        _Pool.close_all(connection_info_for(db_name)[1])

def close_all():
    global _Pool
    if _Pool:
        _Pool.close_all()