|
|
# Copyright 2015 Antonio Espinosa <antonio.espinosa@tecnativa.com> # Copyright 2016 Jairo Llopis <jairo.llopis@tecnativa.com> # Copyright 2017 David Vidal <jairo.llopis@tecnativa.com> # Copyright 2021 Andrii Skrypka <andrijskrypa@ukr.net> # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
import logging import re from collections import OrderedDict
import requests from lxml import etree
from odoo import _, api, fields, models from odoo.exceptions import UserError
logger = logging.getLogger(__name__)
# Default server values URL_BASE = "http://ec.europa.eu" URL_PATH = "/eurostat/ramon/nomenclatures/index.cfm" URL_PARAMS = { "TargetUrl": "ACT_OTH_CLS_DLD", "StrNom": "NUTS_2013", "StrFormat": "XML", "StrLanguageCode": "EN", "StrLayoutCode": "HIERARCHIC", }
class NutsImport(models.TransientModel): _name = "nuts.import" _description = "Import NUTS items from European RAMON service" _parents = [False, False, False, False] _countries = { "BE": False, "BG": False, "CZ": False, "DK": False, "DE": False, "EE": False, "IE": False, "GR": False, # EL "ES": False, "FR": False, "HR": False, "IT": False, "CY": False, "LV": False, "LT": False, "LU": False, "HU": False, "MT": False, "NL": False, "AT": False, "PL": False, "PT": False, "RO": False, "SI": False, "SK": False, "FI": False, "SE": False, "GB": False, # UK } _map = OrderedDict( [ ( "level", {"xpath": "", "attrib": "idLevel", "type": "integer", "required": True}, ), ( "code", { "xpath": './Label/LabelText[@language="ALL"]', "type": "string", "required": True, }, ), ( "name", { "xpath": './Label/LabelText[@language="EN"]', "type": "string", "required": True, }, ), ] ) current_country_id = fields.Many2one("res.country")
def _check_node(self, node): if node.get("id") and node.get("idLevel"): return True return False
def _mapping(self, node): item = {} for k, v in self._map.items(): field_xpath = v.get("xpath", "") field_attrib = v.get("attrib", False) field_type = v.get("type", "string") field_required = v.get("required", False) value = "" if field_xpath: n = node.find(field_xpath) else: n = node if n is not None: if field_attrib: value = n.get(field_attrib, "") else: value = n.text if field_type == "integer": try: value = int(value) except (ValueError, TypeError): logger.warning( "Value {} for field {} replaced by 0".format(value, k) ) value = 0 else: logger.debug("xpath = '%s', not found" % field_xpath) if field_required and not value: raise UserError(_("Value not found for mandatory field %s" % k)) item[k] = value return item
def _download_nuts(self, url_base=None, url_path=None, url_params=None): if not url_base: url_base = URL_BASE if not url_path: url_path = URL_PATH if not url_params: url_params = URL_PARAMS url = url_base + url_path + "?" url += "&".join([k + "=" + v for k, v in url_params.items()]) logger.info("Starting to download %s" % url) try: res_request = requests.get(url) except Exception as e: raise UserError( _("Got an error when trying to download the file: %s.") % str(e) ) if res_request.status_code != requests.codes.ok: raise UserError( _("Got an error %d when trying to download the file %s.") % (res_request.status_code, url) ) logger.info("Download successfully %d bytes" % len(res_request.content)) # Workaround XML: Remove all characters before <?xml pattern = re.compile(rb"^.*<\?xml", re.DOTALL) content_fixed = re.sub(pattern, b"<?xml", res_request.content) if not re.match(rb"<\?xml", content_fixed): raise UserError(_("Downloaded file is not a valid XML file")) return content_fixed
@api.model def _load_countries(self): for k in self._countries: self._countries[k] = self.env["res.country"].search([("code", "=", k)]) # Workaround to translate some country codes: # EL => GR (Greece) # UK => GB (United Kingdom) self._countries["EL"] = self._countries["GR"] self._countries["UK"] = self._countries["GB"]
@api.model def state_mapping(self, data, node): # Method to inherit and add state_id relation depending on country level = data.get("level", 0) code = data.get("code", "") if level == 1: self.current_country_id = self._countries[code] return {"country_id": self.current_country_id.id}
@api.model def create_or_update_nuts(self, node): if not self._check_node(node): return False
nuts_model = self.env["res.partner.nuts"] data = self._mapping(node) data.update(self.state_mapping(data, node)) level = data.get("level", 0) if 2 <= level <= 5: data["parent_id"] = self._parents[level - 2] nuts = nuts_model.search( [("level", "=", data["level"]), ("code", "=", data["code"])] ) if nuts: nuts.filtered(lambda n: not n.not_updatable).write(data) else: nuts = nuts_model.create(data) if 1 <= level <= 4: self._parents[level - 1] = nuts.id return nuts
def run_import(self): nuts_model = self.env["res.partner.nuts"].with_context( defer_parent_store_computation=True ) self._load_countries() # All current NUTS (for available countries), # delete if not found above nuts_to_delete = nuts_model.search( [ ("country_id", "in", [x.id for x in self._countries.values()]), ("not_updatable", "=", False), ] ) # Download NUTS in english, create or update logger.info("Importing NUTS 2013 English...") xmlcontent = self._download_nuts() dom = etree.fromstring(xmlcontent) for node in dom.iter("Item"): logger.debug( "Reading level=%s, id=%s", node.get("idLevel", "N/A"), node.get("id", "N/A"), ) nuts = self.create_or_update_nuts(node) if nuts and nuts in nuts_to_delete: nuts_to_delete -= nuts # Delete obsolete NUTS if nuts_to_delete: logger.info("%d NUTS entries deleted" % len(nuts_to_delete)) nuts_to_delete.unlink() logger.info( "The wizard to create NUTS entries from RAMON " "has been successfully completed." ) return True
|