You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

234 lines
9.1 KiB

  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 Akretion (Alexis de Lattre
  3. # <alexis.delattre@akretion.com>)
  4. # Copyright 2014 Lorenzo Battistini <lorenzo.battistini@agilebg.com>
  5. # Copyright 2016 Pedro M. Baeza <pedro.baeza@tecnativa.com>
  6. # Copyright 2017 Eficent Business and IT Consulting Services, S.L.
  7. # <contact@eficent.com>
  8. # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
  9. from odoo import _, api, fields, models, tools
  10. from odoo.exceptions import UserError
  11. import requests
  12. import tempfile
  13. import io
  14. import zipfile
  15. import os
  16. import logging
  17. import csv
  18. logger = logging.getLogger(__name__)
  19. class BetterZipGeonamesImport(models.TransientModel):
  20. _name = 'better.zip.geonames.import'
  21. _description = 'Import Better Zip from Geonames'
  22. _rec_name = 'country_id'
  23. country_id = fields.Many2one('res.country', 'Country', required=True)
  24. enforce_cities = fields.Boolean(string='Enforce Cities',
  25. help='The city will be created as a '
  26. 'separate entity.',
  27. related='country_id.enforce_cities',
  28. readonly=True)
  29. letter_case = fields.Selection([
  30. ('unchanged', 'Unchanged'),
  31. ('title', 'Title Case'),
  32. ('upper', 'Upper Case'),
  33. ], string='Letter Case', default='unchanged',
  34. help="Converts retreived city and state names to Title Case "
  35. "(upper case on each first letter of a word) or Upper Case "
  36. "(all letters upper case).")
  37. @api.model
  38. def transform_city_name(self, city, country):
  39. """Override it for transforming city name (if needed)
  40. :param city: Original city name
  41. :param country: Country record
  42. :return: Transformed city name
  43. """
  44. return city
  45. @api.model
  46. def _domain_search_res_city(self, row, country):
  47. return [('name', '=', self.transform_city_name(row[2], country)),
  48. ('country_id', '=', country.id)]
  49. @api.model
  50. def _domain_search_better_zip(self, row, country, res_city):
  51. domain = [('name', '=', row[1]),
  52. ('city', '=', self.transform_city_name(row[2], country)),
  53. ('country_id', '=', country.id)]
  54. if res_city:
  55. domain += [('city_id', '=', res_city.id)]
  56. return domain
  57. @api.model
  58. def _prepare_res_city(self, row, country):
  59. state = self.select_or_create_state(row, country)
  60. vals = {
  61. 'name': self.transform_city_name(row[2], country),
  62. 'state_id': state.id,
  63. 'country_id': country.id,
  64. }
  65. return vals
  66. @api.model
  67. def _prepare_better_zip(self, row, country, res_city):
  68. state = self.select_or_create_state(row, country)
  69. city_name = self.transform_city_name(row[2], country)
  70. vals = {
  71. 'name': row[1],
  72. 'city_id': res_city and res_city.id or False,
  73. 'city': res_city and res_city.name or city_name,
  74. 'state_id': state.id,
  75. 'country_id': country.id,
  76. 'latitude': row[9],
  77. 'longitude': row[10],
  78. }
  79. return vals
  80. @api.model
  81. def create_better_zip(self, row, country, res_city):
  82. if row[0] != country.code:
  83. raise UserError(
  84. _("The country code inside the file (%s) doesn't "
  85. "correspond to the selected country (%s).")
  86. % (row[0], country.code))
  87. logger.debug('ZIP = %s - City = %s' % (row[1], row[2]))
  88. if self.letter_case == 'title':
  89. row[2] = row[2].title()
  90. row[3] = row[3].title()
  91. elif self.letter_case == 'upper':
  92. row[2] = row[2].upper()
  93. row[3] = row[3].upper()
  94. if row[1] and row[2]:
  95. zip_model = self.env['res.better.zip']
  96. zips = zip_model.search(self._domain_search_better_zip(
  97. row, country, res_city))
  98. if zips:
  99. return zips[0]
  100. else:
  101. vals = self._prepare_better_zip(row, country, res_city)
  102. if vals:
  103. logger.debug('Creating res.better.zip %s' % vals['name'])
  104. return zip_model.create(vals)
  105. else: # pragma: no cover
  106. return False
  107. @api.model
  108. def create_res_city(self, row, country):
  109. if row[0] != country.code:
  110. raise UserError(
  111. _("The country code inside the file (%s) doesn't "
  112. "correspond to the selected country (%s).")
  113. % (row[0], country.code))
  114. logger.debug('Processing city creation for ZIP = %s - City = %s' %
  115. (row[1], row[2]))
  116. if self.letter_case == 'title':
  117. row[2] = row[2].title()
  118. row[3] = row[3].title()
  119. elif self.letter_case == 'upper':
  120. row[2] = row[2].upper()
  121. row[3] = row[3].upper()
  122. if row[2]:
  123. res_city_model = self.env['res.city']
  124. res_cities = res_city_model.search(self._domain_search_res_city(
  125. row, country))
  126. if res_cities:
  127. return res_cities[0]
  128. else:
  129. vals = self._prepare_res_city(row, country)
  130. if vals:
  131. logger.debug('Creating res.city %s' % vals['name'])
  132. return res_city_model.create(vals)
  133. else: # pragma: no cover
  134. return False
  135. @tools.ormcache('country_id', 'code')
  136. def _get_state(self, country_id, code, name):
  137. state = self.env['res.country.state'].search(
  138. [('country_id', '=', country_id),
  139. ('code', '=', code)], limit=1,
  140. )
  141. if state: # pragma: no cover
  142. return state
  143. else:
  144. return self.env['res.country.state'].create({
  145. 'name': name,
  146. 'code': code,
  147. 'country_id': country_id,
  148. })
  149. @api.model
  150. def select_or_create_state(
  151. self, row, country, code_row_index=4, name_row_index=3):
  152. if country.geonames_state_code_column:
  153. code_row_index = country.geonames_state_code_column
  154. if country.geonames_state_name_column:
  155. name_row_index = country.geonames_state_name_column
  156. return self._get_state(
  157. country.id, row[code_row_index], row[name_row_index],
  158. )
  159. @api.multi
  160. def run_import(self):
  161. self.ensure_one()
  162. zip_model = self.env['res.better.zip']
  163. res_city_model = self.env['res.city']
  164. country_code = self.country_id.code
  165. config_url = self.env['ir.config_parameter'].get_param(
  166. 'geonames.url',
  167. default='http://download.geonames.org/export/zip/%s.zip')
  168. url = config_url % country_code
  169. logger.info('Starting to download %s' % url)
  170. res_request = requests.get(url)
  171. if res_request.status_code != requests.codes.ok:
  172. raise UserError(
  173. _('Got an error %d when trying to download the file %s.')
  174. % (res_request.status_code, url))
  175. # Store current record list
  176. res_cities_to_delete = res_city_model
  177. zips_to_delete = zip_model.search(
  178. [('country_id', '=', self.country_id.id)])
  179. if self.enforce_cities:
  180. res_cities_to_delete = res_city_model.search(
  181. [('country_id', '=', self.country_id.id)])
  182. f_geonames = zipfile.ZipFile(io.BytesIO(res_request.content))
  183. tempdir = tempfile.mkdtemp(prefix='odoo')
  184. f_geonames.extract('%s.txt' % country_code, tempdir)
  185. logger.info('The geonames zipfile has been decompressed')
  186. data_file = open(os.path.join(tempdir, '%s.txt' % country_code), 'r',
  187. encoding='utf-8')
  188. data_file.seek(0)
  189. logger.info('Starting to create the cities and/or better zip entries')
  190. max_import = self.env.context.get('max_import', 0)
  191. reader = csv.reader(data_file, delimiter=' ')
  192. for i, row in enumerate(reader):
  193. res_city = False
  194. if self.enforce_cities:
  195. res_city = self.create_res_city(row, self.country_id)
  196. if res_city in res_cities_to_delete:
  197. res_cities_to_delete -= res_city
  198. zip_code = self.create_better_zip(row, self.country_id,
  199. res_city)
  200. if zip_code in zips_to_delete:
  201. zips_to_delete -= zip_code
  202. if max_import and (i + 1) == max_import:
  203. break
  204. data_file.close()
  205. if zips_to_delete and not max_import:
  206. zips_to_delete.unlink()
  207. logger.info('%d better zip entries deleted for country %s' %
  208. (len(zips_to_delete), self.country_id.name))
  209. if res_cities_to_delete and not max_import:
  210. res_cities_to_delete.unlink()
  211. logger.info('%d res.city entries deleted for country %s' %
  212. (len(res_cities_to_delete), self.country_id.name))
  213. logger.info(
  214. 'The wizard to create cities and/or better zip entries from '
  215. 'geonames has been successfully completed.')
  216. return True