# Copyright 2018 ForgeFlow, S.L. (https://www.forgeflow.com)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).

from datetime import datetime, timedelta

from odoo import _, api, fields, models
from odoo.tools.misc import DEFAULT_SERVER_DATE_FORMAT


class ReportStatementCommon(models.AbstractModel):
    """Abstract Report Statement for use in other models"""

    _name = "statement.common"
    _description = "Statement Reports Common"

    def _get_invoice_address(self, part):
        """Return the invoice address of ``part`` as a ``res.partner`` record.

        Falls back to ``part`` itself when ``address_get`` yields no
        ``"invoice"`` entry.
        """
        inv_addr_id = part.address_get(["invoice"]).get("invoice", part.id)
        return self.env["res.partner"].browse(inv_addr_id)

    def _format_date_to_partner_lang(
        self, date, date_format=DEFAULT_SERVER_DATE_FORMAT
    ):
        """Format ``date`` with ``date_format``.

        :param date: a date/datetime object, a string in the server date
            format, or a falsy value
        :param date_format: ``strftime`` pattern used for the output
        :return: the formatted string, or ``""`` when ``date`` is falsy
        """
        if isinstance(date, str):
            date = datetime.strptime(date, DEFAULT_SERVER_DATE_FORMAT)
        return date.strftime(date_format) if date else ""

    def _get_account_display_lines(
        self, company_id, partner_ids, date_start, date_end, account_type
    ):
        """Return the statement lines per partner; must be overridden.

        ``_get_report_values`` indexes the result by partner id and reads the
        keys ``currency_id``, ``blocked``, ``date``, ``date_maturity`` and the
        configured amount field from every line.

        :raises NotImplementedError: always, in this abstract model
        """
        raise NotImplementedError

    def _get_account_initial_balance(
        self, company_id, partner_ids, date_start, account_type
    ):
        """Return the balance brought forward per partner (none by default).

        ``_get_report_values`` looks the result up by partner id and reads
        ``currency_id`` and ``balance`` from every entry.
        """
        return {}

    def _show_buckets_sql_q1(self, partners, date_end, account_type):
        """Build sub-query Q1 of the aging computation.

        Q1 yields, for each posted and non-blocked move line of ``partners``
        on accounts of ``account_type`` dated up to ``date_end``, the amount
        still open (in company currency and in line currency) once partial
        reconciliations up to ``date_end`` are taken into account.

        :return: the SQL text with all parameters already escaped
        """
        # Explicit mapping instead of ``locals()`` so that only the values
        # referenced by the query are handed to the database adapter.
        params = {
            "partners": partners,
            "date_end": date_end,
            "account_type": account_type,
        }
        return str(
            self._cr.mogrify(
                """
            SELECT l.partner_id, l.currency_id, l.company_id, l.move_id,
                (abs(COALESCE(l.balance, 0.0)) + sum(
                    coalesce(pr.pr_sign, 0.0) * coalesce(pr.amount, 0.0))
                ) * sign(COALESCE(l.balance, 0.0)) AS open_due,
                (abs(COALESCE(l.amount_currency, 0.0)) + sum(
                    coalesce(pr.pr_sign, 0.0) * CASE
                        WHEN pr.currency_id IS NOT NULL
                            AND pr.currency_id = l.currency_id
                        THEN coalesce(pr.amount_currency, 0.0)
                        WHEN cur.id IS NOT NULL AND ROUND(
                            abs(COALESCE(l.balance, 0.0)),
                            cur.decimal_places) > 0.0
                        THEN ROUND(coalesce(pr.amount, 0.0) *
                            COALESCE(l.amount_currency, 0.0) /
                            NULLIF(l.balance, 0.0),
                            cur.decimal_places)
                        ELSE ROUND(coalesce(pr.amount, 0.0) * COALESCE((
                            SELECT r.rate FROM res_currency_rate r
                            JOIN account_move_line aml
                                ON pr.credit_move_id = aml.id
                            WHERE r.currency_id = l.currency_id
                                AND r.name <= aml.date
                                AND (r.company_id IS NULL
                                    OR r.company_id = l.company_id)
                            ORDER BY r.company_id, r.name DESC LIMIT 1), 1.0),
                            cur.decimal_places)
                    END)
                ) * sign(COALESCE(l.amount_currency, 0.0)) AS open_due_currency,
                CASE WHEN l.date_maturity is null
                    THEN l.date
                    ELSE l.date_maturity
                END as date_maturity
            FROM (
                SELECT l.*, CASE
                    WHEN l.debit = 0.0 AND l.credit = 0.0
                        AND l.currency_id IS NOT NULL
                        AND ROUND(COALESCE(l.amount_currency, 0.0),
                            cur.decimal_places) > 0.0 THEN 1
                    WHEN l.debit = 0.0 AND l.credit = 0.0
                        AND l.currency_id IS NOT NULL
                        AND ROUND(COALESCE(l.amount_currency, 0.0),
                            cur.decimal_places) < 0.0 THEN -1
                    WHEN l.balance > 0.0 THEN 1
                    ELSE -1
                END as sign
                FROM account_move_line l
                LEFT JOIN res_currency cur ON cur.id = l.currency_id
            ) l
            JOIN account_move m ON l.move_id = m.id
            LEFT JOIN res_currency cur ON cur.id = l.currency_id
            LEFT JOIN LATERAL (SELECT pr.*,
                CASE WHEN pr.credit_move_id = l.id THEN l.sign
                    ELSE -l.sign END AS pr_sign
                FROM account_partial_reconcile pr
                WHERE pr.max_date <= %(date_end)s AND (
                    (pr.debit_move_id = l.id) OR (pr.credit_move_id = l.id))
            ) as pr ON TRUE
            WHERE l.partner_id IN %(partners)s
                AND l.account_internal_type = %(account_type)s
                AND (
                    (pr.id IS NOT NULL AND
                        pr.max_date <= %(date_end)s) OR
                    (pr.id IS NULL)
                ) AND l.date <= %(date_end)s AND not l.blocked
                AND m.state IN ('posted')
            GROUP BY l.partner_id, l.currency_id, l.date, l.date_maturity,
                l.amount_currency, l.balance, l.move_id,
                l.company_id, l.id
        """,
                params,
            ),
            "utf-8",
        )

    def _show_buckets_sql_q2(self, date_end, minus_30, minus_60, minus_90, minus_120):
        """Build sub-query Q2: spread every Q1 row over the aging buckets.

        Each row's open amount (line currency when set, company currency
        otherwise) is placed in the bucket whose date window contains its
        maturity date; all other bucket columns are ``0.0``.

        :return: the SQL text with all parameters already escaped
        """
        params = {
            "date_end": date_end,
            "minus_30": minus_30,
            "minus_60": minus_60,
            "minus_90": minus_90,
            "minus_120": minus_120,
        }
        return str(
            self._cr.mogrify(
                """
            SELECT partner_id, currency_id, date_maturity, open_due,
                open_due_currency, move_id, company_id,
            CASE
                WHEN %(date_end)s <= date_maturity AND currency_id is null
                    THEN open_due
                WHEN %(date_end)s <= date_maturity AND currency_id is not null
                    THEN open_due_currency
                ELSE 0.0
            END as current,
            CASE
                WHEN %(minus_30)s < date_maturity
                    AND date_maturity < %(date_end)s
                    AND currency_id is null
                THEN open_due
                WHEN %(minus_30)s < date_maturity
                    AND date_maturity < %(date_end)s
                    AND currency_id is not null
                THEN open_due_currency
                ELSE 0.0
            END as b_1_30,
            CASE
                WHEN %(minus_60)s < date_maturity
                    AND date_maturity <= %(minus_30)s
                    AND currency_id is null
                THEN open_due
                WHEN %(minus_60)s < date_maturity
                    AND date_maturity <= %(minus_30)s
                    AND currency_id is not null
                THEN open_due_currency
                ELSE 0.0
            END as b_30_60,
            CASE
                WHEN %(minus_90)s < date_maturity
                    AND date_maturity <= %(minus_60)s
                    AND currency_id is null
                THEN open_due
                WHEN %(minus_90)s < date_maturity
                    AND date_maturity <= %(minus_60)s
                    AND currency_id is not null
                THEN open_due_currency
                ELSE 0.0
            END as b_60_90,
            CASE
                WHEN %(minus_120)s < date_maturity
                    AND date_maturity <= %(minus_90)s
                    AND currency_id is null
                THEN open_due
                WHEN %(minus_120)s < date_maturity
                    AND date_maturity <= %(minus_90)s
                    AND currency_id is not null
                THEN open_due_currency
                ELSE 0.0
            END as b_90_120,
            CASE
                WHEN date_maturity <= %(minus_120)s
                    AND currency_id is null
                THEN open_due
                WHEN date_maturity <= %(minus_120)s
                    AND currency_id is not null
                THEN open_due_currency
                ELSE 0.0
            END as b_over_120
            FROM Q1
            GROUP BY partner_id, currency_id, date_maturity, open_due,
                open_due_currency, move_id, company_id
        """,
                params,
            ),
            "utf-8",
        )

    def _show_buckets_sql_q3(self, company_id):
        """Build sub-query Q3: keep Q2 rows of ``company_id`` only.

        Rows without a currency get the company's currency instead.

        :return: the SQL text with all parameters already escaped
        """
        params = {"company_id": company_id}
        return str(
            self._cr.mogrify(
                """
            SELECT Q2.partner_id, current, b_1_30, b_30_60, b_60_90, b_90_120,
                b_over_120,
                COALESCE(Q2.currency_id, c.currency_id) AS currency_id
            FROM Q2
            JOIN res_company c ON (c.id = Q2.company_id)
            WHERE c.id = %(company_id)s
        """,
                params,
            ),
            "utf-8",
        )

    def _show_buckets_sql_q4(self):
        """Build sub-query Q4: sum the Q3 buckets per partner and currency."""
        return """
            SELECT partner_id, currency_id, sum(current) as current,
                sum(b_1_30) as b_1_30, sum(b_30_60) as b_30_60,
                sum(b_60_90) as b_60_90, sum(b_90_120) as b_90_120,
                sum(b_over_120) as b_over_120
            FROM Q3
            GROUP BY partner_id, currency_id
        """

    def _get_bucket_dates(self, date_end, aging_type):
        """Return the bucket boundary dates for ``aging_type``.

        Dispatches to ``_get_bucket_dates_<aging_type>`` and falls back to
        the day-based boundaries when no such method exists.
        """
        return getattr(
            self, "_get_bucket_dates_%s" % aging_type, self._get_bucket_dates_days
        )(date_end)

    def _get_bucket_dates_days(self, date_end):
        """Return boundaries 30, 60, 90 and 120 days before ``date_end``."""
        return {
            "date_end": date_end,
            "minus_30": date_end - timedelta(days=30),
            "minus_60": date_end - timedelta(days=60),
            "minus_90": date_end - timedelta(days=90),
            "minus_120": date_end - timedelta(days=120),
        }

    def _get_bucket_dates_months(self, date_end):
        """Return month-based boundaries.

        The first boundary is ``date_end`` itself; every following one is
        the last day of the month preceding the previous boundary.
        """
        res = {}
        d = date_end
        for k in ("date_end", "minus_30", "minus_60", "minus_90", "minus_120"):
            res[k] = d
            # First day of the month minus one day = last day of prior month.
            d = d.replace(day=1) - timedelta(days=1)
        return res

    def _get_account_show_buckets(
        self, company_id, partner_ids, date_end, account_type, aging_type
    ):
        """Compute the aging buckets of every partner.

        :return: dict mapping each partner id to a list of row dicts (one
            per currency) holding ``currency_id``, the six bucket amounts and
            their sum as ``balance``; partners without rows map to ``[]``
        """
        buckets = {partner_id: [] for partner_id in partner_ids}
        partners = tuple(partner_ids)
        full_dates = self._get_bucket_dates(date_end, aging_type)
        # pylint: disable=E8103
        # All input queries are properly escaped - false positive
        self.env.cr.execute(
            """
            WITH Q1 AS (%s),
                Q2 AS (%s),
                Q3 AS (%s),
                Q4 AS (%s)
            SELECT partner_id, currency_id, current, b_1_30, b_30_60, b_60_90,
                b_90_120, b_over_120,
                current+b_1_30+b_30_60+b_60_90+b_90_120+b_over_120
                AS balance
            FROM Q4
            GROUP BY partner_id, currency_id, current, b_1_30, b_30_60,
                b_60_90, b_90_120, b_over_120"""
            % (
                self._show_buckets_sql_q1(partners, date_end, account_type),
                self._show_buckets_sql_q2(
                    full_dates["date_end"],
                    full_dates["minus_30"],
                    full_dates["minus_60"],
                    full_dates["minus_90"],
                    full_dates["minus_120"],
                ),
                self._show_buckets_sql_q3(company_id),
                self._show_buckets_sql_q4(),
            )
        )
        for row in self.env.cr.dictfetchall():
            buckets[row.pop("partner_id")].append(row)
        return buckets

    def _get_bucket_labels(self, date_end, aging_type):
        """Return the column labels of the aging table for ``aging_type``.

        Dispatches to ``_get_bucket_labels_<aging_type>`` and falls back to
        the day-based labels when no such method exists.
        """
        # The fallback must be a *labels* method: falling back to the dates
        # helper would hand a dict of dates to the report instead of labels.
        return getattr(
            self, "_get_bucket_labels_%s" % aging_type, self._get_bucket_labels_days
        )(date_end)

    def _get_bucket_labels_days(self, date_end):
        """Return the translated labels of the day-based aging buckets."""
        return [
            _("Current"),
            _("1 - 30 Days"),
            _("31 - 60 Days"),
            _("61 - 90 Days"),
            _("91 - 120 Days"),
            _("121 Days +"),
            _("Total"),
        ]

    def _get_bucket_labels_months(self, date_end):
        """Return the translated labels of the month-based aging buckets."""
        return [
            _("Current"),
            _("1 Month"),
            _("2 Months"),
            _("3 Months"),
            _("4 Months"),
            _("Older"),
            _("Total"),
        ]

    def _get_line_currency_defaults(self, currency_id, currencies, balance_forward):
        """Return a fresh per-currency section and the currency cache.

        ``currencies`` is completed in place with ``currency_id`` when it is
        missing from the cache.

        :return: tuple ``(section, currencies)`` where ``section`` has empty
            ``lines`` and ``buckets`` and both ``balance_forward`` and
            ``amount_due`` set to ``balance_forward``
        """
        if currency_id not in currencies:
            # This will only happen if currency is inactive
            currencies[currency_id] = self.env["res.currency"].browse(currency_id)
        return (
            {
                "lines": [],
                "buckets": [],
                "balance_forward": balance_forward,
                "amount_due": balance_forward,
            },
            currencies,
        )

    @api.model
    # flake8: noqa: C901
    def _get_report_values(self, docids, data=None):
        """Build the rendering context of the statement report.

        ``data`` must provide ``company_id``, ``partner_ids``, ``date_end``,
        ``account_type``, ``aging_type``, ``show_aging_buckets``,
        ``filter_non_due_partners`` and ``filter_negative_balances``;
        ``date_start`` and ``amount_field`` are optional. Dates may be given
        as date objects or as strings in the server date format.

        Note that partners filtered out of the report are also removed from
        the ``data["partner_ids"]`` list itself.

        @return: returns a dict of parameters to pass to qweb report.
          the most important pair is {'data': res} which contains all
          the data for each partner.  It is structured like:
            {partner_id: {
                'start': date string,
                'end': date_string,
                'today': date_string
                'currencies': {
                    currency_id: {
                        'lines': [{'date': date string, ...}, ...],
                        'balance_forward': float,
                        'amount_due': float,
                        'buckets': {
                            'p1': float, 'p2': ...
                          }
                    }
                }
            }}
        """
        company_id = data["company_id"]
        partner_ids = data["partner_ids"]
        date_start = data.get("date_start")
        if date_start and isinstance(date_start, str):
            date_start = datetime.strptime(
                date_start, DEFAULT_SERVER_DATE_FORMAT
            ).date()
        date_end = data["date_end"]
        if isinstance(date_end, str):
            date_end = datetime.strptime(date_end, DEFAULT_SERVER_DATE_FORMAT).date()
        account_type = data["account_type"]
        aging_type = data["aging_type"]
        today = fields.Date.today()
        amount_field = data.get("amount_field", "amount")

        # There should be relatively few of these, so to speed performance
        # we cache them - default needed if partner lang not set
        self._cr.execute(
            """
            SELECT p.id, l.date_format
            FROM res_partner p LEFT JOIN res_lang l ON p.lang=l.code
            WHERE p.id IN %(partner_ids)s
            """,
            {"partner_ids": tuple(partner_ids)},
        )
        date_formats = {r[0]: r[1] for r in self._cr.fetchall()}
        default_fmt = self.env["res.lang"]._lang_get(self.env.user.lang).date_format
        currencies = {x.id: x for x in self.env["res.currency"].search([])}

        res = {}
        # get base data
        lines = self._get_account_display_lines(
            company_id, partner_ids, date_start, date_end, account_type
        )
        balances_forward = self._get_account_initial_balance(
            company_id, partner_ids, date_start, account_type
        )

        if data["show_aging_buckets"]:
            buckets = self._get_account_show_buckets(
                company_id, partner_ids, date_end, account_type, aging_type
            )
            bucket_labels = self._get_bucket_labels(date_end, aging_type)
        else:
            bucket_labels = {}

        # organise and format for report
        format_date = self._format_date_to_partner_lang
        partners_to_remove = set()
        for partner_id in partner_ids:
            # The LEFT JOIN above returns a row with a NULL format for a
            # partner without language, so the key exists with value None:
            # ``or`` (not a ``dict.get`` default) is needed for the fallback.
            partner_fmt = date_formats.get(partner_id) or default_fmt
            res[partner_id] = {
                "today": format_date(today, partner_fmt),
                "start": format_date(date_start, partner_fmt),
                "end": format_date(date_end, partner_fmt),
                "currencies": {},
            }
            currency_dict = res[partner_id]["currencies"]

            for line in balances_forward.get(partner_id, []):
                (
                    currency_dict[line["currency_id"]],
                    currencies,
                ) = self._get_line_currency_defaults(
                    line["currency_id"], currencies, line["balance"]
                )

            for line in lines[partner_id]:
                if line["currency_id"] not in currency_dict:
                    (
                        currency_dict[line["currency_id"]],
                        currencies,
                    ) = self._get_line_currency_defaults(
                        line["currency_id"], currencies, 0.0
                    )
                line_currency = currency_dict[line["currency_id"]]
                # Blocked lines are listed but do not change the amount due.
                if not line["blocked"]:
                    line_currency["amount_due"] += line[amount_field]
                # Running balance after this line.
                line["balance"] = line_currency["amount_due"]
                line["date"] = format_date(line["date"], partner_fmt)
                line["date_maturity"] = format_date(
                    line["date_maturity"], partner_fmt
                )
                line_currency["lines"].append(line)

            if data["show_aging_buckets"]:
                for line in buckets[partner_id]:
                    if line["currency_id"] not in currency_dict:
                        (
                            currency_dict[line["currency_id"]],
                            currencies,
                        ) = self._get_line_currency_defaults(
                            line["currency_id"], currencies, 0.0
                        )
                    line_currency = currency_dict[line["currency_id"]]
                    line_currency["buckets"] = line

            # Partner filtering only applies to multi-partner reports.
            if len(partner_ids) > 1:
                values = currency_dict.values()
                if not any(v["lines"] or v["balance_forward"] for v in values):
                    if data["filter_non_due_partners"]:
                        partners_to_remove.add(partner_id)
                        continue
                    res[partner_id]["no_entries"] = True
                if data["filter_negative_balances"]:
                    if not all(v["amount_due"] >= 0.0 for v in values):
                        partners_to_remove.add(partner_id)

        # Removal is deferred so that ``partner_ids`` is not modified while
        # it is being iterated above.
        for partner in partners_to_remove:
            del res[partner]
            partner_ids.remove(partner)

        return {
            "doc_ids": partner_ids,
            "doc_model": "res.partner",
            "docs": self.env["res.partner"].browse(partner_ids),
            "data": res,
            "company": self.env["res.company"].browse(company_id),
            "Currencies": currencies,
            "account_type": account_type,
            "bucket_labels": bucket_labels,
            "get_inv_addr": self._get_invoice_address,
        }