You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

250 lines
9.5 KiB

  1. # Copyright 2014-2016 Akretion (Alexis de Lattre
  2. # <alexis.delattre@akretion.com>)
  3. # Copyright 2014 Lorenzo Battistini <lorenzo.battistini@agilebg.com>
  4. # Copyright 2017 Eficent Business and IT Consulting Services, S.L.
  5. # <contact@eficent.com>
  6. # Copyright 2018 Aitor Bouzas <aitor.bouzas@adaptivecity.com>
  7. # Copyright 2016-2020 Tecnativa - Pedro M. Baeza
  8. # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
  9. import csv
  10. import io
  11. import logging
  12. import os
  13. import tempfile
  14. import zipfile
  15. import requests
  16. from odoo import _, api, fields, models
  17. from odoo.exceptions import UserError
  18. logger = logging.getLogger(__name__)
  19. class CityZipGeonamesImport(models.TransientModel):
  20. _name = "city.zip.geonames.import"
  21. _description = "Import City Zips from Geonames"
  22. country_ids = fields.Many2many("res.country", string="Countries")
  23. letter_case = fields.Selection(
  24. [("unchanged", "Unchanged"), ("title", "Title Case"), ("upper", "Upper Case")],
  25. string="Letter Case",
  26. default="unchanged",
  27. help="Converts retreived city and state names to Title Case "
  28. "(upper case on each first letter of a word) or Upper Case "
  29. "(all letters upper case).",
  30. )
  31. @api.model
  32. def transform_city_name(self, city, country):
  33. """Override it for transforming city name (if needed)
  34. :param city: Original city name
  35. :param country: Country record
  36. :return: Transformed city name
  37. """
  38. res = city
  39. if self.letter_case == "title":
  40. res = city.title()
  41. elif self.letter_case == "upper":
  42. res = city.upper()
  43. return res
  44. @api.model
  45. def _domain_search_city_zip(self, row, city_id=False):
  46. domain = [("name", "=", row[1])]
  47. if city_id:
  48. domain += [("city_id", "=", city_id)]
  49. return domain
  50. @api.model
  51. def select_state(self, row, country):
  52. code = row[country.geonames_state_code_column or 4]
  53. return self.env["res.country.state"].search(
  54. [("country_id", "=", country.id), ("code", "=", code)], limit=1
  55. )
  56. @api.model
  57. def select_city(self, row, country, state_id):
  58. # This has to be done by SQL for performance reasons avoiding
  59. # left join with ir_translation on the translatable field "name"
  60. self.env.cr.execute(
  61. "SELECT id, name FROM res_city "
  62. "WHERE name = %s AND country_id = %s AND state_id = %s LIMIT 1",
  63. (self.transform_city_name(row[2], country), country.id, state_id),
  64. )
  65. row_city = self.env.cr.fetchone()
  66. return (row_city[0], row_city[1]) if row_city else (False, False)
  67. @api.model
  68. def select_zip(self, row, country, state_id):
  69. city_id, _ = self.select_city(row, country, state_id)
  70. return self.env["res.city.zip"].search(
  71. self._domain_search_city_zip(row, city_id)
  72. )
  73. @api.model
  74. def prepare_state(self, row, country):
  75. return {
  76. "name": row[country.geonames_state_name_column or 3],
  77. "code": row[country.geonames_state_code_column or 4],
  78. "country_id": country.id,
  79. }
  80. @api.model
  81. def prepare_city(self, row, country, state_id):
  82. vals = {
  83. "name": self.transform_city_name(row[2], country),
  84. "state_id": state_id,
  85. "country_id": country.id,
  86. }
  87. return vals
  88. @api.model
  89. def prepare_zip(self, row, city_id):
  90. vals = {"name": row[1], "city_id": city_id}
  91. return vals
  92. @api.model
  93. def get_and_parse_csv(self, country):
  94. country_code = country.code
  95. config_url = self.env["ir.config_parameter"].get_param(
  96. "geonames.url", default="http://download.geonames.org/export/zip/%s.zip"
  97. )
  98. url = config_url % country_code
  99. logger.info("Starting to download %s" % url)
  100. res_request = requests.get(url)
  101. if res_request.status_code != requests.codes.ok:
  102. raise UserError(
  103. _("Got an error %d when trying to download the file %s.")
  104. % (res_request.status_code, url)
  105. )
  106. f_geonames = zipfile.ZipFile(io.BytesIO(res_request.content))
  107. tempdir = tempfile.mkdtemp(prefix="odoo")
  108. f_geonames.extract("%s.txt" % country_code, tempdir)
  109. data_file = open(
  110. os.path.join(tempdir, "%s.txt" % country_code), "r", encoding="utf-8"
  111. )
  112. data_file.seek(0)
  113. reader = csv.reader(data_file, delimiter=" ")
  114. parsed_csv = [row for i, row in enumerate(reader)]
  115. data_file.close()
  116. logger.info("The geonames zipfile has been decompressed")
  117. return parsed_csv
  118. def _create_states(self, parsed_csv, search_states, max_import, country):
  119. # States
  120. state_vals_list = []
  121. state_dict = {}
  122. for i, row in enumerate(parsed_csv):
  123. if max_import and i == max_import:
  124. break
  125. state = self.select_state(row, country) if search_states else False
  126. if not state:
  127. state_vals = self.prepare_state(row, country)
  128. if state_vals not in state_vals_list:
  129. state_vals_list.append(state_vals)
  130. else:
  131. state_dict[state.code] = state.id
  132. created_states = self.env["res.country.state"].create(state_vals_list)
  133. for i, vals in enumerate(state_vals_list):
  134. state_dict[vals["code"]] = created_states[i].id
  135. return state_dict
  136. def _create_cities(
  137. self, parsed_csv, search_cities, max_import, state_dict, country
  138. ):
  139. # Cities
  140. city_vals_list = []
  141. city_dict = {}
  142. for i, row in enumerate(parsed_csv):
  143. if max_import and i == max_import:
  144. break
  145. state_id = state_dict[row[country.geonames_state_code_column or 4]]
  146. city_id, city_name = (
  147. self.select_city(row, country, state_id)
  148. if search_cities
  149. else (False, False)
  150. )
  151. if not city_id:
  152. city_vals = self.prepare_city(row, country, state_id)
  153. if city_vals not in city_vals_list:
  154. city_vals_list.append(city_vals)
  155. else:
  156. city_dict[(city_name, state_id)] = city_id
  157. ctx = dict(self.env.context)
  158. ctx.pop("lang", None) # make sure no translation is added
  159. created_cities = self.env["res.city"].with_context(ctx).create(city_vals_list)
  160. for i, vals in enumerate(city_vals_list):
  161. city_dict[(vals["name"], vals["state_id"])] = created_cities[i].id
  162. return city_dict
  163. def run_import(self):
  164. for country in self.country_ids:
  165. parsed_csv = self.get_and_parse_csv(country)
  166. self._process_csv(parsed_csv, country)
  167. return True
  168. def _process_csv(self, parsed_csv, country):
  169. state_model = self.env["res.country.state"]
  170. zip_model = self.env["res.city.zip"]
  171. res_city_model = self.env["res.city"]
  172. # Store current record list
  173. old_zips = set(zip_model.search([("city_id.country_id", "=", country.id)]).ids)
  174. search_zips = len(old_zips) > 0
  175. old_cities = set(res_city_model.search([("country_id", "=", country.id)]).ids)
  176. search_cities = len(old_cities) > 0
  177. current_states = state_model.search([("country_id", "=", country.id)])
  178. search_states = len(current_states) > 0
  179. max_import = self.env.context.get("max_import", 0)
  180. logger.info("Starting to create the cities and/or city zip entries")
  181. # Pre-create states and cities
  182. state_dict = self._create_states(parsed_csv, search_states, max_import, country)
  183. city_dict = self._create_cities(
  184. parsed_csv, search_cities, max_import, state_dict, country
  185. )
  186. # Zips
  187. zip_vals_list = []
  188. for i, row in enumerate(parsed_csv):
  189. if max_import and i == max_import:
  190. break
  191. # Don't search if there aren't any records
  192. zip_code = False
  193. state_id = state_dict[row[country.geonames_state_code_column or 4]]
  194. if search_zips:
  195. zip_code = self.select_zip(row, country, state_id)
  196. if not zip_code:
  197. city_id = city_dict[
  198. (self.transform_city_name(row[2], country), state_id)
  199. ]
  200. zip_vals = self.prepare_zip(row, city_id)
  201. if zip_vals not in zip_vals_list:
  202. zip_vals_list.append(zip_vals)
  203. else:
  204. old_zips.remove(zip_code.id)
  205. self.env["res.city.zip"].create(zip_vals_list)
  206. if not max_import:
  207. if old_zips:
  208. logger.info("removing city zip entries")
  209. self.env["res.city.zip"].browse(list(old_zips)).unlink()
  210. logger.info(
  211. "%d city zip entries deleted for country %s"
  212. % (len(old_zips), country.name)
  213. )
  214. old_cities -= set(city_dict.values())
  215. if old_cities:
  216. logger.info("removing city entries")
  217. self.env["res.city"].browse(list(old_cities)).unlink()
  218. logger.info(
  219. "%d res.city entries deleted for country %s"
  220. % (len(old_cities), country.name)
  221. )
  222. logger.info(
  223. "The wizard to create cities and/or city zip entries from "
  224. "geonames has been successfully completed."
  225. )
  226. return True