You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

259 lines
11 KiB

10 years ago
10 years ago
10 years ago
10 years ago
  1. # -*- coding: utf-8 -*-
  2. ##############################################################################
  3. #
  4. # Daniel Reis
  5. # 2011
  6. #
  7. # This program is free software: you can redistribute it and/or modify
  8. # it under the terms of the GNU Affero General Public License as
  9. # published by the Free Software Foundation, either version 3 of the
  10. # License, or (at your option) any later version.
  11. #
  12. # This program is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. # GNU Affero General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU Affero General Public License
  18. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. #
  20. ##############################################################################
  21. import sys
  22. from datetime import datetime
  23. from openerp.osv import orm, fields
  24. import logging
  25. _logger = logging.getLogger(__name__)
  26. _loglvl = _logger.getEffectiveLevel()
  27. SEP = '|'
  28. class import_odbc_dbtable(orm.Model):
  29. _name = "import.odbc.dbtable"
  30. _description = 'Import Table Data'
  31. _order = 'exec_order'
  32. _columns = {
  33. 'name': fields.char('Datasource name', required=True, size=64),
  34. 'enabled': fields.boolean('Execution enabled'),
  35. 'dbsource_id': fields.many2one('base.external.dbsource',
  36. 'Database source',
  37. required=True),
  38. 'sql_source': fields.text('SQL',
  39. required=True,
  40. help='Column names must be valid '
  41. '"import_data" columns.'),
  42. 'model_target': fields.many2one('ir.model', 'Target object'),
  43. 'noupdate': fields.boolean('No updates',
  44. help="Only create new records; disable "
  45. "updates to existing records."),
  46. 'exec_order': fields.integer('Execution order',
  47. help="Defines the order to perform "
  48. "the import"),
  49. 'last_sync': fields.datetime(
  50. 'Last sync date',
  51. help="Datetime for the last succesfull sync.\n"
  52. "Later changes on the source may not be replicated "
  53. "on the destination"),
  54. 'start_run': fields.datetime('Time started',
  55. readonly=True),
  56. 'last_run': fields.datetime('Time ended',
  57. readonly=True),
  58. 'last_record_count': fields.integer('Last record count',
  59. readonly=True),
  60. 'last_error_count': fields.integer('Last error count',
  61. readonly=True),
  62. 'last_warn_count': fields.integer('Last warning count',
  63. readonly=True),
  64. 'last_log': fields.text('Last run log', readonly=True),
  65. 'ignore_rel_errors': fields.boolean(
  66. 'Ignore relationship errors',
  67. help="On error try to reimport rows ignoring relationships."),
  68. 'raise_import_errors': fields.boolean(
  69. 'Raise import errors',
  70. help="Import errors not handled, intended for debugging purposes."
  71. "\nAlso forces debug messages to be written to the server log."),
  72. }
  73. _defaults = {
  74. 'enabled': True,
  75. 'exec_order': 10,
  76. }
  77. def _import_data(self, cr, uid, flds, data, model_obj, table_obj, log):
  78. """Import data and returns error msg or empty string"""
  79. def find_m2o(field_list):
  80. """Find index of first column with a one2many field"""
  81. for i, x in enumerate(field_list):
  82. if len(x) > 3 and x[-3:] == ':id' or x[-3:] == '/id':
  83. return i
  84. return -1
  85. def append_to_log(log, level, obj_id='', msg='', rel_id=''):
  86. if '_id_' in obj_id:
  87. obj_id = ('.'.join(obj_id.split('_')[:-2]) +
  88. ': ' + obj_id.split('_')[-1])
  89. if ': .' in msg and not rel_id:
  90. rel_id = msg[msg.find(': .') + 3:]
  91. if '_id_' in rel_id:
  92. rel_id = ('.'.join(rel_id.split('_')[:-2]) +
  93. ': ' + rel_id.split('_')[-1])
  94. msg = msg[:msg.find(': .')]
  95. log['last_log'].append('%s|%s\t|%s\t|%s' % (level.ljust(5),
  96. obj_id,
  97. rel_id,
  98. msg))
  99. _logger.debug(data)
  100. cols = list(flds) # copy to avoid side effects
  101. errmsg = str()
  102. if table_obj.raise_import_errors:
  103. model_obj.import_data(cr, uid, cols, [data],
  104. noupdate=table_obj.noupdate)
  105. else:
  106. try:
  107. model_obj.import_data(cr, uid, cols, [data],
  108. noupdate=table_obj.noupdate)
  109. except:
  110. errmsg = str(sys.exc_info()[1])
  111. if errmsg and not table_obj.ignore_rel_errors:
  112. # Fail
  113. append_to_log(log, 'ERROR', data, errmsg)
  114. log['last_error_count'] += 1
  115. return False
  116. if errmsg and table_obj.ignore_rel_errors:
  117. # Warn and retry ignoring many2one fields...
  118. append_to_log(log, 'WARN', data, errmsg)
  119. log['last_warn_count'] += 1
  120. # Try ignoring each many2one (tip: in the SQL sentence select more
  121. # problematic FKs first)
  122. i = find_m2o(cols)
  123. if i >= 0:
  124. # Try again without the [i] column
  125. del cols[i]
  126. del data[i]
  127. self._import_data(cr, uid, cols,
  128. data,
  129. model_obj,
  130. table_obj,
  131. log)
  132. else:
  133. # Fail
  134. append_to_log(log, 'ERROR', data,
  135. 'Removed all m2o keys and still fails.')
  136. log['last_error_count'] += 1
  137. return False
  138. return True
  139. def import_run(self, cr, uid, ids=None, context=None):
  140. db_model = self.pool.get('base.external.dbsource')
  141. actions = self.read(cr, uid, ids, ['id', 'exec_order'])
  142. actions.sort(key=lambda x: (x['exec_order'], x['id']))
  143. # Consider each dbtable:
  144. for action_ref in actions:
  145. obj = self.browse(cr, uid, action_ref['id'])
  146. if not obj.enabled:
  147. continue # skip
  148. _logger.setLevel(obj.raise_import_errors and
  149. logging.DEBUG or
  150. _loglvl)
  151. _logger.debug('Importing %s...', obj.name)
  152. # now() microseconds are stripped to avoid problem with SQL
  153. # smalldate
  154. # TODO: convert UTC Now to local timezone
  155. # http://stackoverflow.com/questions/4770297
  156. model_name = obj.model_target.model
  157. model_obj = self.pool.get(model_name)
  158. xml_prefix = model_name.replace('.', '_') + "_id_"
  159. log = {'start_run': datetime.now().replace(microsecond=0),
  160. 'last_run': None,
  161. 'last_record_count': 0,
  162. 'last_error_count': 0,
  163. 'last_warn_count': 0,
  164. 'last_log': list()}
  165. self.write(cr, uid, [obj.id], log)
  166. # Prepare SQL sentence; replace "%s" with the last_sync date
  167. if obj.last_sync:
  168. sync = datetime.strptime(obj.last_sync, "%Y-%m-%d %H:%M:%S")
  169. else:
  170. sync = datetime.datetime(1900, 1, 1, 0, 0, 0)
  171. params = {'sync': sync}
  172. res = db_model.execute(cr, uid, [obj.dbsource_id.id],
  173. obj.sql_source, params, metadata=True)
  174. # Exclude columns titled "None"; add (xml_)"id" column
  175. cidx = [i for i, x in enumerate(res['cols'])
  176. if x.upper() != 'NONE']
  177. cols = [x for i, x in enumerate(res['cols'])
  178. if x.upper() != 'NONE'] + ['id']
  179. # Import each row:
  180. for row in res['rows']:
  181. # Build data row; import only columns present in the "cols"
  182. # list
  183. data = list()
  184. for i in cidx:
  185. # TODO: Handle imported datetimes properly - convert from
  186. # localtime to UTC!
  187. v = row[i]
  188. if isinstance(v, str):
  189. v = v.strip()
  190. data.append(v)
  191. data.append(xml_prefix + str(row[0]).strip())
  192. # Import the row; on error, write line to the log
  193. log['last_record_count'] += 1
  194. self._import_data(cr, uid, cols, data, model_obj, obj, log)
  195. if log['last_record_count'] % 500 == 0:
  196. _logger.info('...%s rows processed...',
  197. (log['last_record_count']))
  198. # Finished importing all rows
  199. # If no errors, write new sync date
  200. if not (log['last_error_count'] or log['last_warn_count']):
  201. log['last_sync'] = log['start_run']
  202. level = logging.DEBUG
  203. if log['last_warn_count']:
  204. level = logging.WARN
  205. if log['last_error_count']:
  206. level = logging.ERROR
  207. _logger.log(level,
  208. 'Imported %s , %d rows, %d errors, %d warnings.',
  209. model_name,
  210. log['last_record_count'],
  211. log['last_error_count'],
  212. log['last_warn_count'])
  213. # Write run log, either if the table import is active or inactive
  214. if log['last_log']:
  215. _line = 'LEVEL|== Line == |== Relationship ==|== Message =='
  216. log['last_log'].insert(0, _line)
  217. log.update({'last_log': '\n'.join(log['last_log'])})
  218. log.update({'last_run': datetime.now().replace(microsecond=0)})
  219. self.write(cr, uid, [obj.id], log)
  220. # Finished
  221. _logger.debug('Import job FINISHED.')
  222. return True
  223. def import_schedule(self, cr, uid, ids, context=None):
  224. cron_obj = self.pool.get('ir.cron')
  225. new_create_id = cron_obj.create(cr, uid, {
  226. 'name': 'Import ODBC tables',
  227. 'interval_type': 'hours',
  228. 'interval_number': 1,
  229. 'numbercall': -1,
  230. 'model': 'import.odbc.dbtable',
  231. 'function': 'import_run',
  232. 'doall': False,
  233. 'active': True
  234. })
  235. return {
  236. 'name': 'Import ODBC tables',
  237. 'view_type': 'form',
  238. 'view_mode': 'form,tree',
  239. 'res_model': 'ir.cron',
  240. 'res_id': new_create_id,
  241. 'type': 'ir.actions.act_window',
  242. }