You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

212 lines
7.3 KiB

# Copyright 2015 Antonio Espinosa <antonio.espinosa@tecnativa.com>
# Copyright 2016 Jairo Llopis <jairo.llopis@tecnativa.com>
# Copyright 2017 David Vidal <jairo.llopis@tecnativa.com>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
import logging
import re
from collections import OrderedDict
from urllib.parse import urlencode

import requests
from lxml import etree

from odoo import _, api, models
from odoo.exceptions import UserError
  12. logger = logging.getLogger(__name__)
  13. # Default server values
  14. URL_BASE = 'http://ec.europa.eu'
  15. URL_PATH = '/eurostat/ramon/nomenclatures/index.cfm'
  16. URL_PARAMS = {'TargetUrl': 'ACT_OTH_CLS_DLD',
  17. 'StrNom': 'NUTS_2013',
  18. 'StrFormat': 'XML',
  19. 'StrLanguageCode': 'EN',
  20. 'StrLayoutCode': 'HIERARCHIC'
  21. }
  22. class NutsImport(models.TransientModel):
  23. _name = 'nuts.import'
  24. _description = 'Import NUTS items from European RAMON service'
  25. _parents = [False, False, False, False]
  26. _countries = {
  27. "BE": False,
  28. "BG": False,
  29. "CZ": False,
  30. "DK": False,
  31. "DE": False,
  32. "EE": False,
  33. "IE": False,
  34. "GR": False, # EL
  35. "ES": False,
  36. "FR": False,
  37. "HR": False,
  38. "IT": False,
  39. "CY": False,
  40. "LV": False,
  41. "LT": False,
  42. "LU": False,
  43. "HU": False,
  44. "MT": False,
  45. "NL": False,
  46. "AT": False,
  47. "PL": False,
  48. "PT": False,
  49. "RO": False,
  50. "SI": False,
  51. "SK": False,
  52. "FI": False,
  53. "SE": False,
  54. "GB": False, # UK
  55. }
  56. _current_country = False
  57. _map = OrderedDict([
  58. ('level', {
  59. 'xpath': '', 'attrib': 'idLevel',
  60. 'type': 'integer', 'required': True}),
  61. ('code', {
  62. 'xpath': './Label/LabelText[@language="ALL"]',
  63. 'type': 'string', 'required': True}),
  64. ('name', {
  65. 'xpath': './Label/LabelText[@language="EN"]',
  66. 'type': 'string', 'required': True}),
  67. ])
  68. def _check_node(self, node):
  69. if node.get('id') and node.get('idLevel'):
  70. return True
  71. return False
  72. def _mapping(self, node):
  73. item = {}
  74. for k, v in self._map.items():
  75. field_xpath = v.get('xpath', '')
  76. field_attrib = v.get('attrib', False)
  77. field_type = v.get('type', 'string')
  78. field_required = v.get('required', False)
  79. value = ''
  80. if field_xpath:
  81. n = node.find(field_xpath)
  82. else:
  83. n = node
  84. if n is not None:
  85. if field_attrib:
  86. value = n.get(field_attrib, '')
  87. else:
  88. value = n.text
  89. if field_type == 'integer':
  90. try:
  91. value = int(value)
  92. except (ValueError, TypeError):
  93. logger.warn(
  94. "Value %s for field %s replaced by 0" %
  95. (value, k))
  96. value = 0
  97. else:
  98. logger.debug("xpath = '%s', not found" % field_xpath)
  99. if field_required and not value:
  100. raise UserError(
  101. _('Value not found for mandatory field %s' % k))
  102. item[k] = value
  103. return item
  104. def _download_nuts(self, url_base=None, url_path=None, url_params=None):
  105. if not url_base:
  106. url_base = URL_BASE
  107. if not url_path:
  108. url_path = URL_PATH
  109. if not url_params:
  110. url_params = URL_PARAMS
  111. url = url_base + url_path + '?'
  112. url += '&'.join([k + '=' + v for k, v in url_params.items()])
  113. logger.info('Starting to download %s' % url)
  114. try:
  115. res_request = requests.get(url)
  116. except Exception as e:
  117. raise UserError(
  118. _('Got an error when trying to download the file: %s.') %
  119. str(e))
  120. if res_request.status_code != requests.codes.ok:
  121. raise UserError(
  122. _('Got an error %d when trying to download the file %s.')
  123. % (res_request.status_code, url))
  124. logger.info('Download successfully %d bytes' %
  125. len(res_request.content))
  126. # Workaround XML: Remove all characters before <?xml
  127. pattern = re.compile(rb'^.*<\?xml', re.DOTALL)
  128. content_fixed = re.sub(pattern, b'<?xml', res_request.content)
  129. if not re.match(rb'<\?xml', content_fixed):
  130. raise UserError(_('Downloaded file is not a valid XML file'))
  131. return content_fixed
  132. @api.model
  133. def _load_countries(self):
  134. for k in self._countries:
  135. self._countries[k] = self.env['res.country'].search(
  136. [('code', '=', k)])
  137. # Workaround to translate some country codes:
  138. # EL => GR (Greece)
  139. # UK => GB (United Kingdom)
  140. self._countries['EL'] = self._countries['GR']
  141. self._countries['UK'] = self._countries['GB']
  142. @api.model
  143. def state_mapping(self, data, node):
  144. # Method to inherit and add state_id relation depending on country
  145. level = data.get('level', 0)
  146. code = data.get('code', '')
  147. if level == 1:
  148. self._current_country = self._countries[code]
  149. return {
  150. 'country_id': self._current_country.id,
  151. }
  152. @api.model
  153. def create_or_update_nuts(self, node):
  154. if not self._check_node(node):
  155. return False
  156. nuts_model = self.env['res.partner.nuts']
  157. data = self._mapping(node)
  158. data.update(self.state_mapping(data, node))
  159. level = data.get('level', 0)
  160. if level >= 2 and level <= 5:
  161. data['parent_id'] = self._parents[level - 2]
  162. nuts = nuts_model.search([('level', '=', data['level']),
  163. ('code', '=', data['code'])])
  164. if nuts:
  165. nuts.filtered(lambda n: not n.not_updatable).write(data)
  166. else:
  167. nuts = nuts_model.create(data)
  168. if level >= 1 and level <= 4:
  169. self._parents[level - 1] = nuts.id
  170. return nuts
  171. @api.multi
  172. def run_import(self):
  173. nuts_model = self.env['res.partner.nuts'].\
  174. with_context(defer_parent_store_computation=True)
  175. self._load_countries()
  176. # All current NUTS (for available countries),
  177. # delete if not found above
  178. nuts_to_delete = nuts_model.search(
  179. [('country_id', 'in', [x.id for x in self._countries.values()]),
  180. ('not_updatable', '=', False)])
  181. # Download NUTS in english, create or update
  182. logger.info('Importing NUTS 2013 English...')
  183. xmlcontent = self._download_nuts()
  184. dom = etree.fromstring(xmlcontent)
  185. for node in dom.iter('Item'):
  186. logger.debug('Reading level=%s, id=%s',
  187. node.get('idLevel', 'N/A'),
  188. node.get('id', 'N/A'))
  189. nuts = self.create_or_update_nuts(node)
  190. if nuts and nuts in nuts_to_delete:
  191. nuts_to_delete -= nuts
  192. # Delete obsolete NUTS
  193. if nuts_to_delete:
  194. logger.info('%d NUTS entries deleted' % len(nuts_to_delete))
  195. nuts_to_delete.unlink()
  196. logger.info(
  197. 'The wizard to create NUTS entries from RAMON '
  198. 'has been successfully completed.')
  199. return True