# Copyright (c) 2018, Frappe Technologies Pvt. Ltd. and contributors # For license information, please see license.txt import frappe from frappe import _ from frappe.query_builder.functions import Sum from frappe.utils import add_to_date, flt, get_date_str from erpnext.accounts.report.financial_statements import get_columns, get_data, get_period_list from erpnext.accounts.report.profit_and_loss_statement.profit_and_loss_statement import ( get_net_profit_loss, ) def get_mapper_for(mappers, position): mapper_list = list(filter(lambda x: x["position"] == position, mappers)) return mapper_list[0] if mapper_list else [] def get_mappers_from_db(): return frappe.get_all( "Cash Flow Mapper", fields=[ "section_name", "section_header", "section_leader", "section_subtotal", "section_footer", "name", "position", ], order_by="position", ) def get_accounts_in_mappers(mapping_names): cfm = frappe.qb.DocType("Cash Flow Mapping") cfma = frappe.qb.DocType("Cash Flow Mapping Accounts") result = ( frappe.qb.select( cfma.name, cfm.label, cfm.is_working_capital, cfm.is_income_tax_liability, cfm.is_income_tax_expense, cfm.is_finance_cost, cfm.is_finance_cost_adjustment, cfma.account, ) .from_(cfm) .join(cfma) .on(cfm.name == cfma.parent) .where(cfma.parent.isin(mapping_names)) ).run() return result def setup_mappers(mappers): cash_flow_accounts = [] for mapping in mappers: mapping["account_types"] = [] mapping["tax_liabilities"] = [] mapping["tax_expenses"] = [] mapping["finance_costs"] = [] mapping["finance_costs_adjustments"] = [] doc = frappe.get_doc("Cash Flow Mapper", mapping["name"]) mapping_names = [item.name for item in doc.accounts] if not mapping_names: continue accounts = get_accounts_in_mappers(mapping_names) account_types = [ dict( name=account[0], account_name=account[7], label=account[1], is_working_capital=account[2], is_income_tax_liability=account[3], is_income_tax_expense=account[4], ) for account in accounts if not account[3] ] finance_costs_adjustments = [ dict( name=account[0], account_name=account[7], label=account[1], is_finance_cost=account[5], is_finance_cost_adjustment=account[6], ) for account in accounts if account[6] ] tax_liabilities = [ dict( name=account[0], account_name=account[7], label=account[1], is_income_tax_liability=account[3], is_income_tax_expense=account[4], ) for account in accounts if account[3] ] tax_expenses = [ dict( name=account[0], account_name=account[7], label=account[1], is_income_tax_liability=account[3], is_income_tax_expense=account[4], ) for account in accounts if account[4] ] finance_costs = [ dict(name=account[0], account_name=account[7], label=account[1], is_finance_cost=account[5]) for account in accounts if account[5] ] account_types_labels = sorted( set( (d["label"], d["is_working_capital"], d["is_income_tax_liability"], d["is_income_tax_expense"]) for d in account_types ), key=lambda x: x[1], ) fc_adjustment_labels = sorted( set( [ (d["label"], d["is_finance_cost"], d["is_finance_cost_adjustment"]) for d in finance_costs_adjustments if d["is_finance_cost_adjustment"] ] ), key=lambda x: x[2], ) unique_liability_labels = sorted( set( [ (d["label"], d["is_income_tax_liability"], d["is_income_tax_expense"]) for d in tax_liabilities ] ), key=lambda x: x[0], ) unique_expense_labels = sorted( set( [(d["label"], d["is_income_tax_liability"], d["is_income_tax_expense"]) for d in tax_expenses] ), key=lambda x: x[0], ) unique_finance_costs_labels = sorted( set([(d["label"], d["is_finance_cost"]) for d in finance_costs]), key=lambda x: x[0] ) for label in account_types_labels: names = [d["account_name"] for d in account_types if d["label"] == label[0]] m = dict(label=label[0], names=names, is_working_capital=label[1]) mapping["account_types"].append(m) for label in fc_adjustment_labels: names = [d["account_name"] for d in finance_costs_adjustments if d["label"] == label[0]] m = dict(label=label[0], names=names) mapping["finance_costs_adjustments"].append(m) for label in unique_liability_labels: names = [d["account_name"] for d in tax_liabilities if d["label"] == label[0]] m = dict(label=label[0], names=names, tax_liability=label[1], tax_expense=label[2]) mapping["tax_liabilities"].append(m) for label in unique_expense_labels: names = [d["account_name"] for d in tax_expenses if d["label"] == label[0]] m = dict(label=label[0], names=names, tax_liability=label[1], tax_expense=label[2]) mapping["tax_expenses"].append(m) for label in unique_finance_costs_labels: names = [d["account_name"] for d in finance_costs if d["label"] == label[0]] m = dict(label=label[0], names=names, is_finance_cost=label[1]) mapping["finance_costs"].append(m) cash_flow_accounts.append(mapping) return cash_flow_accounts def add_data_for_operating_activities( filters, company_currency, profit_data, period_list, light_mappers, mapper, data ): has_added_working_capital_header = False section_data = [] data.append( { "account_name": mapper["section_header"], "parent_account": None, "indent": 0.0, "account": mapper["section_header"], } ) if profit_data: profit_data.update( {"indent": 1, "parent_account": get_mapper_for(light_mappers, position=1)["section_header"]} ) data.append(profit_data) section_data.append(profit_data) data.append( { "account_name": mapper["section_leader"], "parent_account": None, "indent": 1.0, "account": mapper["section_leader"], } ) for account in mapper["account_types"]: if account["is_working_capital"] and not has_added_working_capital_header: data.append( { "account_name": "Movement in working capital", "parent_account": None, "indent": 1.0, "account": "", } ) has_added_working_capital_header = True account_data = _get_account_type_based_data( filters, account["names"], period_list, filters.accumulated_values ) if not account["is_working_capital"]: for key in account_data: if key != "total": account_data[key] *= -1 if account_data["total"] != 0: account_data.update( { "account_name": account["label"], "account": account["names"], "indent": 1.0, "parent_account": mapper["section_header"], "currency": company_currency, } ) data.append(account_data) section_data.append(account_data) _add_total_row_account( data, section_data, mapper["section_subtotal"], period_list, company_currency, indent=1 ) # calculate adjustment for tax paid and add to data if not mapper["tax_liabilities"]: mapper["tax_liabilities"] = [ dict(label="Income tax paid", names=[""], tax_liability=1, tax_expense=0) ] for account in mapper["tax_liabilities"]: tax_paid = calculate_adjustment( filters, mapper["tax_liabilities"], mapper["tax_expenses"], filters.accumulated_values, period_list, ) if tax_paid: tax_paid.update( { "parent_account": mapper["section_header"], "currency": company_currency, "account_name": account["label"], "indent": 1.0, } ) data.append(tax_paid) section_data.append(tax_paid) if not mapper["finance_costs_adjustments"]: mapper["finance_costs_adjustments"] = [dict(label="Interest Paid", names=[""])] for account in mapper["finance_costs_adjustments"]: interest_paid = calculate_adjustment( filters, mapper["finance_costs_adjustments"], mapper["finance_costs"], filters.accumulated_values, period_list, ) if interest_paid: interest_paid.update( { "parent_account": mapper["section_header"], "currency": company_currency, "account_name": account["label"], "indent": 1.0, } ) data.append(interest_paid) section_data.append(interest_paid) _add_total_row_account( data, section_data, mapper["section_footer"], period_list, company_currency ) def calculate_adjustment( filters, non_expense_mapper, expense_mapper, use_accumulated_values, period_list ): liability_accounts = [d["names"] for d in non_expense_mapper] expense_accounts = [d["names"] for d in expense_mapper] non_expense_closing = _get_account_type_based_data(filters, liability_accounts, period_list, 0) non_expense_opening = _get_account_type_based_data( filters, liability_accounts, period_list, use_accumulated_values, opening_balances=1 ) expense_data = _get_account_type_based_data( filters, expense_accounts, period_list, use_accumulated_values ) data = _calculate_adjustment(non_expense_closing, non_expense_opening, expense_data) return data def _calculate_adjustment(non_expense_closing, non_expense_opening, expense_data): account_data = {} for month in non_expense_opening.keys(): if non_expense_opening[month] and non_expense_closing[month]: account_data[month] = ( non_expense_opening[month] - expense_data[month] + non_expense_closing[month] ) elif expense_data[month]: account_data[month] = expense_data[month] return account_data def add_data_for_other_activities( filters, company_currency, profit_data, period_list, light_mappers, mapper_list, data ): for mapper in mapper_list: section_data = [] data.append( { "account_name": mapper["section_header"], "parent_account": None, "indent": 0.0, "account": mapper["section_header"], } ) for account in mapper["account_types"]: account_data = _get_account_type_based_data( filters, account["names"], period_list, filters.accumulated_values ) if account_data["total"] != 0: account_data.update( { "account_name": account["label"], "account": account["names"], "indent": 1, "parent_account": mapper["section_header"], "currency": company_currency, } ) data.append(account_data) section_data.append(account_data) _add_total_row_account( data, section_data, mapper["section_footer"], period_list, company_currency ) def compute_data(filters, company_currency, profit_data, period_list, light_mappers, full_mapper): data = [] operating_activities_mapper = get_mapper_for(light_mappers, position=1) other_mappers = [ get_mapper_for(light_mappers, position=2), get_mapper_for(light_mappers, position=3), ] if operating_activities_mapper: add_data_for_operating_activities( filters, company_currency, profit_data, period_list, light_mappers, operating_activities_mapper, data, ) if all(other_mappers): add_data_for_other_activities( filters, company_currency, profit_data, period_list, light_mappers, other_mappers, data ) return data def execute(filters=None): if not filters.periodicity: filters.periodicity = "Monthly" period_list = get_period_list( filters.from_fiscal_year, filters.to_fiscal_year, filters.period_start_date, filters.period_end_date, filters.filter_based_on, filters.periodicity, company=filters.company, ) mappers = get_mappers_from_db() cash_flow_accounts = setup_mappers(mappers) # compute net profit / loss income = get_data( filters.company, "Income", "Credit", period_list, filters=filters, accumulated_values=filters.accumulated_values, ignore_closing_entries=True, ignore_accumulated_values_for_fy=True, ) expense = get_data( filters.company, "Expense", "Debit", period_list, filters=filters, accumulated_values=filters.accumulated_values, ignore_closing_entries=True, ignore_accumulated_values_for_fy=True, ) net_profit_loss = get_net_profit_loss(income, expense, period_list, filters.company) company_currency = frappe.get_cached_value("Company", filters.company, "default_currency") data = compute_data( filters, company_currency, net_profit_loss, period_list, mappers, cash_flow_accounts ) _add_total_row_account(data, data, _("Net Change in Cash"), period_list, company_currency) columns = get_columns( filters.periodicity, period_list, filters.accumulated_values, filters.company ) return columns, data def _get_account_type_based_data( filters, account_names, period_list, accumulated_values, opening_balances=0 ): if not account_names or not account_names[0] or not type(account_names[0]) == str: # only proceed if account_names is a list of account names return {} from erpnext.accounts.report.cash_flow.cash_flow import get_start_date company = filters.company data = {} total = 0 GLEntry = frappe.qb.DocType("GL Entry") Account = frappe.qb.DocType("Account") for period in period_list: start_date = get_start_date(period, accumulated_values, company) account_subquery = ( frappe.qb.from_(Account) .where((Account.name.isin(account_names)) | (Account.parent_account.isin(account_names))) .select(Account.name) .as_("account_subquery") ) if opening_balances: date_info = dict(date=start_date) months_map = {"Monthly": -1, "Quarterly": -3, "Half-Yearly": -6} years_map = {"Yearly": -1} if months_map.get(filters.periodicity): date_info.update(months=months_map[filters.periodicity]) else: date_info.update(years=years_map[filters.periodicity]) if accumulated_values: start, end = add_to_date(start_date, years=-1), add_to_date(period["to_date"], years=-1) else: start, end = add_to_date(**date_info), add_to_date(**date_info) start, end = get_date_str(start), get_date_str(end) else: start, end = start_date if accumulated_values else period["from_date"], period["to_date"] start, end = get_date_str(start), get_date_str(end) result = ( frappe.qb.from_(GLEntry) .select(Sum(GLEntry.credit) - Sum(GLEntry.debit)) .where( (GLEntry.company == company) & (GLEntry.posting_date >= start) & (GLEntry.posting_date <= end) & (GLEntry.voucher_type != "Period Closing Voucher") & (GLEntry.account.isin(account_subquery)) ) ).run() if result and result[0]: gl_sum = result[0][0] else: gl_sum = 0 total += flt(gl_sum) data.setdefault(period["key"], flt(gl_sum)) data["total"] = total return data def _add_total_row_account(out, data, label, period_list, currency, indent=0.0): total_row = { "indent": indent, "account_name": "'" + _("{0}").format(label) + "'", "account": "'" + _("{0}").format(label) + "'", "currency": currency, } for row in data: if row.get("parent_account"): for period in period_list: total_row.setdefault(period.key, 0.0) total_row[period.key] += row.get(period.key, 0.0) total_row.setdefault("total", 0.0) total_row["total"] += row["total"] out.append(total_row) out.append({})