# -*- coding: utf-8 -*-
##############################################################################
#
#    Daniel Reis
#    2011
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################

import sys
import logging
from datetime import datetime

from osv import fields, osv

_logger = logging.getLogger(__name__)
_loglvl = _logger.getEffectiveLevel()
SEP = '|'

class import_odbc_dbtable(osv.osv):
    _name = "import.odbc.dbtable"
    _description = 'Import Table Data'
    _order = 'exec_order'
    _columns = {
        'name': fields.char('Datasource name', required=True, size=64),
        'enabled': fields.boolean('Execution enabled'),
        'dbsource_id': fields.many2one('base.external.dbsource', 'Database source', required=True),
        'sql_source': fields.text('SQL', required=True, help='Column names must be valid "import_data" columns.'),
        'model_target': fields.many2one('ir.model', 'Target object'),
        'noupdate': fields.boolean('No updates', help="Only create new records; disable updates to existing records."),
        'exec_order': fields.integer('Execution order', help="Defines the order to perform the import"),
        'last_sync': fields.datetime('Last sync date', help="Datetime for the last successful sync. Later changes on the source may not be replicated on the destination."),
        'start_run': fields.datetime('Time started', readonly=True),
        'last_run': fields.datetime('Time ended', readonly=True),
        'last_record_count': fields.integer('Last record count', readonly=True),
        'last_error_count': fields.integer('Last error count', readonly=True),
        'last_warn_count': fields.integer('Last warning count', readonly=True),
        'last_log': fields.text('Last run log', readonly=True),
        'ignore_rel_errors': fields.boolean('Ignore relationship errors',
            help="On error, try to reimport rows ignoring relationships."),
        'raise_import_errors': fields.boolean('Raise import errors',
            help="Import errors are not handled; intended for debugging purposes."
                 "\nAlso forces debug messages to be written to the server log."),
    }
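
    # Illustrative sketch (an assumption, not part of the original module):
    # a typical 'sql_source' whose column aliases are valid import_data
    # column names for the target model. The first selected column is used
    # to build each row's external id, and "%s" is replaced with the
    # last_sync date, e.g. for res.partner:
    #
    #   SELECT cust_id    AS ref,
    #          cust_name  AS name,
    #          cust_city  AS city
    #     FROM customers
    #    WHERE update_date > %s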

    _defaults = {
        'enabled': True,
        'exec_order': 10,
    }

    def _import_data(self, cr, uid, flds, data, model_obj, table_obj, log):
        """Import data and return an error message or an empty string"""

        def find_m2o(field_list):
            """Find the index of the first column with a many2one field"""
            for i, x in enumerate(field_list):
                if len(x) > 3 and (x[-3:] == ':id' or x[-3:] == '/id'):
                    return i
            return -1
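
        # Hypothetical example: find_m2o(['name', 'country_id/id', 'city'])
        # returns 1, the index of the first external-id (many2one) column.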

        def append_to_log(log, level, obj_id='', msg='', rel_id=''):
            if '_id_' in obj_id:
                obj_id = '.'.join(obj_id.split('_')[:-2]) + ': ' + obj_id.split('_')[-1]
            if ': .' in msg and not rel_id:
                rel_id = msg[msg.find(': .') + 3:]
                if '_id_' in rel_id:
                    rel_id = '.'.join(rel_id.split('_')[:-2]) + ': ' + rel_id.split('_')[-1]
                msg = msg[:msg.find(': .')]
            log['last_log'].append('%s|%s\t|%s\t|%s' % (level.ljust(5), obj_id, rel_id, msg))
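
        # Log lines follow the 'LEVEL|line|relationship|message' layout of
        # the header written in import_run(), e.g. (hypothetical values):
        #   'ERROR|res.partner: 123	|	|Missing required field name'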

        _logger.debug(data)
        cols = list(flds)  # copy to avoid side effects
        errmsg = ''
        if table_obj.raise_import_errors:
            model_obj.import_data(cr, uid, cols, [data], noupdate=table_obj.noupdate)
        else:
            try:
                model_obj.import_data(cr, uid, cols, [data], noupdate=table_obj.noupdate)
            except Exception:
                errmsg = str(sys.exc_info()[1])
        if errmsg and not table_obj.ignore_rel_errors:
            # Fail
            append_to_log(log, 'ERROR', data, errmsg)
            log['last_error_count'] += 1
            return False
        if errmsg and table_obj.ignore_rel_errors:
            # Warn and retry, ignoring many2one fields...
            append_to_log(log, 'WARN', data, errmsg)
            log['last_warn_count'] += 1
            # Try ignoring each many2one
            # (tip: in the SQL sentence, select the more problematic FKs first)
            i = find_m2o(cols)
            if i >= 0:
                # Try again without the [i] column
                del cols[i]
                del data[i]
                return self._import_data(cr, uid, cols, data, model_obj, table_obj, log)
            else:
                # Fail
                append_to_log(log, 'ERROR', data, 'Removed all m2o keys and still fails.')
                log['last_error_count'] += 1
                return False
        return True
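
    # Illustrative retry cascade (hypothetical data): with cols
    # ['name', 'country_id/id', 'id'] and a row failing on a bad country
    # external id, the row is logged as a WARN and retried with cols
    # ['name', 'id']; only when every m2o column has been removed and the
    # import still fails is the row counted as an ERROR.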

    def import_run(self, cr, uid, ids=None, context=None):
        db_model = self.pool.get('base.external.dbsource')
        if ids is None:
            # Scheduled (cron) runs pass no ids: process every table
            ids = self.search(cr, uid, [])
        actions = self.read(cr, uid, ids, ['id', 'exec_order'])
        actions.sort(key=lambda x: (x['exec_order'], x['id']))
        # Consider each dbtable:
        for action_ref in actions:
            obj = self.browse(cr, uid, action_ref['id'])
            if not obj.enabled:
                continue  # skip
            _logger.setLevel(obj.raise_import_errors and logging.DEBUG or _loglvl)
            _logger.debug('Importing %s...' % obj.name)
            # now() microseconds are stripped to avoid problems with SQL smalldatetime
            # TODO: convert UTC now to the local timezone
            # (http://stackoverflow.com/questions/4770297/python-convert-utc-datetime-string-to-local-datetime)
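            # A possible conversion sketch (an assumption, untested; ignores DST):
            #   import time
            #   from datetime import timedelta
            #   local_now = datetime.utcnow() - timedelta(seconds=time.timezone)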
            model_name = obj.model_target.model
            model_obj = self.pool.get(model_name)
            xml_prefix = model_name.replace('.', '_') + "_id_"
            log = {'start_run': datetime.now().replace(microsecond=0),
                   'last_run': None,
                   'last_record_count': 0,
                   'last_error_count': 0,
                   'last_warn_count': 0,
                   'last_log': list()}
            self.write(cr, uid, [obj.id], log)
            # Prepare the SQL sentence; replace "%s" with the last_sync date
            if obj.last_sync:
                sync = datetime.strptime(obj.last_sync, "%Y-%m-%d %H:%M:%S")
            else:
                sync = datetime(1900, 1, 1, 0, 0, 0)
            params = {'sync': sync}
            res = db_model.execute(cr, uid, [obj.dbsource_id.id],
                                   obj.sql_source, params, metadata=True)
            # Exclude columns titled "None"; add an (xml_)"id" column
            cidx = [i for i, x in enumerate(res['cols']) if x.upper() != 'NONE']
            cols = [x for i, x in enumerate(res['cols']) if x.upper() != 'NONE'] + ['id']
            # Import each row:
            for row in res['rows']:
                # Build the data row; import only columns present in the "cols" list
                data = list()
                for i in cidx:
                    # TODO: handle imported datetimes properly - convert from localtime to UTC!
                    v = row[i]
                    if isinstance(v, basestring):
                        v = v.strip()
                    data.append(v)
                data.append(xml_prefix + str(row[0]).strip())
                # Import the row; on error, write a line to the log
                log['last_record_count'] += 1
                self._import_data(cr, uid, cols, data, model_obj, obj, log)
                if log['last_record_count'] % 500 == 0:
                    _logger.info('...%s rows processed...' % (log['last_record_count']))
            # Finished importing all rows
            # If there were no errors or warnings, write the new sync date
            if not (log['last_error_count'] or log['last_warn_count']):
                log['last_sync'] = log['start_run']
            level = logging.DEBUG
            if log['last_warn_count']:
                level = logging.WARN
            if log['last_error_count']:
                level = logging.ERROR
            _logger.log(level, 'Imported %s, %d rows, %d errors, %d warnings.' % (
                model_name, log['last_record_count'], log['last_error_count'],
                log['last_warn_count']))
            # Write the run log, whether or not the import succeeded
            if log['last_log']:
                log['last_log'].insert(0, 'LEVEL|== Line == |== Relationship ==|== Message ==')
            log.update({'last_log': '\n'.join(log['last_log'])})
            log.update({'last_run': datetime.now().replace(microsecond=0)})
            self.write(cr, uid, [obj.id], log)

        # Finished
        _logger.debug('Import job FINISHED.')
        return True
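
    # Hypothetical usage sketch (names are assumptions): trigger a run for
    # all enabled tables from server-side code:
    #   table_obj = self.pool.get('import.odbc.dbtable')
    #   ids = table_obj.search(cr, uid, [('enabled', '=', True)])
    #   table_obj.import_run(cr, uid, ids)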

    def import_schedule(self, cr, uid, ids, context=None):
        cron_obj = self.pool.get('ir.cron')
        new_create_id = cron_obj.create(cr, uid, {
            'name': 'Import ODBC tables',
            'interval_type': 'hours',
            'interval_number': 1,
            'numbercall': -1,
            'model': 'import.odbc.dbtable',
            'function': 'import_run',
            'doall': False,
            'active': True,
        })
        return {
            'name': 'Import ODBC tables',
            'view_type': 'form',
            'view_mode': 'form,tree',
            'res_model': 'ir.cron',
            'res_id': new_create_id,
            'type': 'ir.actions.act_window',
        }

import_odbc_dbtable()