# -*- coding: utf-8 -*- # Copyright 2011 Daniel Reis # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl). import os import logging import psycopg2 from odoo import models, fields, api, _ from odoo.exceptions import Warning as UserError import odoo.tools as tools _logger = logging.getLogger(__name__) CONNECTORS = [] try: import sqlalchemy CONNECTORS.append(('sqlite', 'SQLite')) try: import pymssql CONNECTORS.append(('mssql', 'Microsoft SQL Server')) assert pymssql except (ImportError, AssertionError): _logger.info('MS SQL Server not available. Please install "pymssql"\ python package.') try: import MySQLdb CONNECTORS.append(('mysql', 'MySQL')) assert MySQLdb except (ImportError, AssertionError): _logger.info('MySQL not available. Please install "mysqldb"\ python package.') except: _logger.info('SQL Alchemy not available. Please install "slqalchemy"\ python package.') try: import pyodbc CONNECTORS.append(('pyodbc', 'ODBC')) except: _logger.info('ODBC libraries not available. Please install "unixodbc"\ and "python-pyodbc" packages.') try: import cx_Oracle CONNECTORS.append(('cx_Oracle', 'Oracle')) except: _logger.info('Oracle libraries not available. Please install "cx_Oracle"\ python package.') try: import fdb CONNECTORS.append(('fdb', 'Firebird')) except: _logger.info('Firebird libraries not available. Please install "fdb"\ python package.') CONNECTORS.append(('postgresql', 'PostgreSQL')) class BaseExternalDbsource(models.Model): _name = "base.external.dbsource" _description = 'External Database Sources' name = fields.Char('Datasource name', required=True, size=64) conn_string = fields.Text('Connection string', help=""" Sample connection strings: - Microsoft SQL Server: mssql+pymssql://username:%s@server:port/dbname?charset=utf8 - MySQL: mysql://user:%s@server:port/dbname - ODBC: DRIVER={FreeTDS};SERVER=server.address;Database=mydb;UID=sa - ORACLE: username/%s@//server.address:port/instance - FireBird: host=localhost;database=mydatabase.gdb;user=sysdba;password=%s; port=3050;charset=utf8 - PostgreSQL: dbname='template1' user='dbuser' host='localhost' port='5432' \ password=%s - SQLite: sqlite:///test.db """) password = fields.Char('Password', size=40) connector = fields.Selection(CONNECTORS, 'Connector', required=True, help="If a connector is missing from the\ list, check the server log to confirm\ that the required components were\ detected.") @api.multi def conn_open(self): """The connection is open here.""" self.ensure_one() # Get dbsource record # Build the full connection string connStr = self.conn_string if self.password: if '%s' not in self.conn_string: connStr += ';PWD=%s' connStr = connStr % self.password # Try to connect if self.connector == 'cx_Oracle': os.environ['NLS_LANG'] = 'AMERICAN_AMERICA.UTF8' conn = cx_Oracle.connect(connStr) elif self.connector == 'pyodbc': conn = pyodbc.connect(connStr) elif self.connector in ('sqlite', 'mysql', 'mssql'): conn = sqlalchemy.create_engine(connStr).connect() elif self.connector == 'fdb': kwargs = dict([x.split('=') for x in connStr.split(';')]) conn = fdb.connect(**kwargs) elif self.connector == 'postgresql': conn = psycopg2.connect(connStr) return conn @api.multi def execute(self, sqlquery, sqlparams=None, metadata=False, context=None): """Executes SQL and returns a list of rows. "sqlparams" can be a dict of values, that can be referenced in the SQL statement using "%(key)s" or, in the case of Oracle, ":key". Example: sqlquery = "select * from mytable where city = %(city)s and date > %(dt)s" params = {'city': 'Lisbon', 'dt': datetime.datetime(2000, 12, 31)} If metadata=True, it will instead return a dict containing the rows list and the columns list, in the format: { 'cols': [ 'col_a', 'col_b', ...] , 'rows': [ (a0, b0, ...), (a1, b1, ...), ...] } """ rows, cols = list(), list() for obj in self: conn = obj.conn_open() if obj.connector in ["sqlite", "mysql", "mssql"]: # using sqlalchemy cur = conn.execute(sqlquery, sqlparams) if metadata: cols = cur.keys() rows = [r for r in cur] elif obj.connector in ["fdb"]: # using other db connectors cur = conn.cursor() for key in sqlparams: sqlquery = sqlquery.replace('%%(%s)s' % key, str(sqlparams[key])) cur.execute(sqlquery) rows = cur.fetchall() else: # using other db connectors cur = conn.cursor() cur.execute(sqlquery, sqlparams) if metadata: cols = [d[0] for d in cur.description] rows = cur.fetchall() conn.close() if metadata: return{'cols': cols, 'rows': rows} else: return rows @api.multi def connection_test(self): """Test of connection.""" self.ensure_one() conn = False try: conn = self.conn_open() except Exception as e: raise UserError(_("Connection test failed: \ Here is what we got instead:\n %s") % tools.ustr(e)) finally: if conn: conn.close() # TODO: if OK a (wizard) message box should be displayed raise UserError(_("Connection test succeeded: \ Everything seems properly set up!"))