Browse Source

[FIX] WARNING test_8 openerp.models: Cannot execute name_search, no _rec_name defined on better.zip.geonames.import

[REF] porting to new api

[IMP] removing 'FOR UPDATE NOWAIT' as ROW EXCLUSIVE lock is already acquired by DELETE and INSERT
http://www.postgresql.org/docs/9.2/static/explicit-locking.html

[REF] select_or_create_state
and tests

[FIX] __openerp__.py PEP8

[FIX] TypeError: unlink() got multiple values for keyword argument 'context'

[IMP] using ir.config_parameter for geonames URL

[FIX] missing cr uid
pull/643/head
Lorenzo Battistini 10 years ago
committed by Pedro M. Baeza
parent
commit
1b012ec30b
  1. 14
      base_location_geonames_import/__openerp__.py
  2. 54
      base_location_geonames_import/test/import.yml
  3. 100
      base_location_geonames_import/wizard/geonames_import.py

14
base_location_geonames_import/__openerp__.py

@ -4,6 +4,8 @@
# Base Location Geonames Import module for OpenERP # Base Location Geonames Import module for OpenERP
# Copyright (C) 2014 Akretion (http://www.akretion.com) # Copyright (C) 2014 Akretion (http://www.akretion.com)
# @author Alexis de Lattre <alexis.delattre@akretion.com> # @author Alexis de Lattre <alexis.delattre@akretion.com>
# Copyright (C) 2014 Agile Business Group (http://www.agilebg.com)
# @author Lorenzo Battistini <lorenzo.battistini@agilebg.com>
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as # it under the terms of the GNU Affero General Public License as
@ -23,7 +25,7 @@
{ {
'name': 'Base Location Geonames Import', 'name': 'Base Location Geonames Import',
'version': '0.1',
'version': '8.0.0.2.0',
'category': 'Extra Tools', 'category': 'Extra Tools',
'license': 'AGPL-3', 'license': 'AGPL-3',
'summary': 'Import better zip entries from Geonames', 'summary': 'Import better zip entries from Geonames',
@ -31,7 +33,10 @@
Base Location Geonames Import Base Location Geonames Import
============================= =============================
This module adds a wizard to import better zip entries from Geonames (http://download.geonames.org/export/zip/).
This module adds a wizard to import better zip entries from Geonames
(http://download.geonames.org/export/zip/).
If want want/need to modify the URL, you can set the 'geonames.url'
system parameter.
When you start the wizard, When you start the wizard,
it will ask you to select a country. Then, for the selected country, it will ask you to select a country. Then, for the selected country,
@ -54,7 +59,10 @@ Contributors
'external_dependencies': {'python': ['requests', 'unicodecsv']}, 'external_dependencies': {'python': ['requests', 'unicodecsv']},
'data': [ 'data': [
'wizard/geonames_import_view.xml', 'wizard/geonames_import_view.xml',
],
],
'test': [
'test/import.yml'
],
'installable': True, 'installable': True,
'active': False, 'active': False,
} }

54
base_location_geonames_import/test/import.yml

@ -0,0 +1,54 @@
-
I create the wizard
-
!record {model: better.zip.geonames.import, id: import_wizard_1, view: better_zip_geonames_import_form}:
country_id: base.it
-
I run the import
-
!python {model: better.zip.geonames.import}: |
self.run_import(cr, uid, [ref('import_wizard_1')], context=context)
-
I check the data
-
!python {model: res.better.zip}: |
state_obj = self.pool['res.country.state']
state_ids = state_obj.search(cr, uid, [
('code', '=', 'LG'),
('country_id', '=', ref('base.it')),
], context=context)
assert len(state_ids) == 1, "There must be 1 LG"
zip_ids = self.search(cr, uid, [
('name', '=', '16017'),
('city', '=', 'Isola Del Cantone'),
('state_id', '=', state_ids[0]),
('country_id', '=', ref('base.it'))
], context=context)
assert len(zip_ids) == 1, "There must be 1 'Isola Del Cantone'"
-
I create the wizard again
-
!record {model: better.zip.geonames.import, id: import_wizard_2, view: better_zip_geonames_import_form}:
country_id: base.it
-
I run the import again
-
!python {model: better.zip.geonames.import}: |
self.run_import(cr, uid, [ref('import_wizard_2')], context=context)
-
I check the data
-
!python {model: res.better.zip}: |
state_obj = self.pool['res.country.state']
state_ids = state_obj.search(cr, uid, [
('code', '=', 'LG'),
('country_id', '=', ref('base.it')),
], context=context)
assert len(state_ids) == 1, "There must be 1 LG"
zip_ids = self.search(cr, uid, [
('name', '=', '16017'),
('city', '=', 'Isola Del Cantone'),
('state_id', '=', state_ids[0]),
('country_id', '=', ref('base.it'))
], context=context)
assert len(zip_ids) == 1, "There must be 1 'Isola Del Cantone'"

100
base_location_geonames_import/wizard/geonames_import.py

@ -4,6 +4,8 @@
# Base Location Geonames Import module for OpenERP # Base Location Geonames Import module for OpenERP
# Copyright (C) 2014 Akretion (http://www.akretion.com) # Copyright (C) 2014 Akretion (http://www.akretion.com)
# @author Alexis de Lattre <alexis.delattre@akretion.com> # @author Alexis de Lattre <alexis.delattre@akretion.com>
# Copyright (C) 2014 Agile Business Group (http://www.agilebg.com)
# @author Lorenzo Battistini <lorenzo.battistini@agilebg.com>
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as # it under the terms of the GNU Affero General Public License as
@ -20,8 +22,8 @@
# #
############################################################################## ##############################################################################
from openerp.osv import orm, fields
from openerp.tools.translate import _
from openerp import models, fields, api, _
from openerp.exceptions import Warning
import requests import requests
import tempfile import tempfile
import StringIO import StringIO
@ -33,79 +35,84 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class better_zip_geonames_import(orm.TransientModel):
class better_zip_geonames_import(models.TransientModel):
_name = 'better.zip.geonames.import' _name = 'better.zip.geonames.import'
_description = 'Import Better Zip from Geonames' _description = 'Import Better Zip from Geonames'
_rec_name = 'country_id'
_columns = {
'country_id': fields.many2one('res.country', 'Country', required=True),
}
country_id = fields.Many2one('res.country', 'Country', required=True)
def _prepare_better_zip(
self, cr, uid, row, country_id, states, context=None):
'''This function is designed to be inherited'''
state_id = False
if states and row[4] and row[4] in states:
state_id = states[row[4].upper()]
@api.model
def _prepare_better_zip(self, row, country_id):
state = self.select_or_create_state(row, country_id)
vals = { vals = {
'name': row[1], 'name': row[1],
'city': row[2], 'city': row[2],
'state_id': state_id,
'state_id': state.id,
'country_id': country_id, 'country_id': country_id,
} }
return vals return vals
@api.model
def create_better_zip( def create_better_zip(
self, cr, uid, row, country_id, country_code, states,
context=None):
self, row, country_id, country_code):
bzip_id = False bzip_id = False
if row[0] != country_code: if row[0] != country_code:
raise orm.except_orm(
raise Warning(
_('Error:'), _('Error:'),
_("The country code inside the file (%s) doesn't " _("The country code inside the file (%s) doesn't "
"correspond to the selected country (%s).") "correspond to the selected country (%s).")
% (row[0], country_code)) % (row[0], country_code))
logger.debug('ZIP = %s - City = %s' % (row[1], row[2])) logger.debug('ZIP = %s - City = %s' % (row[1], row[2]))
if row[1] and row[2]: if row[1] and row[2]:
vals = self._prepare_better_zip(
cr, uid, row, country_id, states, context=context)
vals = self._prepare_better_zip(row, country_id)
if vals: if vals:
bzip_id = self.pool['res.better.zip'].create(
cr, uid, vals, context=context)
bzip_id = self.env['res.better.zip'].create(vals)
return bzip_id return bzip_id
def run_import(self, cr, uid, ids, context=None):
assert len(ids) == 1, 'Only one ID for the better zip import wizard'
bzip_obj = self.pool['res.better.zip']
wizard = self.browse(cr, uid, ids[0], context=context)
country_id = wizard.country_id.id
country_code = wizard.country_id.code.upper()
url = 'http://download.geonames.org/export/zip/%s.zip' % country_code
@api.model
def select_or_create_state(
self, row, country_id, code_row_index=4, name_row_index=3
):
states = self.env['res.country.state'].search([
('country_id', '=', country_id),
('code', '=', row[code_row_index]),
])
if len(states) > 1:
raise Warning(
_("Too many states with code %s for counrty %s")
% (row[code_row_index], country_id))
if len(states) == 1:
return states[0]
else:
return self.env['res.country.state'].create({
'name': row[name_row_index],
'code': row[code_row_index],
'country_id': country_id
})
@api.one
def run_import(self):
bzip_obj = self.env['res.better.zip']
country_id = self.country_id.id
country_code = self.country_id.code.upper()
config_url = self.pool['ir.config_parameter'].get_param(
self._cr, self._uid, 'geonames.url',
default='http://download.geonames.org/export/zip/%s.zip')
url = config_url % country_code
logger.info('Starting to download %s' % url) logger.info('Starting to download %s' % url)
res_request = requests.get(url) res_request = requests.get(url)
if res_request.status_code != requests.codes.ok: if res_request.status_code != requests.codes.ok:
raise orm.except_orm(
raise Warning(
_('Error:'), _('Error:'),
_('Got an error %d when trying to download the file %s.') _('Got an error %d when trying to download the file %s.')
% (res_request.status_code, url)) % (res_request.status_code, url))
bzip_ids_to_delete = bzip_obj.search(
cr, uid, [('country_id', '=', country_id)], context=context)
bzip_ids_to_delete = bzip_obj.search([('country_id', '=', country_id)])
if bzip_ids_to_delete: if bzip_ids_to_delete:
cr.execute('SELECT id FROM res_better_zip WHERE id in %s '
'FOR UPDATE NOWAIT', (tuple(bzip_ids_to_delete), ))
bzip_obj.unlink(cr, uid, bzip_ids_to_delete, context=context)
bzip_ids_to_delete.unlink()
logger.info( logger.info(
'%d better zip entries deleted for country %s' '%d better zip entries deleted for country %s'
% (len(bzip_ids_to_delete), wizard.country_id.name))
state_ids = self.pool['res.country.state'].search(
cr, uid, [('country_id', '=', country_id)], context=context)
states = {}
# key = code of the state ; value = ID of the state in OpenERP
if state_ids:
states_r = self.pool['res.country.state'].read(
cr, uid, state_ids, ['code', 'country_id'], context=context)
for state in states_r:
states[state['code'].upper()] = state['id']
% (len(bzip_ids_to_delete), self.country_id.name))
f_geonames = zipfile.ZipFile(StringIO.StringIO(res_request.content)) f_geonames = zipfile.ZipFile(StringIO.StringIO(res_request.content))
tempdir = tempfile.mkdtemp(prefix='openerp') tempdir = tempfile.mkdtemp(prefix='openerp')
f_geonames.extract('%s.txt' % country_code, tempdir) f_geonames.extract('%s.txt' % country_code, tempdir)
@ -113,13 +120,10 @@ class better_zip_geonames_import(orm.TransientModel):
data_file = open(os.path.join(tempdir, '%s.txt' % country_code), 'r') data_file = open(os.path.join(tempdir, '%s.txt' % country_code), 'r')
data_file.seek(0) data_file.seek(0)
logger.info( logger.info(
'Starting to create the better zip entries %s state information'
% (states and 'with' or 'without'))
'Starting to create the better zip entries')
for row in unicodecsv.reader( for row in unicodecsv.reader(
data_file, encoding='utf-8', delimiter=' '): data_file, encoding='utf-8', delimiter=' '):
self.create_better_zip(
cr, uid, row, country_id, country_code, states,
context=context)
self.create_better_zip(row, country_id, country_code)
data_file.close() data_file.close()
logger.info( logger.info(
'The wizard to create better zip entries from geonames ' 'The wizard to create better zip entries from geonames '

Loading…
Cancel
Save