You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

161 lines
6.0 KiB

  1. # -*- coding: utf-8 -*-
  2. # © 2014-2016 Akretion (Alexis de Lattre <alexis.delattre@akretion.com>)
  3. # © 2014 Lorenzo Battistini <lorenzo.battistini@agilebg.com>
  4. # © 2016 Pedro M. Baeza <pedro.baeza@serviciosbaeza.com>
  5. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
  6. from odoo import _, api, fields, models, tools
  7. from odoo.exceptions import UserError
  8. import requests
  9. import tempfile
  10. import StringIO
  11. import zipfile
  12. import os
  13. import logging
  14. try:
  15. import unicodecsv
  16. except ImportError:
  17. unicodecsv = None
  18. logger = logging.getLogger(__name__)
  19. class BetterZipGeonamesImport(models.TransientModel):
  20. _name = 'better.zip.geonames.import'
  21. _description = 'Import Better Zip from Geonames'
  22. _rec_name = 'country_id'
  23. country_id = fields.Many2one('res.country', 'Country', required=True)
  24. letter_case = fields.Selection([
  25. ('unchanged', 'Unchanged'),
  26. ('title', 'Title Case'),
  27. ('upper', 'Upper Case'),
  28. ], string='Letter Case', default='unchanged',
  29. help="Converts retreived city and state names to Title Case "
  30. "(upper case on each first letter of a word) or Upper Case "
  31. "(all letters upper case).")
  32. @api.model
  33. def transform_city_name(self, city, country):
  34. """Override it for transforming city name (if needed)
  35. :param city: Original city name
  36. :param country: Country record
  37. :return: Transformed city name
  38. """
  39. return city
  40. @api.model
  41. def _domain_search_better_zip(self, row, country):
  42. return [('name', '=', row[1]),
  43. ('city', '=', self.transform_city_name(row[2], country)),
  44. ('country_id', '=', country.id)]
  45. @api.model
  46. def _prepare_better_zip(self, row, country):
  47. state = self.select_or_create_state(row, country)
  48. vals = {
  49. 'name': row[1],
  50. 'city': self.transform_city_name(row[2], country),
  51. 'state_id': state.id,
  52. 'country_id': country.id,
  53. 'latitude': row[9],
  54. 'longitude': row[10],
  55. }
  56. return vals
  57. @api.model
  58. def create_better_zip(self, row, country):
  59. if row[0] != country.code:
  60. raise UserError(
  61. _("The country code inside the file (%s) doesn't "
  62. "correspond to the selected country (%s).")
  63. % (row[0], country.code))
  64. logger.debug('ZIP = %s - City = %s' % (row[1], row[2]))
  65. if self.letter_case == 'title':
  66. row[2] = row[2].title()
  67. row[3] = row[3].title()
  68. elif self.letter_case == 'upper':
  69. row[2] = row[2].upper()
  70. row[3] = row[3].upper()
  71. if row[1] and row[2]:
  72. zip_model = self.env['res.better.zip']
  73. zips = zip_model.search(self._domain_search_better_zip(
  74. row, country))
  75. if zips:
  76. return zips[0]
  77. else:
  78. vals = self._prepare_better_zip(row, country)
  79. if vals:
  80. return zip_model.create(vals)
  81. else: # pragma: no cover
  82. return False
  83. @tools.ormcache('country_id', 'code')
  84. def _get_state(self, country_id, code, name):
  85. state = self.env['res.country.state'].search(
  86. [('country_id', '=', country_id),
  87. ('code', '=', code)], limit=1,
  88. )
  89. if state: # pragma: no cover
  90. return state
  91. else:
  92. return self.env['res.country.state'].create({
  93. 'name': name,
  94. 'code': code,
  95. 'country_id': country_id,
  96. })
  97. @api.model
  98. def select_or_create_state(
  99. self, row, country, code_row_index=4, name_row_index=3):
  100. if country.geonames_state_code_column:
  101. code_row_index = country.geonames_state_code_column
  102. if country.geonames_state_name_column:
  103. name_row_index = country.geonames_state_name_column
  104. return self._get_state(
  105. country.id, row[code_row_index], row[name_row_index],
  106. )
  107. @api.multi
  108. def run_import(self):
  109. self.ensure_one()
  110. zip_model = self.env['res.better.zip']
  111. country_code = self.country_id.code
  112. config_url = self.env['ir.config_parameter'].get_param(
  113. 'geonames.url',
  114. default='http://download.geonames.org/export/zip/%s.zip')
  115. url = config_url % country_code
  116. logger.info('Starting to download %s' % url)
  117. res_request = requests.get(url)
  118. if res_request.status_code != requests.codes.ok:
  119. raise UserError(
  120. _('Got an error %d when trying to download the file %s.')
  121. % (res_request.status_code, url))
  122. # Store current record list
  123. zips_to_delete = zip_model.search(
  124. [('country_id', '=', self.country_id.id)])
  125. f_geonames = zipfile.ZipFile(StringIO.StringIO(res_request.content))
  126. tempdir = tempfile.mkdtemp(prefix='openerp')
  127. f_geonames.extract('%s.txt' % country_code, tempdir)
  128. logger.info('The geonames zipfile has been decompressed')
  129. data_file = open(os.path.join(tempdir, '%s.txt' % country_code), 'r')
  130. data_file.seek(0)
  131. logger.info('Starting to create the better zip entries')
  132. max_import = self.env.context.get('max_import', 0)
  133. reader = unicodecsv.reader(data_file, encoding='utf-8', delimiter=' ')
  134. for i, row in enumerate(reader):
  135. zip_code = self.create_better_zip(row, self.country_id)
  136. if zip_code in zips_to_delete:
  137. zips_to_delete -= zip_code
  138. if max_import and (i + 1) == max_import:
  139. break
  140. data_file.close()
  141. if zips_to_delete and not max_import:
  142. zips_to_delete.unlink()
  143. logger.info('%d better zip entries deleted for country %s' %
  144. (len(zips_to_delete), self.country_id.name))
  145. logger.info(
  146. 'The wizard to create better zip entries from geonames '
  147. 'has been successfully completed.')
  148. return True