You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

230 lines
8.9 KiB

  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 Akretion (Alexis de Lattre
  3. # <alexis.delattre@akretion.com>)
  4. # Copyright 2014 Lorenzo Battistini <lorenzo.battistini@agilebg.com>
  5. # Copyright 2016 Pedro M. Baeza <pedro.baeza@tecnativa.com>
  6. # Copyright 2017 Eficent Business and IT Consulting Services, S.L.
  7. # <contact@eficent.com>
  8. # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
  9. from odoo import _, api, fields, models, tools
  10. from odoo.exceptions import UserError
  11. import requests
  12. import tempfile
  13. import io
  14. import zipfile
  15. import os
  16. import logging
  17. import csv
  18. logger = logging.getLogger(__name__)
  19. class BetterZipGeonamesImport(models.TransientModel):
  20. _name = 'better.zip.geonames.import'
  21. _description = 'Import Better Zip from Geonames'
  22. _rec_name = 'country_id'
  23. country_id = fields.Many2one('res.country', 'Country', required=True)
  24. enforce_cities = fields.Boolean(string='Enforce Cities',
  25. help='The city will be created as a '
  26. 'separate entity.',
  27. related='country_id.enforce_cities',
  28. readonly=True)
  29. letter_case = fields.Selection([
  30. ('unchanged', 'Unchanged'),
  31. ('title', 'Title Case'),
  32. ('upper', 'Upper Case'),
  33. ], string='Letter Case', default='unchanged',
  34. help="Converts retreived city and state names to Title Case "
  35. "(upper case on each first letter of a word) or Upper Case "
  36. "(all letters upper case).")
  37. @api.model
  38. def transform_city_name(self, city, country):
  39. """Override it for transforming city name (if needed)
  40. :param city: Original city name
  41. :param country: Country record
  42. :return: Transformed city name
  43. """
  44. return city
  45. @api.model
  46. def _domain_search_res_city(self, row, country):
  47. return [('name', '=', self.transform_city_name(row[2], country)),
  48. ('country_id', '=', country.id)]
  49. @api.model
  50. def _domain_search_better_zip(self, row, country, res_city):
  51. domain = [('name', '=', row[1]),
  52. ('city', '=', self.transform_city_name(row[2], country)),
  53. ('country_id', '=', country.id)]
  54. if res_city:
  55. domain += [('city_id', '=', res_city.id)]
  56. return domain
  57. @api.model
  58. def _prepare_res_city(self, row, country):
  59. state = self.select_or_create_state(row, country)
  60. vals = {
  61. 'name': self.transform_city_name(row[2], country),
  62. 'state_id': state.id,
  63. 'country_id': country.id,
  64. }
  65. return vals
  66. @api.model
  67. def _prepare_better_zip(self, row, country, res_city):
  68. state = self.select_or_create_state(row, country)
  69. city_name = self.transform_city_name(row[2], country)
  70. vals = {
  71. 'name': row[1],
  72. 'city_id': res_city and res_city.id or False,
  73. 'city': res_city and res_city.name or city_name,
  74. 'state_id': state.id,
  75. 'country_id': country.id,
  76. 'latitude': row[9],
  77. 'longitude': row[10],
  78. }
  79. return vals
  80. @api.model
  81. def create_better_zip(self, row, country, res_city):
  82. if row[0] != country.code:
  83. raise UserError(
  84. _("The country code inside the file (%s) doesn't "
  85. "correspond to the selected country (%s).")
  86. % (row[0], country.code))
  87. logger.debug('ZIP = %s - City = %s' % (row[1], row[2]))
  88. if self.letter_case == 'title':
  89. row[2] = row[2].title()
  90. row[3] = row[3].title()
  91. elif self.letter_case == 'upper':
  92. row[2] = row[2].upper()
  93. row[3] = row[3].upper()
  94. if row[1] and row[2]:
  95. zip_model = self.env['res.better.zip']
  96. zips = zip_model.search(self._domain_search_better_zip(
  97. row, country, res_city))
  98. if zips:
  99. return zips[0]
  100. else:
  101. vals = self._prepare_better_zip(row, country, res_city)
  102. if vals:
  103. logger.debug('Creating res.better.zip %s' % vals['name'])
  104. return zip_model.create(vals)
  105. else: # pragma: no cover
  106. return False
  107. @api.model
  108. def create_res_city(self, row, country):
  109. if row[0] != country.code:
  110. raise UserError(
  111. _("The country code inside the file (%s) doesn't "
  112. "correspond to the selected country (%s).")
  113. % (row[0], country.code))
  114. logger.debug('Processing city creation for ZIP = %s - City = %s' %
  115. (row[1], row[2]))
  116. if self.letter_case == 'title':
  117. row[2] = row[2].title()
  118. row[3] = row[3].title()
  119. elif self.letter_case == 'upper':
  120. row[2] = row[2].upper()
  121. row[3] = row[3].upper()
  122. if row[2]:
  123. res_city_model = self.env['res.city']
  124. res_cities = res_city_model.search(self._domain_search_res_city(
  125. row, country))
  126. if res_cities:
  127. return res_cities[0]
  128. else:
  129. vals = self._prepare_res_city(row, country)
  130. if vals:
  131. logger.debug('Creating res.city %s' % vals['name'])
  132. return res_city_model.create(vals)
  133. else: # pragma: no cover
  134. return False
  135. @tools.ormcache('country_id', 'code')
  136. def _get_state(self, country_id, code, name):
  137. state = self.env['res.country.state'].search(
  138. [('country_id', '=', country_id),
  139. ('code', '=', code)], limit=1,
  140. )
  141. if state: # pragma: no cover
  142. return state
  143. else:
  144. return self.env['res.country.state'].create({
  145. 'name': name,
  146. 'code': code,
  147. 'country_id': country_id,
  148. })
  149. @api.model
  150. def select_or_create_state(
  151. self, row, country, code_row_index=4, name_row_index=3):
  152. return self._get_state(
  153. country.id, row[code_row_index], row[name_row_index],
  154. )
  155. @api.multi
  156. def run_import(self):
  157. self.ensure_one()
  158. zip_model = self.env['res.better.zip']
  159. res_city_model = self.env['res.city']
  160. country_code = self.country_id.code
  161. config_url = self.env['ir.config_parameter'].get_param(
  162. 'geonames.url',
  163. default='http://download.geonames.org/export/zip/%s.zip')
  164. url = config_url % country_code
  165. logger.info('Starting to download %s' % url)
  166. res_request = requests.get(url)
  167. if res_request.status_code != requests.codes.ok:
  168. raise UserError(
  169. _('Got an error %d when trying to download the file %s.')
  170. % (res_request.status_code, url))
  171. # Store current record list
  172. res_cities_to_delete = res_city_model
  173. zips_to_delete = zip_model.search(
  174. [('country_id', '=', self.country_id.id)])
  175. if self.enforce_cities:
  176. res_cities_to_delete = res_city_model.search(
  177. [('country_id', '=', self.country_id.id)])
  178. f_geonames = zipfile.ZipFile(io.BytesIO(res_request.content))
  179. tempdir = tempfile.mkdtemp(prefix='odoo')
  180. f_geonames.extract('%s.txt' % country_code, tempdir)
  181. logger.info('The geonames zipfile has been decompressed')
  182. data_file = open(os.path.join(tempdir, '%s.txt' % country_code), 'r',
  183. encoding='utf-8')
  184. data_file.seek(0)
  185. logger.info('Starting to create the cities and/or better zip entries')
  186. max_import = self.env.context.get('max_import', 0)
  187. reader = csv.reader(data_file, delimiter=' ')
  188. for i, row in enumerate(reader):
  189. res_city = False
  190. if self.enforce_cities:
  191. res_city = self.create_res_city(row, self.country_id)
  192. if res_city in res_cities_to_delete:
  193. res_cities_to_delete -= res_city
  194. zip_code = self.create_better_zip(row, self.country_id,
  195. res_city)
  196. if zip_code in zips_to_delete:
  197. zips_to_delete -= zip_code
  198. if max_import and (i + 1) == max_import:
  199. break
  200. data_file.close()
  201. if zips_to_delete and not max_import:
  202. zips_to_delete.unlink()
  203. logger.info('%d better zip entries deleted for country %s' %
  204. (len(zips_to_delete), self.country_id.name))
  205. if res_cities_to_delete and not max_import:
  206. res_cities_to_delete.unlink()
  207. logger.info('%d res.city entries deleted for country %s' %
  208. (len(res_cities_to_delete), self.country_id.name))
  209. logger.info(
  210. 'The wizard to create cities and/or better zip entries from '
  211. 'geonames has been successfully completed.')
  212. return True