You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

231 lines
7.5 KiB

# Copyright 2015 Antonio Espinosa <antonio.espinosa@tecnativa.com>
# Copyright 2016 Jairo Llopis <jairo.llopis@tecnativa.com>
# Copyright 2017 David Vidal <jairo.llopis@tecnativa.com>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
import logging
import re
from collections import OrderedDict
import requests
from lxml import etree
from odoo import _, api, models
from odoo.exceptions import UserError
logger = logging.getLogger(__name__)
# Default server values
URL_BASE = "http://ec.europa.eu"
URL_PATH = "/eurostat/ramon/nomenclatures/index.cfm"
URL_PARAMS = {
"TargetUrl": "ACT_OTH_CLS_DLD",
"StrNom": "NUTS_2013",
"StrFormat": "XML",
"StrLanguageCode": "EN",
"StrLayoutCode": "HIERARCHIC",
}
class NutsImport(models.TransientModel):
_name = "nuts.import"
_description = "Import NUTS items from European RAMON service"
_parents = [False, False, False, False]
_countries = {
"BE": False,
"BG": False,
"CZ": False,
"DK": False,
"DE": False,
"EE": False,
"IE": False,
"GR": False, # EL
"ES": False,
"FR": False,
"HR": False,
"IT": False,
"CY": False,
"LV": False,
"LT": False,
"LU": False,
"HU": False,
"MT": False,
"NL": False,
"AT": False,
"PL": False,
"PT": False,
"RO": False,
"SI": False,
"SK": False,
"FI": False,
"SE": False,
"GB": False, # UK
}
_current_country = False
_map = OrderedDict(
[
(
"level",
{"xpath": "", "attrib": "idLevel", "type": "integer", "required": True},
),
(
"code",
{
"xpath": './Label/LabelText[@language="ALL"]',
"type": "string",
"required": True,
},
),
(
"name",
{
"xpath": './Label/LabelText[@language="EN"]',
"type": "string",
"required": True,
},
),
]
)
def _check_node(self, node):
if node.get("id") and node.get("idLevel"):
return True
return False
def _mapping(self, node):
item = {}
for k, v in self._map.items():
field_xpath = v.get("xpath", "")
field_attrib = v.get("attrib", False)
field_type = v.get("type", "string")
field_required = v.get("required", False)
value = ""
if field_xpath:
n = node.find(field_xpath)
else:
n = node
if n is not None:
if field_attrib:
value = n.get(field_attrib, "")
else:
value = n.text
if field_type == "integer":
try:
value = int(value)
except (ValueError, TypeError):
logger.warning(
"Value {} for field {} replaced by 0".format(value, k)
)
value = 0
else:
logger.debug("xpath = '%s', not found" % field_xpath)
if field_required and not value:
raise UserError(_("Value not found for mandatory field %s" % k))
item[k] = value
return item
def _download_nuts(self, url_base=None, url_path=None, url_params=None):
if not url_base:
url_base = URL_BASE
if not url_path:
url_path = URL_PATH
if not url_params:
url_params = URL_PARAMS
url = url_base + url_path + "?"
url += "&".join([k + "=" + v for k, v in url_params.items()])
logger.info("Starting to download %s" % url)
try:
res_request = requests.get(url)
except Exception as e:
raise UserError(
_("Got an error when trying to download the file: %s.") % str(e)
)
if res_request.status_code != requests.codes.ok:
raise UserError(
_("Got an error %d when trying to download the file %s.")
% (res_request.status_code, url)
)
logger.info("Download successfully %d bytes" % len(res_request.content))
# Workaround XML: Remove all characters before <?xml
pattern = re.compile(rb"^.*<\?xml", re.DOTALL)
content_fixed = re.sub(pattern, b"<?xml", res_request.content)
if not re.match(rb"<\?xml", content_fixed):
raise UserError(_("Downloaded file is not a valid XML file"))
return content_fixed
@api.model
def _load_countries(self):
for k in self._countries:
self._countries[k] = self.env["res.country"].search([("code", "=", k)])
# Workaround to translate some country codes:
# EL => GR (Greece)
# UK => GB (United Kingdom)
self._countries["EL"] = self._countries["GR"]
self._countries["UK"] = self._countries["GB"]
@api.model
def state_mapping(self, data, node):
# Method to inherit and add state_id relation depending on country
level = data.get("level", 0)
code = data.get("code", "")
if level == 1:
self._current_country = self._countries[code]
return {"country_id": self._current_country.id}
@api.model
def create_or_update_nuts(self, node):
if not self._check_node(node):
return False
nuts_model = self.env["res.partner.nuts"]
data = self._mapping(node)
data.update(self.state_mapping(data, node))
level = data.get("level", 0)
if 2 <= level <= 5:
data["parent_id"] = self._parents[level - 2]
nuts = nuts_model.search(
[("level", "=", data["level"]), ("code", "=", data["code"])]
)
if nuts:
nuts.filtered(lambda n: not n.not_updatable).write(data)
else:
nuts = nuts_model.create(data)
if 1 <= level <= 4:
self._parents[level - 1] = nuts.id
return nuts
def run_import(self):
nuts_model = self.env["res.partner.nuts"].with_context(
defer_parent_store_computation=True
)
self._load_countries()
# All current NUTS (for available countries),
# delete if not found above
nuts_to_delete = nuts_model.search(
[
("country_id", "in", [x.id for x in self._countries.values()]),
("not_updatable", "=", False),
]
)
# Download NUTS in english, create or update
logger.info("Importing NUTS 2013 English...")
xmlcontent = self._download_nuts()
dom = etree.fromstring(xmlcontent)
for node in dom.iter("Item"):
logger.debug(
"Reading level=%s, id=%s",
node.get("idLevel", "N/A"),
node.get("id", "N/A"),
)
nuts = self.create_or_update_nuts(node)
if nuts and nuts in nuts_to_delete:
nuts_to_delete -= nuts
# Delete obsolete NUTS
if nuts_to_delete:
logger.info("%d NUTS entries deleted" % len(nuts_to_delete))
nuts_to_delete.unlink()
logger.info(
"The wizard to create NUTS entries from RAMON "
"has been successfully completed."
)
return True