Browse Source

pre-commit, black, isort

14.0
OCA-git-bot 5 years ago
committed by Pedro M. Baeza
parent
commit
f77c557e69
  1. 38
      base_location_geonames_import/__manifest__.py
  2. 10
      base_location_geonames_import/models/res_country.py
  3. 1
      base_location_geonames_import/readme/INSTALL.rst
  4. 172
      base_location_geonames_import/tests/test_base_location_geonames_import.py
  5. 196
      base_location_geonames_import/wizard/geonames_import.py

38
base_location_geonames_import/__manifest__.py

@ -7,24 +7,24 @@
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
{
'name': 'Base Location Geonames Import',
'version': '13.0.1.0.0',
'category': 'Partner Management',
'license': 'AGPL-3',
'summary': 'Import zip entries from Geonames',
'author': 'Akretion,'
'Agile Business Group,'
'Tecnativa,'
'AdaptiveCity,'
'Odoo Community Association (OCA)',
'website': 'https://github.com/OCA/partner-contact',
'depends': [
'base_location',
"name": "Base Location Geonames Import",
"version": "13.0.1.0.0",
"category": "Partner Management",
"license": "AGPL-3",
"summary": "Import zip entries from Geonames",
"author": (
"Akretion,"
"Agile Business Group,"
"Tecnativa,"
"AdaptiveCity,"
"Odoo Community Association (OCA)"
),
"website": "https://github.com/OCA/partner-contact",
"depends": ["base_location"],
"data": [
"data/res_country_data.xml",
"views/res_country_view.xml",
"wizard/geonames_import_view.xml",
],
'data': [
'data/res_country_data.xml',
'views/res_country_view.xml',
'wizard/geonames_import_view.xml',
],
'installable': True,
"installable": True,
}

10
base_location_geonames_import/models/res_country.py

@ -6,11 +6,7 @@ from odoo import fields, models
class ResCountryState(models.Model):
_inherit = 'res.country'
_inherit = "res.country"
geonames_state_name_column = fields.Integer(
'Geonames State Name Column',
)
geonames_state_code_column = fields.Integer(
'Geonames State Code Column',
)
geonames_state_name_column = fields.Integer("Geonames State Name Column")
geonames_state_code_column = fields.Integer("Geonames State Code Column")

1
base_location_geonames_import/readme/INSTALL.rst

@ -1,4 +1,3 @@
To install this module, you need the Python library 'requests'::
pip install requests

172
base_location_geonames_import/tests/test_base_location_geonames_import.py

@ -1,106 +1,101 @@
# Copyright 2016-2019 Pedro M. Baeza <pedro.baeza@tecnativa.com>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
from odoo.tests import common
from odoo.exceptions import UserError
from odoo.tests import common
class TestBaseLocationGeonamesImport(common.SavepointCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.country = cls.env.ref('base.mc')
cls.city = cls.env['res.city'].create({
'name': 'Test city',
'country_id': cls.country.id,
})
cls.wizard = cls.env['city.zip.geonames.import'].create({
'country_id': cls.country.id,
})
cls.wrong_country = cls.env['res.country'].create({
'name': 'Wrong country',
'code': 'ZZYYXX',
})
cls.wrong_wizard = cls.env['city.zip.geonames.import'].create({
'country_id': cls.wrong_country.id,
})
cls.country = cls.env.ref("base.mc")
cls.city = cls.env["res.city"].create(
{"name": "Test city", "country_id": cls.country.id}
)
cls.wizard = cls.env["city.zip.geonames.import"].create(
{"country_id": cls.country.id}
)
cls.wrong_country = cls.env["res.country"].create(
{"name": "Wrong country", "code": "ZZYYXX"}
)
cls.wrong_wizard = cls.env["city.zip.geonames.import"].create(
{"country_id": cls.wrong_country.id}
)
def test_import_country(self):
max_import = 10
self.wizard.with_context(max_import=max_import).run_import()
# Look if there are imported states for the country
state_count = self.env['res.country.state'].search_count([
('country_id', '=', self.country.id)
])
state_count = self.env["res.country.state"].search_count(
[("country_id", "=", self.country.id)]
)
self.assertTrue(state_count)
# Look if there are imported zips
zip_count = self.env['res.city.zip'].search_count([
('city_id.country_id', '=', self.country.id)
])
zip_count = self.env["res.city.zip"].search_count(
[("city_id.country_id", "=", self.country.id)]
)
self.assertEqual(zip_count, max_import)
# Look if there are imported cities
city_count = self.env['res.city'].search_count([
('country_id', '=', self.country.id)
])
city_count = self.env["res.city"].search_count(
[("country_id", "=", self.country.id)]
)
self.assertTrue(city_count)
# Reimport again to see that there's no duplicates
self.wizard.with_context(max_import=max_import).run_import()
state_count2 = self.env['res.country.state'].search_count([
('country_id', '=', self.country.id)
])
state_count2 = self.env["res.country.state"].search_count(
[("country_id", "=", self.country.id)]
)
self.assertEqual(state_count, state_count2)
city_count2 = self.env['res.city'].search_count([
('country_id', '=', self.country.id)
])
city_count2 = self.env["res.city"].search_count(
[("country_id", "=", self.country.id)]
)
self.assertEqual(city_count, city_count2)
zip_count = self.env['res.city.zip'].search_count([
('city_id.country_id', '=', self.country.id)
])
zip_count = self.env["res.city.zip"].search_count(
[("city_id.country_id", "=", self.country.id)]
)
self.assertEqual(zip_count, max_import)
def test_delete_old_entries(self):
zip_entry = self.env['res.city.zip'].create({
'name': 'Brussels',
'city_id': self.city.id,
})
zip_entry = self.env["res.city.zip"].create(
{"name": "Brussels", "city_id": self.city.id}
)
self.wizard.run_import()
self.assertFalse(zip_entry.exists())
city_entry = self.env['res.city'].create({
'name': 'Test city',
'country_id': self.country.id,
})
city_entry = self.env["res.city"].create(
{"name": "Test city", "country_id": self.country.id}
)
self.wizard.run_import()
self.assertFalse(city_entry.exists())
def test_import_title(self):
self.wizard.letter_case = 'title'
self.wizard.letter_case = "title"
self.wizard.with_context(max_import=1).run_import()
zip = self.env['res.city.zip'].search(
[('city_id.country_id', '=', self.country.id)], limit=1
zip = self.env["res.city.zip"].search(
[("city_id.country_id", "=", self.country.id)], limit=1
)
self.assertEqual(zip.city_id.name, zip.city_id.name.title())
city = self.env['res.city'].search(
[('country_id', '=', self.country.id)], limit=1
city = self.env["res.city"].search(
[("country_id", "=", self.country.id)], limit=1
)
self.assertEqual(city.name, city.name.title())
def test_import_upper(self):
self.wizard.letter_case = 'upper'
self.wizard.letter_case = "upper"
self.wizard.with_context(max_import=1).run_import()
zip = self.env['res.city.zip'].search(
[('city_id.country_id', '=', self.country.id)], limit=1
zip = self.env["res.city.zip"].search(
[("city_id.country_id", "=", self.country.id)], limit=1
)
self.assertEqual(zip.city_id.name, zip.city_id.name.upper())
city = self.env['res.city'].search(
[('country_id', '=', self.country.id)], limit=1
city = self.env["res.city"].search(
[("country_id", "=", self.country.id)], limit=1
)
self.assertEqual(city.name, city.name.upper())
@ -111,34 +106,63 @@ class TestBaseLocationGeonamesImport(common.SavepointCase):
self.wrong_wizard.run_import()
def test_import_duplicated_city_name(self):
country = self.env.ref('base.us')
country = self.env.ref("base.us")
self.wizard.country_id = country.id
parsed_csv = [
['US', '95602', 'Auburn', ' California', 'CA', 'Placer', '61',
'38.9829', '-121.0944', '4'],
['US', '95603', 'Auburn', ' California', 'CA', 'Placer', '61',
'38.9115', '-121.08', '4'],
['US', '30011', 'Auburn', ' Georgia', 'GA', 'Barrow', '13',
'34.0191', '-83.8261', '4'],
[
"US",
"95602",
"Auburn",
" California",
"CA",
"Placer",
"61",
"38.9829",
"-121.0944",
"4",
],
[
"US",
"95603",
"Auburn",
" California",
"CA",
"Placer",
"61",
"38.9115",
"-121.08",
"4",
],
[
"US",
"30011",
"Auburn",
" Georgia",
"GA",
"Barrow",
"13",
"34.0191",
"-83.8261",
"4",
],
]
self.wizard._process_csv(parsed_csv)
cities = self.env['res.city'].search([('name', '=', 'Auburn')])
cities = self.env["res.city"].search([("name", "=", "Auburn")])
self.assertEqual(len(cities), 2)
mapping = [
['California', '95602'],
['California', '95603'],
['Georgia', '30011'],
["California", "95602"],
["California", "95603"],
["Georgia", "30011"],
]
for state_name, zip_code in mapping:
zip_entry = self.env['res.city.zip'].search([
('city_id.country_id', '=', country.id),
('name', '=', zip_code),
])
state = self.env['res.country.state'].search([
('country_id', '=', country.id),
('name', '=', state_name),
])
zip_entry = self.env["res.city.zip"].search(
[("city_id.country_id", "=", country.id), ("name", "=", zip_code)]
)
state = self.env["res.country.state"].search(
[("country_id", "=", country.id), ("name", "=", state_name)]
)
self.assertEqual(
zip_entry.city_id.state_id, state,
"Incorrect state for %s %s" % (state_name, zip_code),
zip_entry.city_id.state_id,
state,
"Incorrect state for {} {}".format(state_name, zip_code),
)

196
base_location_geonames_import/wizard/geonames_import.py

@ -7,39 +7,40 @@
# Copyright 2016-2019 Tecnativa - Pedro M. Baeza
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
from odoo import _, api, fields, models
from odoo.exceptions import UserError
import requests
import tempfile
import csv
import io
import zipfile
import os
import logging
import csv
import os
import tempfile
import zipfile
import requests
from odoo import _, api, fields, models
from odoo.exceptions import UserError
logger = logging.getLogger(__name__)
class CityZipGeonamesImport(models.TransientModel):
_name = 'city.zip.geonames.import'
_description = 'Import City Zips from Geonames'
_rec_name = 'country_id'
_name = "city.zip.geonames.import"
_description = "Import City Zips from Geonames"
_rec_name = "country_id"
country_id = fields.Many2one('res.country', 'Country', required=True)
country_id = fields.Many2one("res.country", "Country", required=True)
code_row_index = fields.Integer(
related='country_id.geonames_state_code_column',
readonly=True)
name_row_index = fields.Integer(
related='country_id.geonames_state_name_column')
letter_case = fields.Selection([
('unchanged', 'Unchanged'),
('title', 'Title Case'),
('upper', 'Upper Case'),
], string='Letter Case', default='unchanged',
related="country_id.geonames_state_code_column", readonly=True
)
name_row_index = fields.Integer(related="country_id.geonames_state_name_column")
letter_case = fields.Selection(
[("unchanged", "Unchanged"), ("title", "Title Case"), ("upper", "Upper Case")],
string="Letter Case",
default="unchanged",
help="Converts retreived city and state names to Title Case "
"(upper case on each first letter of a word) or Upper Case "
"(all letters upper case).")
"(all letters upper case).",
)
@api.model
def transform_city_name(self, city, country):
@ -49,94 +50,94 @@ class CityZipGeonamesImport(models.TransientModel):
:return: Transformed city name
"""
res = city
if self.letter_case == 'title':
if self.letter_case == "title":
res = city.title()
elif self.letter_case == 'upper':
elif self.letter_case == "upper":
res = city.upper()
return res
@api.model
def _domain_search_res_city(self, row, country):
return [('name', '=', self.transform_city_name(row[2], country)),
('country_id', '=', country.id)]
return [
("name", "=", self.transform_city_name(row[2], country)),
("country_id", "=", country.id),
]
@api.model
def _domain_search_city_zip(self, row, res_city):
domain = [('name', '=', row[1])]
domain = [("name", "=", row[1])]
if res_city:
domain += [('city_id', '=', res_city.id)]
domain += [("city_id", "=", res_city.id)]
return domain
@api.model
def select_state(self, row, country):
code = row[self.code_row_index or 4]
return self.env['res.country.state'].search(
[('country_id', '=', country.id),
('code', '=', code)], limit=1,
return self.env["res.country.state"].search(
[("country_id", "=", country.id), ("code", "=", code)], limit=1
)
@api.model
def select_city(self, row, country):
res_city_model = self.env['res.city']
return res_city_model.search(self._domain_search_res_city(
row, country), limit=1)
res_city_model = self.env["res.city"]
return res_city_model.search(
self._domain_search_res_city(row, country), limit=1
)
@api.model
def select_zip(self, row, country):
city = self.select_city(row, country)
return self.env['res.city.zip'].search(self._domain_search_city_zip(
row, city))
return self.env["res.city.zip"].search(self._domain_search_city_zip(row, city))
@api.model
def prepare_state(self, row, country):
return {
'name': row[self.name_row_index or 3],
'code': row[self.code_row_index or 4],
'country_id': country.id,
"name": row[self.name_row_index or 3],
"code": row[self.code_row_index or 4],
"country_id": country.id,
}
@api.model
def prepare_city(self, row, country, state_id):
vals = {
'name': self.transform_city_name(row[2], country),
'state_id': state_id,
'country_id': country.id,
"name": self.transform_city_name(row[2], country),
"state_id": state_id,
"country_id": country.id,
}
return vals
@api.model
def prepare_zip(self, row, city_id):
vals = {
'name': row[1],
'city_id': city_id,
}
vals = {"name": row[1], "city_id": city_id}
return vals
@api.model
def get_and_parse_csv(self):
country_code = self.country_id.code
config_url = self.env['ir.config_parameter'].get_param(
'geonames.url',
default='http://download.geonames.org/export/zip/%s.zip')
config_url = self.env["ir.config_parameter"].get_param(
"geonames.url", default="http://download.geonames.org/export/zip/%s.zip"
)
url = config_url % country_code
logger.info('Starting to download %s' % url)
logger.info("Starting to download %s" % url)
res_request = requests.get(url)
if res_request.status_code != requests.codes.ok:
raise UserError(
_('Got an error %d when trying to download the file %s.')
% (res_request.status_code, url))
_("Got an error %d when trying to download the file %s.")
% (res_request.status_code, url)
)
f_geonames = zipfile.ZipFile(io.BytesIO(res_request.content))
tempdir = tempfile.mkdtemp(prefix='odoo')
f_geonames.extract('%s.txt' % country_code, tempdir)
tempdir = tempfile.mkdtemp(prefix="odoo")
f_geonames.extract("%s.txt" % country_code, tempdir)
data_file = open(os.path.join(tempdir, '%s.txt' % country_code), 'r',
encoding='utf-8')
data_file = open(
os.path.join(tempdir, "%s.txt" % country_code), "r", encoding="utf-8"
)
data_file.seek(0)
reader = csv.reader(data_file, delimiter=' ')
reader = csv.reader(data_file, delimiter=" ")
parsed_csv = [row for i, row in enumerate(reader)]
data_file.close()
logger.info('The geonames zipfile has been decompressed')
logger.info("The geonames zipfile has been decompressed")
return parsed_csv
def _create_states(self, parsed_csv, search_states, max_import):
@ -146,8 +147,7 @@ class CityZipGeonamesImport(models.TransientModel):
for i, row in enumerate(parsed_csv):
if max_import and i == max_import:
break
state = self.select_state(
row, self.country_id) if search_states else False
state = self.select_state(row, self.country_id) if search_states else False
if not state:
state_vals = self.prepare_state(row, self.country_id)
if state_vals not in state_vals_list:
@ -155,13 +155,12 @@ class CityZipGeonamesImport(models.TransientModel):
else:
state_dict[state.code] = state.id
created_states = self.env['res.country.state'].create(state_vals_list)
created_states = self.env["res.country.state"].create(state_vals_list)
for i, vals in enumerate(state_vals_list):
state_dict[vals['code']] = created_states[i].id
state_dict[vals["code"]] = created_states[i].id
return state_dict
def _create_cities(self, parsed_csv,
search_cities, max_import, state_dict):
def _create_cities(self, parsed_csv, search_cities, max_import, state_dict):
# Cities
city_vals_list = []
city_dict = {}
@ -169,18 +168,16 @@ class CityZipGeonamesImport(models.TransientModel):
if max_import and i == max_import:
break
state_id = state_dict[row[self.code_row_index or 4]]
city = self.select_city(
row, self.country_id) if search_cities else False
city = self.select_city(row, self.country_id) if search_cities else False
if not city:
city_vals = self.prepare_city(
row, self.country_id, state_id)
city_vals = self.prepare_city(row, self.country_id, state_id)
if city_vals not in city_vals_list:
city_vals_list.append(city_vals)
else:
city_dict[(city.name, state_id)] = city.id
created_cities = self.env['res.city'].create(city_vals_list)
created_cities = self.env["res.city"].create(city_vals_list)
for i, vals in enumerate(city_vals_list):
city_dict[(vals['name'], vals['state_id'])] = created_cities[i].id
city_dict[(vals["name"], vals["state_id"])] = created_cities[i].id
return city_dict
def run_import(self):
@ -189,28 +186,29 @@ class CityZipGeonamesImport(models.TransientModel):
return self._process_csv(parsed_csv)
def _process_csv(self, parsed_csv):
state_model = self.env['res.country.state']
zip_model = self.env['res.city.zip']
res_city_model = self.env['res.city']
state_model = self.env["res.country.state"]
zip_model = self.env["res.city.zip"]
res_city_model = self.env["res.city"]
# Store current record list
current_zips = zip_model.search(
[('city_id.country_id', '=', self.country_id.id)])
[("city_id.country_id", "=", self.country_id.id)]
)
search_zips = True and len(current_zips) > 0 or False
current_cities = res_city_model.search(
[('country_id', '=', self.country_id.id)])
[("country_id", "=", self.country_id.id)]
)
search_cities = True and len(current_cities) > 0 or False
current_states = state_model.search(
[('country_id', '=', self.country_id.id)])
current_states = state_model.search([("country_id", "=", self.country_id.id)])
search_states = True and len(current_states) > 0 or False
max_import = self.env.context.get('max_import', 0)
logger.info('Starting to create the cities and/or city zip entries')
max_import = self.env.context.get("max_import", 0)
logger.info("Starting to create the cities and/or city zip entries")
state_dict = self._create_states(parsed_csv,
search_states, max_import)
city_dict = self._create_cities(parsed_csv,
search_cities, max_import, state_dict)
state_dict = self._create_states(parsed_csv, search_states, max_import)
city_dict = self._create_cities(
parsed_csv, search_cities, max_import, state_dict
)
# Zips
zip_vals_list = []
@ -223,21 +221,22 @@ class CityZipGeonamesImport(models.TransientModel):
zip_code = self.select_zip(row, self.country_id)
if not zip_code:
state_id = state_dict[row[self.code_row_index or 4]]
city_id = city_dict[(
self.transform_city_name(row[2], self.country_id),
state_id,
)]
city_id = city_dict[
(self.transform_city_name(row[2], self.country_id), state_id)
]
zip_vals = self.prepare_zip(row, city_id)
if zip_vals not in zip_vals_list:
zip_vals_list.append(zip_vals)
delete_zips = self.env['res.city.zip'].create(zip_vals_list)
delete_zips = self.env["res.city.zip"].create(zip_vals_list)
current_zips -= delete_zips
if not max_import:
current_zips.unlink()
logger.info('%d city zip entries deleted for country %s' %
(len(current_zips), self.country_id.name))
logger.info(
"%d city zip entries deleted for country %s"
% (len(current_zips), self.country_id.name)
)
# Since we wrapped the entire cities
# creation in a function we need
@ -245,14 +244,19 @@ class CityZipGeonamesImport(models.TransientModel):
# order to know which are the new ones so
# we can delete the old ones
created_cities = res_city_model.search(
[('country_id', '=', self.country_id.id),
('id', 'in', list(city_dict.values()))]
[
("country_id", "=", self.country_id.id),
("id", "in", list(city_dict.values())),
]
)
current_cities -= created_cities
current_cities.unlink()
logger.info('%d res.city entries deleted for country %s' %
(len(current_cities), self.country_id.name))
logger.info(
'The wizard to create cities and/or city zip entries from '
'geonames has been successfully completed.')
"%d res.city entries deleted for country %s"
% (len(current_cities), self.country_id.name)
)
logger.info(
"The wizard to create cities and/or city zip entries from "
"geonames has been successfully completed."
)
return True
Loading…
Cancel
Save