You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

180 lines
6.6 KiB

# -*- coding: utf-8 -*-
##############################################################################
#
# Daniel Reis
# 2011
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
import os
import logging
import psycopg2
from openerp import models, fields, api, _
from openerp.exceptions import Warning as UserError
import openerp.tools as tools
_logger = logging.getLogger(__name__)

# Selection values (key, label) for the "connector" field. An entry is only
# added when the Python driver it needs can be imported, so the list reflects
# what is actually usable on this server.
CONNECTORS = []

try:
    import sqlalchemy
    CONNECTORS.append(('sqlite', 'SQLite'))
    # pymssql and MySQLdb are imported only to detect their availability;
    # connections for these connectors are opened through SQLAlchemy.
    try:
        import pymssql  # noqa: F401
        CONNECTORS.append(('mssql', 'Microsoft SQL Server'))
    except ImportError:
        _logger.info('MS SQL Server not available. Please install "pymssql" '
                     'python package.')
    try:
        import MySQLdb  # noqa: F401
        CONNECTORS.append(('mysql', 'MySQL'))
    except ImportError:
        _logger.info('MySQL not available. Please install "mysqldb" '
                     'python package.')
except ImportError:
    _logger.info('SQL Alchemy not available. Please install "sqlalchemy" '
                 'python package.')

try:
    import pyodbc
    CONNECTORS.append(('pyodbc', 'ODBC'))
except ImportError:
    _logger.info('ODBC libraries not available. Please install "unixodbc" '
                 'and "python-pyodbc" packages.')

try:
    import cx_Oracle
    CONNECTORS.append(('cx_Oracle', 'Oracle'))
except ImportError:
    _logger.info('Oracle libraries not available. Please install '
                 '"cx_Oracle" python package.')

# PostgreSQL is always offered: psycopg2 is imported unconditionally at the
# top of this file.
CONNECTORS.append(('postgresql', 'PostgreSQL'))
class BaseExternalDbsource(models.Model):
    """Definition of an external database source.

    Each record stores a connection string, an optional password and the
    connector to use, and offers helpers to open a connection, run a query
    and test the connection.
    """

    _name = "base.external.dbsource"
    _description = 'External Database Sources'

    name = fields.Char('Datasource name', required=True, size=64)
    # A "%s" placeholder in the connection string is replaced by the
    # password when the connection is opened (see conn_open).
    conn_string = fields.Text('Connection string', help="""
Sample connection strings:
- Microsoft SQL Server:
mssql+pymssql://username:%s@server:port/dbname?charset=utf8
- MySQL: mysql://user:%s@server:port/dbname
- ODBC: DRIVER={FreeTDS};SERVER=server.address;Database=mydb;UID=sa
- ORACLE: username/%s@//server.address:port/instance
- PostgreSQL:
dbname='template1' user='dbuser' host='localhost' port='5432' \
password=%s
- SQLite: sqlite:///test.db
""")
    password = fields.Char('Password', size=40)
    connector = fields.Selection(
        CONNECTORS, 'Connector', required=True,
        help="If a connector is missing from the list, check the server "
             "log to confirm that the required components were detected.")

    @api.multi
    def conn_open(self):
        """Open and return a connection to the external database.

        When a password is set it is substituted for the "%s" placeholder
        of the connection string; if the string holds no placeholder,
        ";PWD=%s" is appended first.

        Must be called on a single record.

        :return: a cx_Oracle, pyodbc or psycopg2 connection, or an
            SQLAlchemy connection for the sqlite/mysql/mssql connectors.
        :raise UserError: if the record's connector is not one of the
            connectors handled here.
        """
        self.ensure_one()

        # Build the full connection string
        conn_str = self.conn_string
        if self.password:
            if '%s' not in conn_str:
                conn_str += ';PWD=%s'
            conn_str = conn_str % self.password

        # Try to connect
        connector = self.connector
        if connector == 'cx_Oracle':
            # Sets NLS_LANG for the whole process before connecting
            # (presumably so the Oracle client exchanges UTF-8 text).
            os.environ['NLS_LANG'] = 'AMERICAN_AMERICA.UTF8'
            return cx_Oracle.connect(conn_str)
        if connector == 'pyodbc':
            return pyodbc.connect(conn_str)
        if connector in ('sqlite', 'mysql', 'mssql'):
            return sqlalchemy.create_engine(conn_str).connect()
        if connector == 'postgresql':
            return psycopg2.connect(conn_str)
        # Fail with an explicit message instead of an UnboundLocalError.
        raise UserError(_("Unsupported connector: %s") % connector)

    @api.multi
    def execute(self, sqlquery, sqlparams=None, metadata=False,
                context=None):
        """Execute SQL and return a list of rows.

        "sqlparams" can be a dict of values, that can be referenced in
        the SQL statement using "%(key)s" or, in the case of Oracle,
        ":key".
        Example:
            sqlquery = "select * from mytable where city = %(city)s and
                        date > %(dt)s"
            params = {'city': 'Lisbon',
                      'dt': datetime.datetime(2000, 12, 31)}

        If metadata=True, it will instead return a dict containing the
        rows list and the columns list, in the format:
            { 'cols': [ 'col_a', 'col_b', ...]
            , 'rows': [ (a0, b0, ...), (a1, b1, ...), ...] }

        "context" is accepted for backward compatibility and is not used.

        When called on several records the query runs on each of them and
        the result of the last one is returned.
        """
        rows, cols = [], []
        for obj in self:
            conn = obj.conn_open()
            try:
                if obj.connector in ('sqlite', 'mysql', 'mssql'):
                    # using sqlalchemy
                    result = conn.execute(sqlquery, sqlparams)
                    if metadata:
                        cols = result.keys()
                    rows = [r for r in result]
                else:
                    # using other db connectors
                    cur = conn.cursor()
                    cur.execute(sqlquery, sqlparams)
                    if metadata:
                        cols = [d[0] for d in cur.description]
                    rows = cur.fetchall()
            finally:
                # Always release the connection, even if the query failed.
                conn.close()
        if metadata:
            return {'cols': cols, 'rows': rows}
        return rows

    @api.multi
    def connection_test(self):
        """Try to open a connection for every record and report the result.

        The outcome is always reported by raising UserError: with the
        driver's error message on the first failure, or with a success
        message once every record has connected.

        :raise UserError: always (see above).
        """
        for obj in self:
            conn = False
            try:
                # Test each record on its own; conn_open() requires a
                # single record.
                conn = obj.conn_open()
            except Exception as e:
                raise UserError(_("Connection test failed: "
                                  "Here is what we got instead:\n %s")
                                % tools.ustr(e))
            finally:
                try:
                    if conn:
                        conn.close()
                except Exception:
                    # ignored, just a consequence of the previous exception
                    pass
        # TODO: if OK a (wizard) message box should be displayed
        raise UserError(_("Connection test succeeded: "
                          "Everything seems properly set up!"))