Browse Source

[IMP] mis_builder: convenience methods to obtain balance and unallocated p&l

plus improve mechanism to group by account
pull/189/head
Stéphane Bidoul 9 years ago
parent
commit
466cd6087f
  1. 164
      mis_builder/models/aep.py
  2. 28
      mis_builder/models/mis_builder.py
  3. 168
      mis_builder/tests/test_aep.py

164
mis_builder/models/aep.py

@ -4,6 +4,7 @@
import re
from collections import defaultdict
from itertools import izip
from openerp import fields
from openerp.models import expression
@ -11,11 +12,6 @@ from openerp.tools.safe_eval import safe_eval
from openerp.tools.float_utils import float_is_zero
from .accounting_none import AccountingNone
MODE_VARIATION = 'p'
MODE_INITIAL = 'i'
MODE_END = 'e'
MODE_UNALLOCATED = 'u'
class AccountingExpressionProcessor(object):
""" Processor for accounting expressions.
@ -61,11 +57,15 @@ class AccountingExpressionProcessor(object):
* additionally, one query per view/consolidation account is done to
discover the children accounts.
"""
MODE_VARIATION = 'p'
MODE_INITIAL = 'i'
MODE_END = 'e'
MODE_UNALLOCATED = 'u'
ACC_RE = re.compile(r"(?P<field>\bbal|\bcrd|\bdeb)"
r"(?P<mode>[piseu])?"
r"(?P<accounts>_[a-zA-Z0-9]+|\[.*?\])"
r"(?P<domain>\[.*?\])?")
_ACC_RE = re.compile(r"(?P<field>\bbal|\bcrd|\bdeb)"
r"(?P<mode>[piseu])?"
r"(?P<accounts>_[a-zA-Z0-9]+|\[.*?\])"
r"(?P<domain>\[.*?\])?")
def __init__(self, env):
self.env = env
@ -77,6 +77,11 @@ class AccountingExpressionProcessor(object):
# - NNN% for a like
# - NNN for a code with an exact match
self._account_ids_by_code = defaultdict(set)
# smart ending balance (returns AccountingNone if there
# are no moves in period and 0 initial balance), implies
# a first query to get the initial balance and another
# to get the variation, so it's a bit slower
self.smart_end = True
def _load_account_codes(self, account_codes, company):
account_model = self.env['account.account']
@ -87,7 +92,7 @@ class AccountingExpressionProcessor(object):
if account_code is None:
# None means we want all accounts
account_ids = account_model.\
search([]).ids
search([('company_id', '=', company.id)]).ids
self._account_ids_by_code[account_code].update(account_ids)
elif '%' in account_code:
account_ids = account_model.\
@ -109,9 +114,9 @@ class AccountingExpressionProcessor(object):
"""
field, mode, account_codes, domain = mo.groups()
if not mode:
mode = MODE_VARIATION
mode = self.MODE_VARIATION
elif mode == 's':
mode = MODE_END
mode = self.MODE_END
if account_codes.startswith('_'):
account_codes = account_codes[1:]
else:
@ -131,10 +136,10 @@ class AccountingExpressionProcessor(object):
so when all expressions have been parsed, we know which
account codes to query for each domain and mode.
"""
for mo in self.ACC_RE.finditer(expr):
for mo in self._ACC_RE.finditer(expr):
_, mode, account_codes, domain = self._parse_match_object(mo)
if mode == MODE_END:
modes = (MODE_INITIAL, MODE_VARIATION)
if mode == self.MODE_END and self.smart_end:
modes = (self.MODE_INITIAL, self.MODE_VARIATION)
else:
modes = (mode, )
for mode in modes:
@ -156,7 +161,7 @@ class AccountingExpressionProcessor(object):
@classmethod
def has_account_var(cls, expr):
"""Test if an string contains an accounting variable."""
return bool(cls.ACC_RE.search(expr))
return bool(cls._ACC_RE.search(expr))
def get_aml_domain_for_expr(self, expr,
date_from, date_to,
@ -169,7 +174,7 @@ class AccountingExpressionProcessor(object):
"""
aml_domains = []
date_domain_by_mode = {}
for mo in self.ACC_RE.finditer(expr):
for mo in self._ACC_RE.finditer(expr):
field, mode, account_codes, domain = self._parse_match_object(mo)
aml_domain = list(domain)
account_ids = set()
@ -192,9 +197,9 @@ class AccountingExpressionProcessor(object):
def get_aml_domain_for_dates(self, date_from, date_to,
mode,
target_move, company):
if mode == MODE_VARIATION:
if mode == self.MODE_VARIATION:
domain = [('date', '>=', date_from), ('date', '<=', date_to)]
elif mode in (MODE_INITIAL, MODE_END):
elif mode in (self.MODE_INITIAL, self.MODE_END):
# for income and expense account, sum from the beginning
# of the current fiscal year only, for balance sheet accounts
# sum from the beginning of time
@ -204,11 +209,11 @@ class AccountingExpressionProcessor(object):
domain = ['|',
('date', '>=', fields.Date.to_string(fy_date_from)),
('user_type_id.include_initial_balance', '=', True)]
if mode == MODE_INITIAL:
if mode == self.MODE_INITIAL:
domain.append(('date', '<', date_from))
elif mode == MODE_END:
elif mode == self.MODE_END:
domain.append(('date', '<=', date_to))
elif mode == MODE_UNALLOCATED:
elif mode == self.MODE_UNALLOCATED:
date_from_date = fields.Date.from_string(date_from)
fy_date_from = \
company.compute_fiscalyear_dates(date_from_date)['date_from']
@ -246,13 +251,13 @@ class AccountingExpressionProcessor(object):
for acc in accs:
debit = acc['debit'] or 0.0
credit = acc['credit'] or 0.0
if mode in (MODE_INITIAL, MODE_UNALLOCATED) and \
if mode in (self.MODE_INITIAL, self.MODE_UNALLOCATED) and \
float_is_zero(debit-credit, precision_rounding=2):
# in initial mode, ignore accounts with 0 balance
continue
self._data[key][acc['account_id'][0]] = (debit, credit)
def replace_expr(self, expr, account_ids_filter=None):
def replace_expr(self, expr):
"""Replace accounting variables in an expression by their amount.
Returns a new expression string.
@ -266,10 +271,6 @@ class AccountingExpressionProcessor(object):
for account_code in account_codes:
account_ids = self._account_ids_by_code[account_code]
for account_id in account_ids:
# TODO FIXME: improve perf with sets
if account_ids_filter and \
account_id not in account_ids_filter:
continue
debit, credit = \
account_ids_data.get(account_id,
(AccountingNone, AccountingNone))
@ -282,41 +283,114 @@ class AccountingExpressionProcessor(object):
# in initial balance mode, assume 0 is None
# as it does not make sense to distinguish 0 from "no data"
if v is not AccountingNone and \
mode in (MODE_INITIAL, MODE_UNALLOCATED) and \
mode in (self.MODE_INITIAL, self.MODE_UNALLOCATED) and \
float_is_zero(v, precision_rounding=2):
v = AccountingNone
return v
def f(mo):
field, mode, account_codes, domain = self._parse_match_object(mo)
if mode == MODE_END:
if mode == self.MODE_END and self.smart_end:
# split ending balance in initial+variation, so
# if there is no move in period, we end up with AccountingNone
v = s(field, MODE_INITIAL, account_codes, domain) + \
s(field, MODE_VARIATION, account_codes, domain)
v = s(field, self.MODE_INITIAL, account_codes, domain) + \
s(field, self.MODE_VARIATION, account_codes, domain)
else:
v = s(field, mode, account_codes, domain)
return '(' + repr(v) + ')'
return self.ACC_RE.sub(f, expr)
return self._ACC_RE.sub(f, expr)
def get_accounts_in_expr(self, expr):
"""Get the ids of all accounts involved in an expression.
This means only accounts which contribute data to the expression.
def replace_expr_by_account_id(self, expr):
"""Replace accounting variables in an expression by their amount,
iterating by accounts involved in the expression.
Returns a set of account ids.
yields account_id, replaced_expr
This method must be executed after do_queries().
"""
res = set()
for mo in self.ACC_RE.finditer(expr):
_, mode, account_codes, domain = self._parse_match_object(mo)
def s(field, mode, account_codes, domain):
key = (domain, mode)
account_ids_data = self._data[key]
debit, credit = \
account_ids_data.get(account_id,
(AccountingNone, AccountingNone))
if field == 'bal':
v = debit - credit
elif field == 'deb':
v = debit
elif field == 'crd':
v = credit
# in initial balance mode, assume 0 is None
# as it does not make sense to distinguish 0 from "no data"
if v is not AccountingNone and \
mode in (self.MODE_INITIAL, self.MODE_UNALLOCATED) and \
float_is_zero(v, precision_rounding=2):
v = AccountingNone
return v
def f(mo):
field, mode, account_codes, domain = self._parse_match_object(mo)
if mode == self.MODE_END and self.smart_end:
# split ending balance in initial+variation, so
# if there is no move in period, we end up with AccountingNone
v = s(field, self.MODE_INITIAL, account_codes, domain) + \
s(field, self.MODE_VARIATION, account_codes, domain)
else:
v = s(field, mode, account_codes, domain)
return '(' + repr(v) + ')'
account_ids = set()
for mo in self._ACC_RE.finditer(expr):
field, mode, account_codes, domain = self._parse_match_object(mo)
key = (domain, mode)
account_ids_data = self._data[key]
for account_code in account_codes:
account_ids = self._account_ids_by_code[account_code]
# TODO FIXME: improve perf with sets
for account_id in account_ids:
for account_id in self._account_ids_by_code[account_code]:
if account_id in account_ids_data:
res.add(account_id)
return res
account_ids.add(account_id)
for account_id in account_ids:
yield account_id, self._ACC_RE.sub(f, expr)
@classmethod
def get_balances(cls, mode, date_from, date_to, target_move, company):
""" A convenience method to obtain the balances of all accounts
:param mode: MODE_INITIAL|MODE_END|MODE_VARIATION
:param date_from:
:param date_to:
:param target_move: if 'posted', consider only posted moves
:param company:
Returns a dictionary: {account_id, (debit, credit)}
"""
assert mode in (cls.MODE_INITIAL, cls.MODE_END, cls.MODE_VARIATION)
expr = 'deb{mode}[], crd{mode}[]'.format(mode=mode)
aep = AccountingExpressionProcessor(company.env)
# disable smart_end to have the data at once, instead
# of initial + variation
aep.smart_end = False
aep.parse_expr(expr)
aep.done_parsing(company)
aep.do_queries(date_from, date_to, target_move, company)
return aep._data[((), mode)]
@classmethod
def get_unallocated_pl(cls, date, target_move, company):
""" A convenience method to obtain the unallocated profit/loss
of the previous fiscal years
:param date:
:param target_move: if 'posted', consider only posted moves
:param company:
Returns a tuple (debit, credit)
"""
aep = AccountingExpressionProcessor(company.env)
expr = 'deb{mode}[], crd{mode}[]'.format(mode=cls.MODE_UNALLOCATED)
aep.parse_expr(expr)
aep.done_parsing(company)
aep.do_queries(date, date, target_move, company)
values = aep._data[((), cls.MODE_UNALLOCATED)].values()
return tuple(map(sum, izip(*values)))

28
mis_builder/models/mis_builder.py

@ -727,24 +727,20 @@ class MisReport(models.Model):
# in computing other kpis
localdict[kpi.name] = vals
# let's compute the exploded values by account
# we assume there will be no errors, because it is a
# the same as the kpi, just filtered on one account;
# I'd say if we have an exception in this part, it's bug...
# TODO FIXME handle exceptions
if not kpi.auto_expand_accounts:
continue
for account_id in aep.get_accounts_in_expr(kpi.expression):
account_id_vals = []
for expression in kpi.expression_ids:
if expression.subkpi_id and \
subkpis_filter and \
expression.subkpi_id not in subkpis_filter:
continue
kpi_eval_expression = \
aep.replace_expr(expression.name,
account_ids_filter=[account_id])
account_id_vals.\
append(safe_eval(kpi_eval_expression, localdict))
expr = []
for expression in kpi.expression_ids:
if expression.subkpi_id and \
subkpis_filter and \
expression.subkpi_id not in subkpis_filter:
continue
expr.append(expression.name)
expr = ', '.join(expr) # tuple
for account_id, replaced_expr in \
aep.replace_expr_by_account_id(expr):
account_id_vals = safe_eval(replaced_expr, localdict)
kpi_matrix.set_kpi_exploded_vals(kpi_matrix_period, kpi,
account_id,
account_id_vals)

168
mis_builder/tests/test_aep.py

@ -3,6 +3,7 @@
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
import datetime
import time
from openerp import fields
import openerp.tests.common as common
@ -16,14 +17,19 @@ class TestAEP(common.TransactionCase):
def setUp(self):
super(TestAEP, self).setUp()
self.res_company = self.env['res.company']
self.account_model = self.env['account.account']
self.move_model = self.env['account.move']
self.journal_model = self.env['account.journal']
self.curr_year = datetime.date.today().year
self.prev_year = self.curr_year - 1
# create company
self.company = self.res_company.create({
'name': 'AEP Company'})
# create receivable bs account
type_ar = self.browse_ref('account.data_account_type_receivable')
self.account_ar = self.account_model.create({
'company_id': self.company.id,
'code': '400AR',
'name': 'Receivable',
'user_type_id': type_ar.id,
@ -31,21 +37,47 @@ class TestAEP(common.TransactionCase):
# create income pl account
type_in = self.browse_ref('account.data_account_type_revenue')
self.account_in = self.account_model.create({
'company_id': self.company.id,
'code': '700IN',
'name': 'Income',
'user_type_id': type_in.id})
# create journal
self.journal = self.journal_model.create({
'company_id': self.company.id,
'name': 'Sale journal',
'code': 'VEN',
'type': 'sale'})
# create move in december last year
self._create_move(
date=datetime.date(self.prev_year, 12, 1),
amount=100,
debit_acc=self.account_ar,
credit_acc=self.account_in)
# create move in january this year
self._create_move(
date=datetime.date(self.curr_year, 1, 1),
amount=300,
debit_acc=self.account_ar,
credit_acc=self.account_in)
# create move in february this year
self._create_move(
date=datetime.date(self.curr_year, 3, 1),
amount=500,
debit_acc=self.account_ar,
credit_acc=self.account_in)
# create the AEP, and prepare the expressions we'll need
self.aep = AEP(self.env)
self.aep.parse_expr("bali[]")
self.aep.parse_expr("bale[]")
self.aep.parse_expr("balp[]")
self.aep.parse_expr("balu[]")
self.aep.done_parsing(self.env.user.company_id)
self.aep.parse_expr("bali[700IN]")
self.aep.parse_expr("bale[700IN]")
self.aep.parse_expr("balp[700IN]")
self.aep.parse_expr("bali[400AR]")
self.aep.parse_expr("bale[400AR]")
self.aep.parse_expr("balp[400AR]")
self.aep.done_parsing(self.company)
def _create_move(self, date, amount, debit_acc, credit_acc):
move = self.move_model.create({
@ -68,87 +100,119 @@ class TestAEP(common.TransactionCase):
date_from=fields.Date.to_string(date_from),
date_to=fields.Date.to_string(date_to),
target_move='posted',
company=self.env.user.company_id
)
company=self.company)
def _eval(self, expr, acc=None):
def _eval(self, expr):
eval_dict = {'AccountingNone': AccountingNone}
if acc:
return safe_eval(
self.aep.replace_expr(expr, account_ids_filter=[acc.id]),
eval_dict)
else:
return safe_eval(
self.aep.replace_expr(expr),
eval_dict)
return safe_eval(self.aep.replace_expr(expr), eval_dict)
def test_sanity_check(self):
self.assertEquals(self.env.user.company_id.fiscalyear_last_day, 31)
self.assertEquals(self.env.user.company_id.fiscalyear_last_month, 12)
def _eval_by_account_id(self, expr):
res = {}
eval_dict = {'AccountingNone': AccountingNone}
for account_id, replaced_expr in \
self.aep.replace_expr_by_account_id(expr):
res[account_id] = safe_eval(replaced_expr, eval_dict)
return res
def test_aep_1(self):
# create move in december last year
self._create_move(
date=datetime.date(self.prev_year, 12, 1),
amount=100,
debit_acc=self.account_ar,
credit_acc=self.account_in)
# create move in january this year
self._create_move(
date=datetime.date(self.curr_year, 1, 1),
amount=300,
debit_acc=self.account_ar,
credit_acc=self.account_in)
# create move in february this year
self._create_move(
date=datetime.date(self.curr_year, 3, 1),
amount=500,
debit_acc=self.account_ar,
credit_acc=self.account_in)
def test_sanity_check(self):
self.assertEquals(self.company.fiscalyear_last_day, 31)
self.assertEquals(self.company.fiscalyear_last_month, 12)
def test_aep_basic(self):
# let's query for december
self._do_queries(
datetime.date(self.prev_year, 12, 1),
datetime.date(self.prev_year, 12, 31))
# initial balance must be None
self.assertIs(self._eval('bali[]', self.account_in), AccountingNone)
self.assertIs(self._eval('bali[]', self.account_ar), AccountingNone)
self.assertIs(self._eval('bali[400AR]'), AccountingNone)
self.assertIs(self._eval('bali[700IN]'), AccountingNone)
# check variation
self.assertEquals(self._eval('balp[]', self.account_in), -100)
self.assertEquals(self._eval('balp[]', self.account_ar), 100)
self.assertEquals(self._eval('balp[400AR]'), 100)
self.assertEquals(self._eval('balp[700IN]'), -100)
# check ending balance
self.assertEquals(self._eval('bale[]', self.account_in), -100)
self.assertEquals(self._eval('bale[]', self.account_ar), 100)
self.assertEquals(self._eval('bale[400AR]'), 100)
self.assertEquals(self._eval('bale[700IN]'), -100)
# let's query for January
self._do_queries(
datetime.date(self.curr_year, 1, 1),
datetime.date(self.curr_year, 1, 31))
# initial balance is None for income account (it's not carried over)
self.assertIs(self._eval('bali[]', self.account_in), AccountingNone)
self.assertEquals(self._eval('bali[]', self.account_ar), 100)
self.assertEquals(self._eval('bali[400AR]'), 100)
self.assertIs(self._eval('bali[700IN]'), AccountingNone)
# check variation
self.assertEquals(self._eval('balp[]', self.account_in), -300)
self.assertEquals(self._eval('balp[]', self.account_ar), 300)
self.assertEquals(self._eval('balp[400AR]'), 300)
self.assertEquals(self._eval('balp[700IN]'), -300)
# check ending balance
self.assertEquals(self._eval('bale[]', self.account_in), -300)
self.assertEquals(self._eval('bale[]', self.account_ar), 400)
self.assertEquals(self._eval('bale[400AR]'), 400)
self.assertEquals(self._eval('bale[700IN]'), -300)
# let's query for March
self._do_queries(
datetime.date(self.curr_year, 3, 1),
datetime.date(self.curr_year, 3, 31))
# initial balance is the ending balance fo January
self.assertEquals(self._eval('bali[]', self.account_in), -300)
self.assertEquals(self._eval('bali[]', self.account_ar), 400)
self.assertEquals(self._eval('bali[400AR]'), 400)
self.assertEquals(self._eval('bali[700IN]'), -300)
# check variation
self.assertEquals(self._eval('balp[]', self.account_in), -500)
self.assertEquals(self._eval('balp[]', self.account_ar), 500)
self.assertEquals(self._eval('balp[400AR]'), 500)
self.assertEquals(self._eval('balp[700IN]'), -500)
# check ending balance
self.assertEquals(self._eval('bale[]', self.account_in), -800)
self.assertEquals(self._eval('bale[]', self.account_ar), 900)
self.assertEquals(self._eval('bale[400AR]'), 900)
self.assertEquals(self._eval('bale[700IN]'), -800)
# unallocated p&l from previous year
self.assertEquals(self._eval('balu[]'), -100)
# TODO allocate profits, and then...
def test_aep_by_account(self):
self._do_queries(
datetime.date(self.curr_year, 3, 1),
datetime.date(self.curr_year, 3, 31))
variation = self._eval_by_account_id('balp[]')
self.assertEquals(variation, {
self.account_ar.id: 500,
self.account_in.id: -500,
})
variation = self._eval_by_account_id('balp[700IN]')
self.assertEquals(variation, {
self.account_in.id: -500,
})
def test_aep_convenience_methods(self):
initial = AEP.get_balances(
AEP.MODE_INITIAL,
time.strftime('%Y') + '-03-01',
time.strftime('%Y') + '-03-31',
'posted',
self.company)
self.assertEquals(initial, {
self.account_ar.id: (400, 0),
self.account_in.id: (0, 300),
})
variation = AEP.get_balances(
AEP.MODE_VARIATION,
time.strftime('%Y') + '-03-01',
time.strftime('%Y') + '-03-31',
'posted',
self.company)
self.assertEquals(variation, {
self.account_ar.id: (500, 0),
self.account_in.id: (0, 500),
})
end = AEP.get_balances(
AEP.MODE_END,
time.strftime('%Y') + '-03-01',
time.strftime('%Y') + '-03-31',
'posted',
self.company)
self.assertEquals(end, {
self.account_ar.id: (900, 0),
self.account_in.id: (0, 800),
})
unallocated = AEP.get_unallocated_pl(
time.strftime('%Y') + '-03-01',
'posted',
self.company)
self.assertEquals(unallocated, (0, 100))
Loading…
Cancel
Save