brotherton-erpnext/erpnext/accounts/report/cash_flow/custom_cash_flow.py
2022-03-28 18:52:46 +05:30

568 lines
15 KiB
Python

# Copyright (c) 2018, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
import frappe
from frappe import _
from frappe.query_builder.functions import Sum
from frappe.utils import add_to_date, flt, get_date_str
from erpnext.accounts.report.financial_statements import get_columns, get_data, get_period_list
from erpnext.accounts.report.profit_and_loss_statement.profit_and_loss_statement import (
get_net_profit_loss,
)
def get_mapper_for(mappers, position):
    """Return the first mapper whose ``"position"`` equals ``position``.

    Args:
        mappers: Iterable of dict-like mapper rows, each with a ``"position"`` key.
        position: Position value to look for.

    Returns:
        The first matching mapper, or an empty list when nothing matches.
        The empty list is falsy, which ``compute_data`` uses to skip sections.
    """
    # next() stops at the first hit instead of filtering the whole sequence.
    return next((mapper for mapper in mappers if mapper["position"] == position), [])
def get_mappers_from_db():
    """Load all "Cash Flow Mapper" records ordered by ``position``.

    Returns:
        The result of ``frappe.get_all`` restricted to the section fields,
        ``name`` and ``position``.
    """
    mapper_fields = [
        "section_name",
        "section_header",
        "section_leader",
        "section_subtotal",
        "section_footer",
        "name",
        "position",
    ]
    return frappe.get_all("Cash Flow Mapper", fields=mapper_fields, order_by="position")
def get_accounts_in_mappers(mapping_names):
    """Query mapping flags and accounts for the given mapping-account parents.

    Joins "Cash Flow Mapping" with "Cash Flow Mapping Accounts" on
    ``mapping.name == account.parent`` and keeps rows whose ``parent`` is in
    ``mapping_names``.

    Args:
        mapping_names: Values matched against the child table's ``parent``.

    Returns:
        Rows in the column order: child name, label, is_working_capital,
        is_income_tax_liability, is_income_tax_expense, is_finance_cost,
        is_finance_cost_adjustment, account.
    """
    mapping = frappe.qb.DocType("Cash Flow Mapping")
    mapping_account = frappe.qb.DocType("Cash Flow Mapping Accounts")

    # Column order matters: setup_mappers reads these rows by index.
    columns = (
        mapping_account.name,
        mapping.label,
        mapping.is_working_capital,
        mapping.is_income_tax_liability,
        mapping.is_income_tax_expense,
        mapping.is_finance_cost,
        mapping.is_finance_cost_adjustment,
        mapping_account.account,
    )

    query = (
        frappe.qb.select(*columns)
        .from_(mapping)
        .join(mapping_account)
        .on(mapping.name == mapping_account.parent)
        .where(mapping_account.parent.isin(mapping_names))
    )
    return query.run()
def setup_mappers(mappers):
    """Attach grouped account lists to each mapper and collect the populated ones.

    Every mapper in ``mappers`` is mutated in place: the keys ``account_types``,
    ``tax_liabilities``, ``tax_expenses``, ``finance_costs`` and
    ``finance_costs_adjustments`` are reset to empty lists and then filled with
    one entry per distinct label tuple, each carrying the account names that
    share that label.

    Args:
        mappers: Dict-like mapper rows with a ``"name"`` key identifying a
            "Cash Flow Mapper" document.

    Returns:
        The mappers whose document has at least one row in ``accounts``.
        Mappers without any rows are still reset but are left out of the result.
    """

    def names_for(rows, label):
        # Account names (column 7) of every row carrying ``label`` (column 1).
        return [row[7] for row in rows if row[1] == label]

    def distinct(entries, sort_index):
        # dict.fromkeys de-duplicates while keeping query order, so ties in the
        # (stable) sort no longer depend on set/hash iteration order.
        return sorted(dict.fromkeys(entries), key=lambda entry: entry[sort_index])

    cash_flow_accounts = []

    for mapping in mappers:
        mapping["account_types"] = []
        mapping["tax_liabilities"] = []
        mapping["tax_expenses"] = []
        mapping["finance_costs"] = []
        mapping["finance_costs_adjustments"] = []

        doc = frappe.get_doc("Cash Flow Mapper", mapping["name"])
        mapping_names = [item.name for item in doc.accounts]
        if not mapping_names:
            continue

        # Row layout (see get_accounts_in_mappers):
        #   0 name, 1 label, 2 is_working_capital, 3 is_income_tax_liability,
        #   4 is_income_tax_expense, 5 is_finance_cost,
        #   6 is_finance_cost_adjustment, 7 account
        accounts = get_accounts_in_mappers(mapping_names)

        non_liability_rows = [row for row in accounts if not row[3]]
        adjustment_rows = [row for row in accounts if row[6]]
        liability_rows = [row for row in accounts if row[3]]
        expense_rows = [row for row in accounts if row[4]]
        finance_cost_rows = [row for row in accounts if row[5]]

        # Regular account groups, ordered by the working-capital flag.
        account_type_labels = distinct(
            ((row[1], row[2], row[3], row[4]) for row in non_liability_rows), 1
        )
        for entry in account_type_labels:
            mapping["account_types"].append(
                dict(
                    label=entry[0],
                    names=names_for(non_liability_rows, entry[0]),
                    is_working_capital=entry[1],
                )
            )

        # Finance cost adjustments, ordered by the adjustment flag.
        adjustment_labels = distinct(((row[1], row[5], row[6]) for row in adjustment_rows), 2)
        for entry in adjustment_labels:
            mapping["finance_costs_adjustments"].append(
                dict(label=entry[0], names=names_for(adjustment_rows, entry[0]))
            )

        # Income tax liabilities, ordered by label.
        liability_labels = distinct(((row[1], row[3], row[4]) for row in liability_rows), 0)
        for entry in liability_labels:
            mapping["tax_liabilities"].append(
                dict(
                    label=entry[0],
                    names=names_for(liability_rows, entry[0]),
                    tax_liability=entry[1],
                    tax_expense=entry[2],
                )
            )

        # Income tax expenses, ordered by label.
        expense_labels = distinct(((row[1], row[3], row[4]) for row in expense_rows), 0)
        for entry in expense_labels:
            mapping["tax_expenses"].append(
                dict(
                    label=entry[0],
                    names=names_for(expense_rows, entry[0]),
                    tax_liability=entry[1],
                    tax_expense=entry[2],
                )
            )

        # Finance costs, ordered by label.
        finance_cost_labels = distinct(((row[1], row[5]) for row in finance_cost_rows), 0)
        for entry in finance_cost_labels:
            mapping["finance_costs"].append(
                dict(
                    label=entry[0],
                    names=names_for(finance_cost_rows, entry[0]),
                    is_finance_cost=entry[1],
                )
            )

        cash_flow_accounts.append(mapping)

    return cash_flow_accounts
def add_data_for_operating_activities(
    filters, company_currency, profit_data, period_list, light_mappers, mapper, data
):
    """Append the operating-activities section of the report to ``data``.

    Rows are appended in this order: section header, net profit row and
    section leader (only when ``profit_data`` is truthy), one row per mapped
    account group with a non-zero total, the section subtotal, tax-paid and
    interest-paid adjustment rows, and finally the section footer total.

    Args:
        filters: Report filters; ``accumulated_values`` is read here.
        company_currency: Currency written into every generated row.
        profit_data: Net profit/loss row; updated in place and appended.
        period_list: Periods passed on to the data and total helpers.
        light_mappers: All mappers; used to look up the position-1 header.
        mapper: The operating-activities mapper. Its ``tax_liabilities`` and
            ``finance_costs_adjustments`` get a default entry when empty.
        data: Output list, mutated in place.
    """
    has_added_working_capital_header = False
    section_data = []
    section_header = mapper["section_header"]

    data.append(
        {
            "account_name": section_header,
            "parent_account": None,
            "indent": 0.0,
            "account": section_header,
        }
    )

    if profit_data:
        profit_data.update(
            {
                "indent": 1,
                "parent_account": get_mapper_for(light_mappers, position=1)["section_header"],
            }
        )
        data.append(profit_data)
        section_data.append(profit_data)

        data.append(
            {
                "account_name": mapper["section_leader"],
                "parent_account": None,
                "indent": 1.0,
                "account": mapper["section_leader"],
            }
        )

    for account in mapper["account_types"]:
        if account["is_working_capital"] and not has_added_working_capital_header:
            # Emitted once, right before the first working-capital group.
            data.append(
                {
                    "account_name": "Movement in working capital",
                    "parent_account": None,
                    "indent": 1.0,
                    "account": "",
                }
            )
            has_added_working_capital_header = True

        account_data = _get_account_type_based_data(
            filters, account["names"], period_list, filters.accumulated_values
        )

        if not account["is_working_capital"]:
            # NOTE(review): period values are negated but "total" keeps its
            # sign, so section totals sum the un-negated value. Confirm intent.
            for key in account_data:
                if key != "total":
                    account_data[key] *= -1

        # The data helper returns {} for unusable account names; treat a
        # missing total like a zero total instead of raising KeyError.
        if account_data.get("total", 0) != 0:
            account_data.update(
                {
                    "account_name": account["label"],
                    "account": account["names"],
                    "indent": 1.0,
                    "parent_account": section_header,
                    "currency": company_currency,
                }
            )
            data.append(account_data)
            section_data.append(account_data)

    _add_total_row_account(
        data, section_data, mapper["section_subtotal"], period_list, company_currency, indent=1
    )

    # calculate adjustment for tax paid and add to data
    if not mapper["tax_liabilities"]:
        mapper["tax_liabilities"] = [
            dict(label="Income tax paid", names=[""], tax_liability=1, tax_expense=0)
        ]

    # The adjustment does not depend on the individual account, so it is
    # computed once and copied per label rather than re-queried per iteration.
    tax_paid = calculate_adjustment(
        filters,
        mapper["tax_liabilities"],
        mapper["tax_expenses"],
        filters.accumulated_values,
        period_list,
    )
    _append_adjustment_rows(
        data, section_data, tax_paid, mapper["tax_liabilities"], section_header, company_currency
    )

    if not mapper["finance_costs_adjustments"]:
        mapper["finance_costs_adjustments"] = [dict(label="Interest Paid", names=[""])]

    interest_paid = calculate_adjustment(
        filters,
        mapper["finance_costs_adjustments"],
        mapper["finance_costs"],
        filters.accumulated_values,
        period_list,
    )
    _append_adjustment_rows(
        data,
        section_data,
        interest_paid,
        mapper["finance_costs_adjustments"],
        section_header,
        company_currency,
    )

    _add_total_row_account(
        data, section_data, mapper["section_footer"], period_list, company_currency
    )


def _append_adjustment_rows(data, section_data, adjustment, accounts, parent_account, currency):
    """Append one labelled copy of ``adjustment`` per entry in ``accounts``.

    Nothing is appended when ``adjustment`` is empty.
    """
    if not adjustment:
        return

    for account in accounts:
        # Copy so every appended row is an independent dict.
        row = dict(adjustment)
        row.update(
            {
                "parent_account": parent_account,
                "currency": currency,
                "account_name": account["label"],
                "indent": 1.0,
            }
        )
        data.append(row)
        section_data.append(row)
def calculate_adjustment(
    filters, non_expense_mapper, expense_mapper, use_accumulated_values, period_list
):
    """Combine closing, opening and expense figures into an adjustment row.

    Args:
        filters: Report filters, forwarded to the data helper.
        non_expense_mapper: Entries whose ``"names"`` form the non-expense side.
        expense_mapper: Entries whose ``"names"`` form the expense side.
        use_accumulated_values: Forwarded for the opening and expense queries;
            the closing query always passes ``0``.
        period_list: Periods to evaluate.

    Returns:
        The dict produced by ``_calculate_adjustment``.
    """
    # NOTE(review): these are lists of "names" lists, and the guard in
    # _get_account_type_based_data returns {} unless its first element is a
    # str, so this looks like it always yields {}. Confirm intent.
    non_expense_names = [entry["names"] for entry in non_expense_mapper]
    expense_names = [entry["names"] for entry in expense_mapper]

    closing = _get_account_type_based_data(filters, non_expense_names, period_list, 0)
    opening = _get_account_type_based_data(
        filters, non_expense_names, period_list, use_accumulated_values, opening_balances=1
    )
    expenses = _get_account_type_based_data(
        filters, expense_names, period_list, use_accumulated_values
    )

    return _calculate_adjustment(closing, opening, expenses)
def _calculate_adjustment(non_expense_closing, non_expense_opening, expense_data):
account_data = {}
for month in non_expense_opening.keys():
if non_expense_opening[month] and non_expense_closing[month]:
account_data[month] = (
non_expense_opening[month] - expense_data[month] + non_expense_closing[month]
)
elif expense_data[month]:
account_data[month] = expense_data[month]
return account_data
def add_data_for_other_activities(
    filters, company_currency, profit_data, period_list, light_mappers, mapper_list, data
):
    """Append one report section per mapper in ``mapper_list`` to ``data``.

    Each section consists of a header row, one row per account group whose
    total is non-zero, and a footer total row.

    Args:
        filters: Report filters; ``accumulated_values`` is read here.
        company_currency: Currency written into every account row.
        profit_data: Unused; kept so the signature stays unchanged.
        period_list: Periods passed on to the data and total helpers.
        light_mappers: Unused; kept so the signature stays unchanged.
        mapper_list: Mappers to render, in order.
        data: Output list, mutated in place.
    """
    for mapper in mapper_list:
        section_header = mapper["section_header"]
        section_data = []

        data.append(
            {
                "account_name": section_header,
                "parent_account": None,
                "indent": 0.0,
                "account": section_header,
            }
        )

        for account in mapper["account_types"]:
            account_data = _get_account_type_based_data(
                filters, account["names"], period_list, filters.accumulated_values
            )

            # The data helper returns {} for unusable account names; treat a
            # missing total like a zero total instead of raising KeyError.
            if account_data.get("total", 0) == 0:
                continue

            account_data.update(
                {
                    "account_name": account["label"],
                    "account": account["names"],
                    "indent": 1,
                    "parent_account": section_header,
                    "currency": company_currency,
                }
            )
            data.append(account_data)
            section_data.append(account_data)

        _add_total_row_account(
            data, section_data, mapper["section_footer"], period_list, company_currency
        )
def compute_data(filters, company_currency, profit_data, period_list, light_mappers, full_mapper):
    """Build the report rows for the operating and the other activity sections.

    The position-1 mapper feeds the operating-activities section. The mappers
    at positions 2 and 3 are rendered only when both exist. ``full_mapper`` is
    accepted but not used.

    Returns:
        The list of row dicts produced by the section builders.
    """
    rows = []

    operating_mapper = get_mapper_for(light_mappers, position=1)
    remaining_mappers = [get_mapper_for(light_mappers, position=pos) for pos in (2, 3)]

    if operating_mapper:
        add_data_for_operating_activities(
            filters,
            company_currency,
            profit_data,
            period_list,
            light_mappers,
            operating_mapper,
            rows,
        )

    # A missing mapper comes back as [], so all() fails unless both exist.
    if all(remaining_mappers):
        add_data_for_other_activities(
            filters,
            company_currency,
            profit_data,
            period_list,
            light_mappers,
            remaining_mappers,
            rows,
        )

    return rows
def execute(filters=None):
    """Report entry point: return ``(columns, data)`` for the custom cash flow.

    ``filters.periodicity`` falls back to ``"Monthly"`` when unset. The net
    profit/loss row is computed from income and expense data, the mapper-driven
    sections are built, and a "Net Change in Cash" total is appended.

    Args:
        filters: Report filters accessed by attribute (company, fiscal years,
            period dates, filter_based_on, periodicity, accumulated_values).

    Returns:
        A ``(columns, data)`` tuple.
    """
    if not filters.periodicity:
        filters.periodicity = "Monthly"

    period_list = get_period_list(
        filters.from_fiscal_year,
        filters.to_fiscal_year,
        filters.period_start_date,
        filters.period_end_date,
        filters.filter_based_on,
        filters.periodicity,
        company=filters.company,
    )

    mappers = get_mappers_from_db()
    cash_flow_accounts = setup_mappers(mappers)

    # compute net profit / loss
    # Options shared by the income and the expense query.
    profit_and_loss_options = dict(
        filters=filters,
        accumulated_values=filters.accumulated_values,
        ignore_closing_entries=True,
        ignore_accumulated_values_for_fy=True,
    )
    income = get_data(filters.company, "Income", "Credit", period_list, **profit_and_loss_options)
    expense = get_data(filters.company, "Expense", "Debit", period_list, **profit_and_loss_options)
    net_profit_loss = get_net_profit_loss(income, expense, period_list, filters.company)

    company_currency = frappe.get_cached_value("Company", filters.company, "default_currency")

    data = compute_data(
        filters, company_currency, net_profit_loss, period_list, mappers, cash_flow_accounts
    )
    _add_total_row_account(data, data, _("Net Change in Cash"), period_list, company_currency)

    columns = get_columns(
        filters.periodicity, period_list, filters.accumulated_values, filters.company
    )
    return columns, data
def _get_account_type_based_data(
filters, account_names, period_list, accumulated_values, opening_balances=0
):
if not account_names or not account_names[0] or not type(account_names[0]) == str:
# only proceed if account_names is a list of account names
return {}
from erpnext.accounts.report.cash_flow.cash_flow import get_start_date
company = filters.company
data = {}
total = 0
GLEntry = frappe.qb.DocType("GL Entry")
Account = frappe.qb.DocType("Account")
for period in period_list:
start_date = get_start_date(period, accumulated_values, company)
account_subquery = (
frappe.qb.from_(Account)
.where((Account.name.isin(account_names)) | (Account.parent_account.isin(account_names)))
.select(Account.name)
.as_("account_subquery")
)
if opening_balances:
date_info = dict(date=start_date)
months_map = {"Monthly": -1, "Quarterly": -3, "Half-Yearly": -6}
years_map = {"Yearly": -1}
if months_map.get(filters.periodicity):
date_info.update(months=months_map[filters.periodicity])
else:
date_info.update(years=years_map[filters.periodicity])
if accumulated_values:
start, end = add_to_date(start_date, years=-1), add_to_date(period["to_date"], years=-1)
else:
start, end = add_to_date(**date_info), add_to_date(**date_info)
start, end = get_date_str(start), get_date_str(end)
else:
start, end = start_date if accumulated_values else period["from_date"], period["to_date"]
start, end = get_date_str(start), get_date_str(end)
result = (
frappe.qb.from_(GLEntry)
.select(Sum(GLEntry.credit) - Sum(GLEntry.debit))
.where(
(GLEntry.company == company)
& (GLEntry.posting_date >= start)
& (GLEntry.posting_date <= end)
& (GLEntry.voucher_type != "Period Closing Voucher")
& (GLEntry.account.isin(account_subquery))
)
).run()
if result and result[0]:
gl_sum = result[0][0]
else:
gl_sum = 0
total += flt(gl_sum)
data.setdefault(period["key"], flt(gl_sum))
data["total"] = total
return data
def _add_total_row_account(out, data, label, period_list, currency, indent=0.0):
    """Append a totals row for ``data`` to ``out``, followed by an empty row.

    Only rows with a truthy ``"parent_account"`` contribute. For those rows the
    per-period values and ``"total"`` are summed; a missing period value or a
    missing ``"total"`` counts as 0. When no row contributes, the totals row
    carries no period or ``"total"`` keys.

    Args:
        out: List receiving the totals row and the trailing ``{}``.
        data: Rows to aggregate (may be the same list as ``out``).
        label: Caption of the totals row; emitted wrapped in single quotes.
        period_list: Periods whose ``key`` attribute names the summed columns.
        currency: Currency written into the totals row.
        indent: Indent level of the totals row.
    """
    quoted_label = "'" + _("{0}").format(label) + "'"
    total_row = {
        "indent": indent,
        "account_name": quoted_label,
        "account": quoted_label,
        "currency": currency,
    }

    for row in data:
        if not row.get("parent_account"):
            continue

        for period in period_list:
            total_row.setdefault(period.key, 0.0)
            total_row[period.key] += row.get(period.key, 0.0)

        total_row.setdefault("total", 0.0)
        # Adjustment rows are not guaranteed to carry a "total" key.
        total_row["total"] += row.get("total", 0.0)

    out.append(total_row)
    out.append({})