@ -1,13 +1,13 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 Akretion (Alexis de Lattre
# Copyright 2014-2016 Akretion (Alexis de Lattre
# <alexis.delattre@akretion.com>)
# <alexis.delattre@akretion.com>)
# Copyright 2014 Lorenzo Battistini <lorenzo.battistini@agilebg.com>
# Copyright 2014 Lorenzo Battistini <lorenzo.battistini@agilebg.com>
# Copyright 2016 Pedro M. Baeza <pedro.baeza@tecnativa.com>
# Copyright 2016 Pedro M. Baeza <pedro.baeza@tecnativa.com>
# Copyright 2017 Eficent Business and IT Consulting Services, S.L.
# Copyright 2017 Eficent Business and IT Consulting Services, S.L.
# <contact@eficent.com>
# <contact@eficent.com>
# Copyright 2018 Aitor Bouzas <aitor.bouzas@adaptivecity.com>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
from odoo import _ , api , fields , models , tools
from odoo import _ , api , fields , models
from odoo.exceptions import UserError
from odoo.exceptions import UserError
import requests
import requests
import tempfile
import tempfile
@ -20,17 +20,17 @@ import csv
logger = logging . getLogger ( __name__ )
logger = logging . getLogger ( __name__ )
class Better ZipGeonamesImport( models . TransientModel ) :
_name = ' better .zip.geonames.import'
_description = ' Import Better Zip from Geonames '
class City ZipGeonamesImport( models . TransientModel ) :
_name = ' city .zip.geonames.import'
_description = ' Import City Zips from Geonames '
_rec_name = ' country_id '
_rec_name = ' country_id '
country_id = fields . Many2one ( ' res.country ' , ' Country ' , required = True )
country_id = fields . Many2one ( ' res.country ' , ' Country ' , required = True )
enforce_cities = fields . Boolean ( string = ' Enforce Cities ' ,
help = ' The city will be created as a '
' separate entity. ' ,
related = ' country_id.enforce_cities ' ,
code_row_index = fields . Integer (
related = ' country_id.geonames_state_code_column ' ,
readonly = True )
readonly = True )
name_row_index = fields . Integer (
related = ' country_id.geonames_state_name_column ' )
letter_case = fields . Selection ( [
letter_case = fields . Selection ( [
( ' unchanged ' , ' Unchanged ' ) ,
( ' unchanged ' , ' Unchanged ' ) ,
@ -48,7 +48,12 @@ class BetterZipGeonamesImport(models.TransientModel):
: param country : Country record
: param country : Country record
: return : Transformed city name
: return : Transformed city name
"""
"""
return city
res = city
if self . letter_case == ' title ' :
res = city . title ( )
elif self . letter_case == ' upper ' :
res = city . upper ( )
return res
@api.model
@api.model
def _domain_search_res_city ( self , row , country ) :
def _domain_search_res_city ( self , row , country ) :
@ -56,127 +61,59 @@ class BetterZipGeonamesImport(models.TransientModel):
( ' country_id ' , ' = ' , country . id ) ]
( ' country_id ' , ' = ' , country . id ) ]
@api.model
@api.model
def _domain_search_better_zip ( self , row , country , res_city ) :
domain = [ ( ' name ' , ' = ' , row [ 1 ] ) ,
( ' city ' , ' = ' , self . transform_city_name ( row [ 2 ] , country ) ) ,
( ' country_id ' , ' = ' , country . id ) ]
def _domain_search_city_zip ( self , row , res_city ) :
domain = [ ( ' name ' , ' = ' , row [ 1 ] ) ]
if res_city :
if res_city :
domain + = [ ( ' city_id ' , ' = ' , res_city . id ) ]
domain + = [ ( ' city_id ' , ' = ' , res_city . id ) ]
return domain
return domain
@api.model
@api.model
def _prepare_res_city ( self , row , country ) :
state = self . select_or_create_state ( row , country )
def select_state ( self , row , country ) :
code = row [ self . code_row_index or 4 ]
return self . env [ ' res.country.state ' ] . search (
[ ( ' country_id ' , ' = ' , country . id ) ,
( ' code ' , ' = ' , code ) ] , limit = 1 ,
)
@api.model
def select_city ( self , row , country ) :
res_city_model = self . env [ ' res.city ' ]
return res_city_model . search ( self . _domain_search_res_city (
row , country ) , limit = 1 )
@api.model
def select_zip ( self , row , country ) :
city = self . select_city ( row , country )
return self . env [ ' res.city.zip ' ] . search ( self . _domain_search_city_zip (
row , city ) )
@api.model
def prepare_state ( self , row , country ) :
return {
' name ' : row [ self . name_row_index or 3 ] ,
' code ' : row [ self . code_row_index or 4 ] ,
' country_id ' : country . id ,
}
@api.model
def prepare_city ( self , row , country , state_id ) :
vals = {
vals = {
' name ' : self . transform_city_name ( row [ 2 ] , country ) ,
' name ' : self . transform_city_name ( row [ 2 ] , country ) ,
' state_id ' : state . id ,
' state_id ' : state_ id ,
' country_id ' : country . id ,
' country_id ' : country . id ,
}
}
return vals
return vals
@api.model
@api.model
def _prepare_better_zip ( self , row , country , res_city ) :
state = self . select_or_create_state ( row , country )
city_name = self . transform_city_name ( row [ 2 ] , country )
def prepare_zip ( self , row , city_id ) :
vals = {
vals = {
' name ' : row [ 1 ] ,
' name ' : row [ 1 ] ,
' city_id ' : res_city and res_city . id or False ,
' city ' : res_city and res_city . name or city_name ,
' state_id ' : state . id ,
' country_id ' : country . id ,
' latitude ' : row [ 9 ] ,
' longitude ' : row [ 10 ] ,
' city_id ' : city_id ,
}
}
return vals
return vals
@api.model
@api.model
def create_better_zip ( self , row , country , res_city ) :
if row [ 0 ] != country . code :
raise UserError (
_ ( " The country code inside the file ( %s ) doesn ' t "
" correspond to the selected country ( %s ). " )
% ( row [ 0 ] , country . code ) )
logger . debug ( ' ZIP = %s - City = %s ' % ( row [ 1 ] , row [ 2 ] ) )
if self . letter_case == ' title ' :
row [ 2 ] = row [ 2 ] . title ( )
row [ 3 ] = row [ 3 ] . title ( )
elif self . letter_case == ' upper ' :
row [ 2 ] = row [ 2 ] . upper ( )
row [ 3 ] = row [ 3 ] . upper ( )
if row [ 1 ] and row [ 2 ] :
zip_model = self . env [ ' res.better.zip ' ]
zips = zip_model . search ( self . _domain_search_better_zip (
row , country , res_city ) )
if zips :
return zips [ 0 ]
else :
vals = self . _prepare_better_zip ( row , country , res_city )
if vals :
logger . debug ( ' Creating res.better.zip %s ' % vals [ ' name ' ] )
return zip_model . create ( vals )
else : # pragma: no cover
return False
@api.model
def create_res_city ( self , row , country ) :
if row [ 0 ] != country . code :
raise UserError (
_ ( " The country code inside the file ( %s ) doesn ' t "
" correspond to the selected country ( %s ). " )
% ( row [ 0 ] , country . code ) )
logger . debug ( ' Processing city creation for ZIP = %s - City = %s ' %
( row [ 1 ] , row [ 2 ] ) )
if self . letter_case == ' title ' :
row [ 2 ] = row [ 2 ] . title ( )
row [ 3 ] = row [ 3 ] . title ( )
elif self . letter_case == ' upper ' :
row [ 2 ] = row [ 2 ] . upper ( )
row [ 3 ] = row [ 3 ] . upper ( )
if row [ 2 ] :
res_city_model = self . env [ ' res.city ' ]
res_cities = res_city_model . search ( self . _domain_search_res_city (
row , country ) )
if res_cities :
return res_cities [ 0 ]
else :
vals = self . _prepare_res_city ( row , country )
if vals :
logger . debug ( ' Creating res.city %s ' % vals [ ' name ' ] )
return res_city_model . create ( vals )
else : # pragma: no cover
return False
@tools.ormcache ( ' country_id ' , ' code ' )
def _get_state ( self , country_id , code , name ) :
state = self . env [ ' res.country.state ' ] . search (
[ ( ' country_id ' , ' = ' , country_id ) ,
( ' code ' , ' = ' , code ) ] , limit = 1 ,
)
if state : # pragma: no cover
return state
else :
return self . env [ ' res.country.state ' ] . create ( {
' name ' : name ,
' code ' : code ,
' country_id ' : country_id ,
} )
@api.model
def select_or_create_state (
self , row , country , code_row_index = 4 , name_row_index = 3 ) :
if country . geonames_state_code_column :
code_row_index = country . geonames_state_code_column
if country . geonames_state_name_column :
name_row_index = country . geonames_state_name_column
return self . _get_state (
country . id , row [ code_row_index ] , row [ name_row_index ] ,
)
@api.multi
def run_import ( self ) :
self . ensure_one ( )
zip_model = self . env [ ' res.better.zip ' ]
res_city_model = self . env [ ' res.city ' ]
def get_and_parse_csv ( self ) :
country_code = self . country_id . code
country_code = self . country_id . code
config_url = self . env [ ' ir.config_parameter ' ] . get_param (
config_url = self . env [ ' ir.config_parameter ' ] . get_param (
' geonames.url ' ,
' geonames.url ' ,
@ -188,47 +125,131 @@ class BetterZipGeonamesImport(models.TransientModel):
raise UserError (
raise UserError (
_ ( ' Got an error %d when trying to download the file %s . ' )
_ ( ' Got an error %d when trying to download the file %s . ' )
% ( res_request . status_code , url ) )
% ( res_request . status_code , url ) )
# Store current record list
res_cities_to_delete = res_city_model
zips_to_delete = zip_model . search (
[ ( ' country_id ' , ' = ' , self . country_id . id ) ] )
if self . enforce_cities :
res_cities_to_delete = res_city_model . search (
[ ( ' country_id ' , ' = ' , self . country_id . id ) ] )
f_geonames = zipfile . ZipFile ( io . BytesIO ( res_request . content ) )
f_geonames = zipfile . ZipFile ( io . BytesIO ( res_request . content ) )
tempdir = tempfile . mkdtemp ( prefix = ' odoo ' )
tempdir = tempfile . mkdtemp ( prefix = ' odoo ' )
f_geonames . extract ( ' %s .txt ' % country_code , tempdir )
f_geonames . extract ( ' %s .txt ' % country_code , tempdir )
logger . info ( ' The geonames zipfile has been decompressed ' )
data_file = open ( os . path . join ( tempdir , ' %s .txt ' % country_code ) , ' r ' ,
data_file = open ( os . path . join ( tempdir , ' %s .txt ' % country_code ) , ' r ' ,
encoding = ' utf-8 ' )
encoding = ' utf-8 ' )
data_file . seek ( 0 )
data_file . seek ( 0 )
logger . info ( ' Starting to create the cities and/or better zip entries ' )
max_import = self . env . context . get ( ' max_import ' , 0 )
reader = csv . reader ( data_file , delimiter = ' ' )
reader = csv . reader ( data_file , delimiter = ' ' )
for i , row in enumerate ( reader ) :
res_city = False
if self . enforce_cities :
res_city = self . create_res_city ( row , self . country_id )
if res_city in res_cities_to_delete :
res_cities_to_delete - = res_city
zip_code = self . create_better_zip ( row , self . country_id ,
res_city )
if zip_code in zips_to_delete :
zips_to_delete - = zip_code
if max_import and ( i + 1 ) == max_import :
break
parsed_csv = [ row for i , row in enumerate ( reader ) ]
data_file . close ( )
data_file . close ( )
if zips_to_delete and not max_import :
zips_to_delete . unlink ( )
logger . info ( ' %d better zip entries deleted for country %s ' %
( len ( zips_to_delete ) , self . country_id . name ) )
if res_cities_to_delete and not max_import :
res_cities_to_delete . unlink ( )
logger . info ( ' The geonames zipfile has been decompressed ' )
return parsed_csv
def _create_states ( self , parsed_csv , search_states , max_import ) :
# States
state_vals_list = [ ]
state_dict = { }
for i , row in enumerate ( parsed_csv ) :
if max_import and i == max_import :
break
state = self . select_state (
row , self . country_id ) if search_states else False
if not state :
state_vals = self . prepare_state ( row , self . country_id )
if state_vals not in state_vals_list :
state_vals_list . append ( state_vals )
else :
state_dict [ state . code ] = state . id
created_states = self . env [ ' res.country.state ' ] . create ( state_vals_list )
for i , vals in enumerate ( state_vals_list ) :
state_dict [ vals [ ' code ' ] ] = created_states [ i ] . id
return state_dict
def _create_cities ( self , parsed_csv ,
search_cities , max_import , state_dict ) :
# Cities
city_vals_list = [ ]
city_dict = { }
for i , row in enumerate ( parsed_csv ) :
if max_import and i == max_import :
break
city = self . select_city (
row , self . country_id ) if search_cities else False
if not city :
state_id = state_dict [
row [ self . code_row_index or 4 ] ]
city_vals = self . prepare_city (
row , self . country_id , state_id )
if city_vals not in city_vals_list :
city_vals_list . append ( city_vals )
else :
city_dict [ city . name ] = city . id
created_cities = self . env [ ' res.city ' ] . create ( city_vals_list )
for i , vals in enumerate ( city_vals_list ) :
city_dict [ vals [ ' name ' ] ] = created_cities [ i ] . id
return city_dict
@api.multi
def run_import ( self ) :
self . ensure_one ( )
state_model = self . env [ ' res.country.state ' ]
zip_model = self . env [ ' res.city.zip ' ]
res_city_model = self . env [ ' res.city ' ]
# Store current record list
current_zips = zip_model . search (
[ ( ' city_id.country_id ' , ' = ' , self . country_id . id ) ] )
search_zips = True and len ( current_zips ) > 0 or False
current_cities = res_city_model . search (
[ ( ' country_id ' , ' = ' , self . country_id . id ) ] )
search_cities = True and len ( current_cities ) > 0 or False
current_states = state_model . search (
[ ( ' country_id ' , ' = ' , self . country_id . id ) ] )
search_states = True and len ( current_states ) > 0 or False
parsed_csv = self . get_and_parse_csv ( )
max_import = self . env . context . get ( ' max_import ' , 0 )
logger . info ( ' Starting to create the cities and/or city zip entries ' )
state_dict = self . _create_states ( parsed_csv ,
search_states , max_import )
city_dict = self . _create_cities ( parsed_csv ,
search_cities , max_import , state_dict )
# Zips
zip_vals_list = [ ]
for i , row in enumerate ( parsed_csv ) :
if max_import and i == max_import :
break
# Don't search if there aren't any records
zip = False
if search_zips :
zip = self . select_zip ( row , self . country_id )
if not zip :
city_id = city_dict [
self . transform_city_name ( row [ 2 ] , self . country_id ) ]
zip_vals = self . prepare_zip ( row , city_id )
if zip_vals not in zip_vals_list :
zip_vals_list . append ( zip_vals )
delete_zips = self . env [ ' res.city.zip ' ] . create ( zip_vals_list )
current_zips - = delete_zips
if not max_import :
current_zips . unlink ( )
logger . info ( ' %d city zip entries deleted for country %s ' %
( len ( current_zips ) , self . country_id . name ) )
# Since we wrapped the entire cities
# creation in a function we need
# to perform a search with city_dict in
# order to know which are the new ones so
# we can delete the old ones
created_cities = res_city_model . search (
[ ( ' country_id ' , ' = ' , self . country_id . id ) ,
( ' id ' , ' in ' , [ value for key , value in city_dict . items ( ) ] ) ]
)
current_cities - = created_cities
current_cities . unlink ( )
logger . info ( ' %d res.city entries deleted for country %s ' %
logger . info ( ' %d res.city entries deleted for country %s ' %
( len ( res_cities_to_delete ) , self . country_id . name ) )
( len ( current_cities ) , self . country_id . name ) )
logger . info (
logger . info (
' The wizard to create cities and/or better zip entries from '
' The wizard to create cities and/or city zip entries from '
' geonames has been successfully completed. ' )
' geonames has been successfully completed. ' )
return True
return True