568 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			568 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright (c) 2018, Frappe Technologies Pvt. Ltd. and contributors
 | |
| # For license information, please see license.txt
 | |
| 
 | |
| 
 | |
| import frappe
 | |
| from frappe import _
 | |
| from frappe.query_builder.functions import Sum
 | |
| from frappe.utils import add_to_date, flt, get_date_str
 | |
| 
 | |
| from erpnext.accounts.report.financial_statements import get_columns, get_data, get_period_list
 | |
| from erpnext.accounts.report.profit_and_loss_statement.profit_and_loss_statement import (
 | |
| 	get_net_profit_loss,
 | |
| )
 | |
| 
 | |
| 
 | |
| def get_mapper_for(mappers, position):
 | |
| 	mapper_list = list(filter(lambda x: x["position"] == position, mappers))
 | |
| 	return mapper_list[0] if mapper_list else []
 | |
| 
 | |
| 
 | |
| def get_mappers_from_db():
 | |
| 	return frappe.get_all(
 | |
| 		"Cash Flow Mapper",
 | |
| 		fields=[
 | |
| 			"section_name",
 | |
| 			"section_header",
 | |
| 			"section_leader",
 | |
| 			"section_subtotal",
 | |
| 			"section_footer",
 | |
| 			"name",
 | |
| 			"position",
 | |
| 		],
 | |
| 		order_by="position",
 | |
| 	)
 | |
| 
 | |
| 
 | |
| def get_accounts_in_mappers(mapping_names):
 | |
| 	cfm = frappe.qb.DocType("Cash Flow Mapping")
 | |
| 	cfma = frappe.qb.DocType("Cash Flow Mapping Accounts")
 | |
| 	result = (
 | |
| 		frappe.qb.select(
 | |
| 			cfma.name,
 | |
| 			cfm.label,
 | |
| 			cfm.is_working_capital,
 | |
| 			cfm.is_income_tax_liability,
 | |
| 			cfm.is_income_tax_expense,
 | |
| 			cfm.is_finance_cost,
 | |
| 			cfm.is_finance_cost_adjustment,
 | |
| 			cfma.account,
 | |
| 		)
 | |
| 		.from_(cfm)
 | |
| 		.join(cfma)
 | |
| 		.on(cfm.name == cfma.parent)
 | |
| 		.where(cfma.parent.isin(mapping_names))
 | |
| 	).run()
 | |
| 
 | |
| 	return result
 | |
| 
 | |
| 
 | |
| def setup_mappers(mappers):
 | |
| 	cash_flow_accounts = []
 | |
| 
 | |
| 	for mapping in mappers:
 | |
| 		mapping["account_types"] = []
 | |
| 		mapping["tax_liabilities"] = []
 | |
| 		mapping["tax_expenses"] = []
 | |
| 		mapping["finance_costs"] = []
 | |
| 		mapping["finance_costs_adjustments"] = []
 | |
| 		doc = frappe.get_doc("Cash Flow Mapper", mapping["name"])
 | |
| 		mapping_names = [item.name for item in doc.accounts]
 | |
| 
 | |
| 		if not mapping_names:
 | |
| 			continue
 | |
| 
 | |
| 		accounts = get_accounts_in_mappers(mapping_names)
 | |
| 
 | |
| 		account_types = [
 | |
| 			dict(
 | |
| 				name=account[0],
 | |
| 				account_name=account[7],
 | |
| 				label=account[1],
 | |
| 				is_working_capital=account[2],
 | |
| 				is_income_tax_liability=account[3],
 | |
| 				is_income_tax_expense=account[4],
 | |
| 			)
 | |
| 			for account in accounts
 | |
| 			if not account[3]
 | |
| 		]
 | |
| 
 | |
| 		finance_costs_adjustments = [
 | |
| 			dict(
 | |
| 				name=account[0],
 | |
| 				account_name=account[7],
 | |
| 				label=account[1],
 | |
| 				is_finance_cost=account[5],
 | |
| 				is_finance_cost_adjustment=account[6],
 | |
| 			)
 | |
| 			for account in accounts
 | |
| 			if account[6]
 | |
| 		]
 | |
| 
 | |
| 		tax_liabilities = [
 | |
| 			dict(
 | |
| 				name=account[0],
 | |
| 				account_name=account[7],
 | |
| 				label=account[1],
 | |
| 				is_income_tax_liability=account[3],
 | |
| 				is_income_tax_expense=account[4],
 | |
| 			)
 | |
| 			for account in accounts
 | |
| 			if account[3]
 | |
| 		]
 | |
| 
 | |
| 		tax_expenses = [
 | |
| 			dict(
 | |
| 				name=account[0],
 | |
| 				account_name=account[7],
 | |
| 				label=account[1],
 | |
| 				is_income_tax_liability=account[3],
 | |
| 				is_income_tax_expense=account[4],
 | |
| 			)
 | |
| 			for account in accounts
 | |
| 			if account[4]
 | |
| 		]
 | |
| 
 | |
| 		finance_costs = [
 | |
| 			dict(name=account[0], account_name=account[7], label=account[1], is_finance_cost=account[5])
 | |
| 			for account in accounts
 | |
| 			if account[5]
 | |
| 		]
 | |
| 
 | |
| 		account_types_labels = sorted(
 | |
| 			set(
 | |
| 				(d["label"], d["is_working_capital"], d["is_income_tax_liability"], d["is_income_tax_expense"])
 | |
| 				for d in account_types
 | |
| 			),
 | |
| 			key=lambda x: x[1],
 | |
| 		)
 | |
| 
 | |
| 		fc_adjustment_labels = sorted(
 | |
| 			set(
 | |
| 				[
 | |
| 					(d["label"], d["is_finance_cost"], d["is_finance_cost_adjustment"])
 | |
| 					for d in finance_costs_adjustments
 | |
| 					if d["is_finance_cost_adjustment"]
 | |
| 				]
 | |
| 			),
 | |
| 			key=lambda x: x[2],
 | |
| 		)
 | |
| 
 | |
| 		unique_liability_labels = sorted(
 | |
| 			set(
 | |
| 				[
 | |
| 					(d["label"], d["is_income_tax_liability"], d["is_income_tax_expense"])
 | |
| 					for d in tax_liabilities
 | |
| 				]
 | |
| 			),
 | |
| 			key=lambda x: x[0],
 | |
| 		)
 | |
| 
 | |
| 		unique_expense_labels = sorted(
 | |
| 			set(
 | |
| 				[(d["label"], d["is_income_tax_liability"], d["is_income_tax_expense"]) for d in tax_expenses]
 | |
| 			),
 | |
| 			key=lambda x: x[0],
 | |
| 		)
 | |
| 
 | |
| 		unique_finance_costs_labels = sorted(
 | |
| 			set([(d["label"], d["is_finance_cost"]) for d in finance_costs]), key=lambda x: x[0]
 | |
| 		)
 | |
| 
 | |
| 		for label in account_types_labels:
 | |
| 			names = [d["account_name"] for d in account_types if d["label"] == label[0]]
 | |
| 			m = dict(label=label[0], names=names, is_working_capital=label[1])
 | |
| 			mapping["account_types"].append(m)
 | |
| 
 | |
| 		for label in fc_adjustment_labels:
 | |
| 			names = [d["account_name"] for d in finance_costs_adjustments if d["label"] == label[0]]
 | |
| 			m = dict(label=label[0], names=names)
 | |
| 			mapping["finance_costs_adjustments"].append(m)
 | |
| 
 | |
| 		for label in unique_liability_labels:
 | |
| 			names = [d["account_name"] for d in tax_liabilities if d["label"] == label[0]]
 | |
| 			m = dict(label=label[0], names=names, tax_liability=label[1], tax_expense=label[2])
 | |
| 			mapping["tax_liabilities"].append(m)
 | |
| 
 | |
| 		for label in unique_expense_labels:
 | |
| 			names = [d["account_name"] for d in tax_expenses if d["label"] == label[0]]
 | |
| 			m = dict(label=label[0], names=names, tax_liability=label[1], tax_expense=label[2])
 | |
| 			mapping["tax_expenses"].append(m)
 | |
| 
 | |
| 		for label in unique_finance_costs_labels:
 | |
| 			names = [d["account_name"] for d in finance_costs if d["label"] == label[0]]
 | |
| 			m = dict(label=label[0], names=names, is_finance_cost=label[1])
 | |
| 			mapping["finance_costs"].append(m)
 | |
| 
 | |
| 		cash_flow_accounts.append(mapping)
 | |
| 
 | |
| 	return cash_flow_accounts
 | |
| 
 | |
| 
 | |
| def add_data_for_operating_activities(
 | |
| 	filters, company_currency, profit_data, period_list, light_mappers, mapper, data
 | |
| ):
 | |
| 	has_added_working_capital_header = False
 | |
| 	section_data = []
 | |
| 
 | |
| 	data.append(
 | |
| 		{
 | |
| 			"account_name": mapper["section_header"],
 | |
| 			"parent_account": None,
 | |
| 			"indent": 0.0,
 | |
| 			"account": mapper["section_header"],
 | |
| 		}
 | |
| 	)
 | |
| 
 | |
| 	if profit_data:
 | |
| 		profit_data.update(
 | |
| 			{"indent": 1, "parent_account": get_mapper_for(light_mappers, position=1)["section_header"]}
 | |
| 		)
 | |
| 		data.append(profit_data)
 | |
| 		section_data.append(profit_data)
 | |
| 
 | |
| 		data.append(
 | |
| 			{
 | |
| 				"account_name": mapper["section_leader"],
 | |
| 				"parent_account": None,
 | |
| 				"indent": 1.0,
 | |
| 				"account": mapper["section_leader"],
 | |
| 			}
 | |
| 		)
 | |
| 
 | |
| 	for account in mapper["account_types"]:
 | |
| 		if account["is_working_capital"] and not has_added_working_capital_header:
 | |
| 			data.append(
 | |
| 				{
 | |
| 					"account_name": "Movement in working capital",
 | |
| 					"parent_account": None,
 | |
| 					"indent": 1.0,
 | |
| 					"account": "",
 | |
| 				}
 | |
| 			)
 | |
| 			has_added_working_capital_header = True
 | |
| 
 | |
| 		account_data = _get_account_type_based_data(
 | |
| 			filters, account["names"], period_list, filters.accumulated_values
 | |
| 		)
 | |
| 
 | |
| 		if not account["is_working_capital"]:
 | |
| 			for key in account_data:
 | |
| 				if key != "total":
 | |
| 					account_data[key] *= -1
 | |
| 
 | |
| 		if account_data["total"] != 0:
 | |
| 			account_data.update(
 | |
| 				{
 | |
| 					"account_name": account["label"],
 | |
| 					"account": account["names"],
 | |
| 					"indent": 1.0,
 | |
| 					"parent_account": mapper["section_header"],
 | |
| 					"currency": company_currency,
 | |
| 				}
 | |
| 			)
 | |
| 			data.append(account_data)
 | |
| 			section_data.append(account_data)
 | |
| 
 | |
| 	_add_total_row_account(
 | |
| 		data, section_data, mapper["section_subtotal"], period_list, company_currency, indent=1
 | |
| 	)
 | |
| 
 | |
| 	# calculate adjustment for tax paid and add to data
 | |
| 	if not mapper["tax_liabilities"]:
 | |
| 		mapper["tax_liabilities"] = [
 | |
| 			dict(label="Income tax paid", names=[""], tax_liability=1, tax_expense=0)
 | |
| 		]
 | |
| 
 | |
| 	for account in mapper["tax_liabilities"]:
 | |
| 		tax_paid = calculate_adjustment(
 | |
| 			filters,
 | |
| 			mapper["tax_liabilities"],
 | |
| 			mapper["tax_expenses"],
 | |
| 			filters.accumulated_values,
 | |
| 			period_list,
 | |
| 		)
 | |
| 
 | |
| 		if tax_paid:
 | |
| 			tax_paid.update(
 | |
| 				{
 | |
| 					"parent_account": mapper["section_header"],
 | |
| 					"currency": company_currency,
 | |
| 					"account_name": account["label"],
 | |
| 					"indent": 1.0,
 | |
| 				}
 | |
| 			)
 | |
| 			data.append(tax_paid)
 | |
| 			section_data.append(tax_paid)
 | |
| 
 | |
| 	if not mapper["finance_costs_adjustments"]:
 | |
| 		mapper["finance_costs_adjustments"] = [dict(label="Interest Paid", names=[""])]
 | |
| 
 | |
| 	for account in mapper["finance_costs_adjustments"]:
 | |
| 		interest_paid = calculate_adjustment(
 | |
| 			filters,
 | |
| 			mapper["finance_costs_adjustments"],
 | |
| 			mapper["finance_costs"],
 | |
| 			filters.accumulated_values,
 | |
| 			period_list,
 | |
| 		)
 | |
| 
 | |
| 		if interest_paid:
 | |
| 			interest_paid.update(
 | |
| 				{
 | |
| 					"parent_account": mapper["section_header"],
 | |
| 					"currency": company_currency,
 | |
| 					"account_name": account["label"],
 | |
| 					"indent": 1.0,
 | |
| 				}
 | |
| 			)
 | |
| 			data.append(interest_paid)
 | |
| 			section_data.append(interest_paid)
 | |
| 
 | |
| 	_add_total_row_account(
 | |
| 		data, section_data, mapper["section_footer"], period_list, company_currency
 | |
| 	)
 | |
| 
 | |
| 
 | |
| def calculate_adjustment(
 | |
| 	filters, non_expense_mapper, expense_mapper, use_accumulated_values, period_list
 | |
| ):
 | |
| 	liability_accounts = [d["names"] for d in non_expense_mapper]
 | |
| 	expense_accounts = [d["names"] for d in expense_mapper]
 | |
| 
 | |
| 	non_expense_closing = _get_account_type_based_data(filters, liability_accounts, period_list, 0)
 | |
| 
 | |
| 	non_expense_opening = _get_account_type_based_data(
 | |
| 		filters, liability_accounts, period_list, use_accumulated_values, opening_balances=1
 | |
| 	)
 | |
| 
 | |
| 	expense_data = _get_account_type_based_data(
 | |
| 		filters, expense_accounts, period_list, use_accumulated_values
 | |
| 	)
 | |
| 
 | |
| 	data = _calculate_adjustment(non_expense_closing, non_expense_opening, expense_data)
 | |
| 	return data
 | |
| 
 | |
| 
 | |
| def _calculate_adjustment(non_expense_closing, non_expense_opening, expense_data):
 | |
| 	account_data = {}
 | |
| 	for month in non_expense_opening.keys():
 | |
| 		if non_expense_opening[month] and non_expense_closing[month]:
 | |
| 			account_data[month] = (
 | |
| 				non_expense_opening[month] - expense_data[month] + non_expense_closing[month]
 | |
| 			)
 | |
| 		elif expense_data[month]:
 | |
| 			account_data[month] = expense_data[month]
 | |
| 
 | |
| 	return account_data
 | |
| 
 | |
| 
 | |
| def add_data_for_other_activities(
 | |
| 	filters, company_currency, profit_data, period_list, light_mappers, mapper_list, data
 | |
| ):
 | |
| 	for mapper in mapper_list:
 | |
| 		section_data = []
 | |
| 		data.append(
 | |
| 			{
 | |
| 				"account_name": mapper["section_header"],
 | |
| 				"parent_account": None,
 | |
| 				"indent": 0.0,
 | |
| 				"account": mapper["section_header"],
 | |
| 			}
 | |
| 		)
 | |
| 
 | |
| 		for account in mapper["account_types"]:
 | |
| 			account_data = _get_account_type_based_data(
 | |
| 				filters, account["names"], period_list, filters.accumulated_values
 | |
| 			)
 | |
| 			if account_data["total"] != 0:
 | |
| 				account_data.update(
 | |
| 					{
 | |
| 						"account_name": account["label"],
 | |
| 						"account": account["names"],
 | |
| 						"indent": 1,
 | |
| 						"parent_account": mapper["section_header"],
 | |
| 						"currency": company_currency,
 | |
| 					}
 | |
| 				)
 | |
| 				data.append(account_data)
 | |
| 				section_data.append(account_data)
 | |
| 
 | |
| 		_add_total_row_account(
 | |
| 			data, section_data, mapper["section_footer"], period_list, company_currency
 | |
| 		)
 | |
| 
 | |
| 
 | |
| def compute_data(filters, company_currency, profit_data, period_list, light_mappers, full_mapper):
 | |
| 	data = []
 | |
| 
 | |
| 	operating_activities_mapper = get_mapper_for(light_mappers, position=1)
 | |
| 	other_mappers = [
 | |
| 		get_mapper_for(light_mappers, position=2),
 | |
| 		get_mapper_for(light_mappers, position=3),
 | |
| 	]
 | |
| 
 | |
| 	if operating_activities_mapper:
 | |
| 		add_data_for_operating_activities(
 | |
| 			filters,
 | |
| 			company_currency,
 | |
| 			profit_data,
 | |
| 			period_list,
 | |
| 			light_mappers,
 | |
| 			operating_activities_mapper,
 | |
| 			data,
 | |
| 		)
 | |
| 
 | |
| 	if all(other_mappers):
 | |
| 		add_data_for_other_activities(
 | |
| 			filters, company_currency, profit_data, period_list, light_mappers, other_mappers, data
 | |
| 		)
 | |
| 
 | |
| 	return data
 | |
| 
 | |
| 
 | |
| def execute(filters=None):
 | |
| 	if not filters.periodicity:
 | |
| 		filters.periodicity = "Monthly"
 | |
| 	period_list = get_period_list(
 | |
| 		filters.from_fiscal_year,
 | |
| 		filters.to_fiscal_year,
 | |
| 		filters.period_start_date,
 | |
| 		filters.period_end_date,
 | |
| 		filters.filter_based_on,
 | |
| 		filters.periodicity,
 | |
| 		company=filters.company,
 | |
| 	)
 | |
| 
 | |
| 	mappers = get_mappers_from_db()
 | |
| 
 | |
| 	cash_flow_accounts = setup_mappers(mappers)
 | |
| 
 | |
| 	# compute net profit / loss
 | |
| 	income = get_data(
 | |
| 		filters.company,
 | |
| 		"Income",
 | |
| 		"Credit",
 | |
| 		period_list,
 | |
| 		filters=filters,
 | |
| 		accumulated_values=filters.accumulated_values,
 | |
| 		ignore_closing_entries=True,
 | |
| 		ignore_accumulated_values_for_fy=True,
 | |
| 	)
 | |
| 
 | |
| 	expense = get_data(
 | |
| 		filters.company,
 | |
| 		"Expense",
 | |
| 		"Debit",
 | |
| 		period_list,
 | |
| 		filters=filters,
 | |
| 		accumulated_values=filters.accumulated_values,
 | |
| 		ignore_closing_entries=True,
 | |
| 		ignore_accumulated_values_for_fy=True,
 | |
| 	)
 | |
| 
 | |
| 	net_profit_loss = get_net_profit_loss(income, expense, period_list, filters.company)
 | |
| 
 | |
| 	company_currency = frappe.get_cached_value("Company", filters.company, "default_currency")
 | |
| 
 | |
| 	data = compute_data(
 | |
| 		filters, company_currency, net_profit_loss, period_list, mappers, cash_flow_accounts
 | |
| 	)
 | |
| 
 | |
| 	_add_total_row_account(data, data, _("Net Change in Cash"), period_list, company_currency)
 | |
| 	columns = get_columns(
 | |
| 		filters.periodicity, period_list, filters.accumulated_values, filters.company
 | |
| 	)
 | |
| 
 | |
| 	return columns, data
 | |
| 
 | |
| 
 | |
| def _get_account_type_based_data(
 | |
| 	filters, account_names, period_list, accumulated_values, opening_balances=0
 | |
| ):
 | |
| 	if not account_names or not account_names[0] or not type(account_names[0]) == str:
 | |
| 		# only proceed if account_names is a list of account names
 | |
| 		return {}
 | |
| 
 | |
| 	from erpnext.accounts.report.cash_flow.cash_flow import get_start_date
 | |
| 
 | |
| 	company = filters.company
 | |
| 	data = {}
 | |
| 	total = 0
 | |
| 	GLEntry = frappe.qb.DocType("GL Entry")
 | |
| 	Account = frappe.qb.DocType("Account")
 | |
| 
 | |
| 	for period in period_list:
 | |
| 		start_date = get_start_date(period, accumulated_values, company)
 | |
| 
 | |
| 		account_subquery = (
 | |
| 			frappe.qb.from_(Account)
 | |
| 			.where((Account.name.isin(account_names)) | (Account.parent_account.isin(account_names)))
 | |
| 			.select(Account.name)
 | |
| 			.as_("account_subquery")
 | |
| 		)
 | |
| 
 | |
| 		if opening_balances:
 | |
| 			date_info = dict(date=start_date)
 | |
| 			months_map = {"Monthly": -1, "Quarterly": -3, "Half-Yearly": -6}
 | |
| 			years_map = {"Yearly": -1}
 | |
| 
 | |
| 			if months_map.get(filters.periodicity):
 | |
| 				date_info.update(months=months_map[filters.periodicity])
 | |
| 			else:
 | |
| 				date_info.update(years=years_map[filters.periodicity])
 | |
| 
 | |
| 			if accumulated_values:
 | |
| 				start, end = add_to_date(start_date, years=-1), add_to_date(period["to_date"], years=-1)
 | |
| 			else:
 | |
| 				start, end = add_to_date(**date_info), add_to_date(**date_info)
 | |
| 
 | |
| 			start, end = get_date_str(start), get_date_str(end)
 | |
| 
 | |
| 		else:
 | |
| 			start, end = start_date if accumulated_values else period["from_date"], period["to_date"]
 | |
| 			start, end = get_date_str(start), get_date_str(end)
 | |
| 
 | |
| 		result = (
 | |
| 			frappe.qb.from_(GLEntry)
 | |
| 			.select(Sum(GLEntry.credit) - Sum(GLEntry.debit))
 | |
| 			.where(
 | |
| 				(GLEntry.company == company)
 | |
| 				& (GLEntry.posting_date >= start)
 | |
| 				& (GLEntry.posting_date <= end)
 | |
| 				& (GLEntry.voucher_type != "Period Closing Voucher")
 | |
| 				& (GLEntry.account.isin(account_subquery))
 | |
| 			)
 | |
| 		).run()
 | |
| 
 | |
| 		if result and result[0]:
 | |
| 			gl_sum = result[0][0]
 | |
| 		else:
 | |
| 			gl_sum = 0
 | |
| 
 | |
| 		total += flt(gl_sum)
 | |
| 		data.setdefault(period["key"], flt(gl_sum))
 | |
| 
 | |
| 	data["total"] = total
 | |
| 	return data
 | |
| 
 | |
| 
 | |
| def _add_total_row_account(out, data, label, period_list, currency, indent=0.0):
 | |
| 	total_row = {
 | |
| 		"indent": indent,
 | |
| 		"account_name": "'" + _("{0}").format(label) + "'",
 | |
| 		"account": "'" + _("{0}").format(label) + "'",
 | |
| 		"currency": currency,
 | |
| 	}
 | |
| 	for row in data:
 | |
| 		if row.get("parent_account"):
 | |
| 			for period in period_list:
 | |
| 				total_row.setdefault(period.key, 0.0)
 | |
| 				total_row[period.key] += row.get(period.key, 0.0)
 | |
| 
 | |
| 			total_row.setdefault("total", 0.0)
 | |
| 			total_row["total"] += row["total"]
 | |
| 
 | |
| 	out.append(total_row)
 | |
| 	out.append({})
 |