You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

236 lines
9.2 KiB

# -*- coding: utf-8 -*-
# Copyright 2014-2016 Akretion (Alexis de Lattre
# <alexis.delattre@akretion.com>)
# Copyright 2014 Lorenzo Battistini <lorenzo.battistini@agilebg.com>
# Copyright 2016 Pedro M. Baeza <pedro.baeza@tecnativa.com>
# Copyright 2017 Eficent Business and IT Consulting Services, S.L.
# <contact@eficent.com>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
  9. from odoo import _, api, fields, models, tools
  10. from odoo.exceptions import UserError
  11. import requests
  12. import tempfile
  13. import io
  14. import zipfile
  15. import os
  16. import logging
  17. import csv
  18. logger = logging.getLogger(__name__)
  19. class BetterZipGeonamesImport(models.TransientModel):
  20. _name = 'better.zip.geonames.import'
  21. _description = 'Import Better Zip from Geonames'
  22. _rec_name = 'country_id'
  23. country_id = fields.Many2one('res.country', 'Country', required=True)
  24. enforce_cities = fields.Boolean(string='Enforce Cities',
  25. help='The city will be created as a '
  26. 'separate entity.',
  27. related='country_id.enforce_cities',
  28. readonly=True)
  29. letter_case = fields.Selection([
  30. ('unchanged', 'Unchanged'),
  31. ('title', 'Title Case'),
  32. ('upper', 'Upper Case'),
  33. ], string='Letter Case', default='unchanged',
  34. help="Converts retreived city and state names to Title Case "
  35. "(upper case on each first letter of a word) or Upper Case "
  36. "(all letters upper case).")
  37. @api.model
  38. def transform_city_name(self, city, country):
  39. """Override it for transforming city name (if needed)
  40. :param city: Original city name
  41. :param country: Country record
  42. :return: Transformed city name
  43. """
  44. return city
  45. @api.model
  46. def _domain_search_res_city(self, row, country):
  47. return [('name', '=', self.transform_city_name(row[2], country)),
  48. ('state_id.name', '=', row[3]),
  49. ('country_id', '=', country.id)]
  50. @api.model
  51. def _domain_search_better_zip(self, row, country, res_city):
  52. domain = [('name', '=', row[1]),
  53. ('city', '=', self.transform_city_name(row[2], country)),
  54. ('state_id.name', '=', row[3]),
  55. ('country_id', '=', country.id)]
  56. if res_city:
  57. domain += [('city_id', '=', res_city.id)]
  58. return domain
  59. @api.model
  60. def _prepare_res_city(self, row, country):
  61. state = self.select_or_create_state(row, country)
  62. vals = {
  63. 'name': self.transform_city_name(row[2], country),
  64. 'state_id': state.id,
  65. 'country_id': country.id,
  66. }
  67. return vals
  68. @api.model
  69. def _prepare_better_zip(self, row, country, res_city):
  70. state = self.select_or_create_state(row, country)
  71. city_name = self.transform_city_name(row[2], country)
  72. vals = {
  73. 'name': row[1],
  74. 'city_id': res_city and res_city.id or False,
  75. 'city': res_city and res_city.name or city_name,
  76. 'state_id': state.id,
  77. 'country_id': country.id,
  78. 'latitude': row[9],
  79. 'longitude': row[10],
  80. }
  81. return vals
  82. @api.model
  83. def create_better_zip(self, row, country, res_city):
  84. if row[0] != country.code:
  85. raise UserError(
  86. _("The country code inside the file (%s) doesn't "
  87. "correspond to the selected country (%s).")
  88. % (row[0], country.code))
  89. logger.debug('ZIP = %s - City = %s' % (row[1], row[2]))
  90. if self.letter_case == 'title':
  91. row[2] = row[2].title()
  92. row[3] = row[3].title()
  93. elif self.letter_case == 'upper':
  94. row[2] = row[2].upper()
  95. row[3] = row[3].upper()
  96. if row[1] and row[2]:
  97. zip_model = self.env['res.better.zip']
  98. zips = zip_model.search(self._domain_search_better_zip(
  99. row, country, res_city))
  100. if zips:
  101. return zips[0]
  102. else:
  103. vals = self._prepare_better_zip(row, country, res_city)
  104. if vals:
  105. logger.debug('Creating res.better.zip %s' % vals['name'])
  106. return zip_model.create(vals)
  107. else: # pragma: no cover
  108. return False
  109. @api.model
  110. def create_res_city(self, row, country):
  111. if row[0] != country.code:
  112. raise UserError(
  113. _("The country code inside the file (%s) doesn't "
  114. "correspond to the selected country (%s).")
  115. % (row[0], country.code))
  116. logger.debug('Processing city creation for ZIP = %s - City = %s' %
  117. (row[1], row[2]))
  118. if self.letter_case == 'title':
  119. row[2] = row[2].title()
  120. row[3] = row[3].title()
  121. elif self.letter_case == 'upper':
  122. row[2] = row[2].upper()
  123. row[3] = row[3].upper()
  124. if row[2]:
  125. res_city_model = self.env['res.city']
  126. res_cities = res_city_model.search(self._domain_search_res_city(
  127. row, country))
  128. if res_cities:
  129. return res_cities[0]
  130. else:
  131. vals = self._prepare_res_city(row, country)
  132. if vals:
  133. logger.debug('Creating res.city %s' % vals['name'])
  134. return res_city_model.create(vals)
  135. else: # pragma: no cover
  136. return False
  137. @tools.ormcache('country_id', 'code')
  138. def _get_state(self, country_id, code, name):
  139. state = self.env['res.country.state'].search(
  140. [('country_id', '=', country_id),
  141. ('code', '=', code)], limit=1,
  142. )
  143. if state: # pragma: no cover
  144. return state
  145. else:
  146. return self.env['res.country.state'].create({
  147. 'name': name,
  148. 'code': code,
  149. 'country_id': country_id,
  150. })
  151. @api.model
  152. def select_or_create_state(
  153. self, row, country, code_row_index=4, name_row_index=3):
  154. if country.geonames_state_code_column:
  155. code_row_index = country.geonames_state_code_column
  156. if country.geonames_state_name_column:
  157. name_row_index = country.geonames_state_name_column
  158. return self._get_state(
  159. country.id, row[code_row_index], row[name_row_index],
  160. )
  161. @api.multi
  162. def run_import(self):
  163. self.ensure_one()
  164. zip_model = self.env['res.better.zip']
  165. res_city_model = self.env['res.city']
  166. country_code = self.country_id.code
  167. config_url = self.env['ir.config_parameter'].get_param(
  168. 'geonames.url',
  169. default='http://download.geonames.org/export/zip/%s.zip')
  170. url = config_url % country_code
  171. logger.info('Starting to download %s' % url)
  172. res_request = requests.get(url)
  173. if res_request.status_code != requests.codes.ok:
  174. raise UserError(
  175. _('Got an error %d when trying to download the file %s.')
  176. % (res_request.status_code, url))
  177. # Store current record list
  178. res_cities_to_delete = res_city_model
  179. zips_to_delete = zip_model.search(
  180. [('country_id', '=', self.country_id.id)])
  181. if self.enforce_cities:
  182. res_cities_to_delete = res_city_model.search(
  183. [('country_id', '=', self.country_id.id)])
  184. f_geonames = zipfile.ZipFile(io.BytesIO(res_request.content))
  185. tempdir = tempfile.mkdtemp(prefix='odoo')
  186. f_geonames.extract('%s.txt' % country_code, tempdir)
  187. logger.info('The geonames zipfile has been decompressed')
  188. data_file = open(os.path.join(tempdir, '%s.txt' % country_code), 'r',
  189. encoding='utf-8')
  190. data_file.seek(0)
  191. logger.info('Starting to create the cities and/or better zip entries')
  192. max_import = self.env.context.get('max_import', 0)
  193. reader = csv.reader(data_file, delimiter=' ')
  194. for i, row in enumerate(reader):
  195. res_city = False
  196. if self.enforce_cities:
  197. res_city = self.create_res_city(row, self.country_id)
  198. if res_city in res_cities_to_delete:
  199. res_cities_to_delete -= res_city
  200. zip_code = self.create_better_zip(row, self.country_id,
  201. res_city)
  202. if zip_code in zips_to_delete:
  203. zips_to_delete -= zip_code
  204. if max_import and (i + 1) == max_import:
  205. break
  206. data_file.close()
  207. if zips_to_delete and not max_import:
  208. zips_to_delete.unlink()
  209. logger.info('%d better zip entries deleted for country %s' %
  210. (len(zips_to_delete), self.country_id.name))
  211. if res_cities_to_delete and not max_import:
  212. res_cities_to_delete.unlink()
  213. logger.info('%d res.city entries deleted for country %s' %
  214. (len(res_cities_to_delete), self.country_id.name))
  215. logger.info(
  216. 'The wizard to create cities and/or better zip entries from '
  217. 'geonames has been successfully completed.')
  218. return True