# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors # License: GNU General Public License v3. See license.txt import frappe from frappe import _ from frappe.query_builder.functions import Sum from frappe.utils import add_days, cstr, flt, formatdate, getdate import erpnext from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import ( get_accounting_dimensions, get_dimension_with_children, ) from erpnext.accounts.report.financial_statements import ( filter_accounts, filter_out_zero_value_rows, set_gl_entries_by_account, ) value_fields = ( "opening_debit", "opening_credit", "debit", "credit", "closing_debit", "closing_credit", ) def execute(filters=None): validate_filters(filters) data = get_data(filters) columns = get_columns() return columns, data def validate_filters(filters): if not filters.fiscal_year: frappe.throw(_("Fiscal Year {0} is required").format(filters.fiscal_year)) fiscal_year = frappe.get_cached_value( "Fiscal Year", filters.fiscal_year, ["year_start_date", "year_end_date"], as_dict=True ) if not fiscal_year: frappe.throw(_("Fiscal Year {0} does not exist").format(filters.fiscal_year)) else: filters.year_start_date = getdate(fiscal_year.year_start_date) filters.year_end_date = getdate(fiscal_year.year_end_date) if not filters.from_date: filters.from_date = filters.year_start_date if not filters.to_date: filters.to_date = filters.year_end_date filters.from_date = getdate(filters.from_date) filters.to_date = getdate(filters.to_date) if filters.from_date > filters.to_date: frappe.throw(_("From Date cannot be greater than To Date")) if (filters.from_date < filters.year_start_date) or (filters.from_date > filters.year_end_date): frappe.msgprint( _("From Date should be within the Fiscal Year. Assuming From Date = {0}").format( formatdate(filters.year_start_date) ) ) filters.from_date = filters.year_start_date if (filters.to_date < filters.year_start_date) or (filters.to_date > filters.year_end_date): frappe.msgprint( _("To Date should be within the Fiscal Year. Assuming To Date = {0}").format( formatdate(filters.year_end_date) ) ) filters.to_date = filters.year_end_date def get_data(filters): accounts = frappe.db.sql( """select name, account_number, parent_account, account_name, root_type, report_type, lft, rgt from `tabAccount` where company=%s order by lft""", filters.company, as_dict=True, ) company_currency = filters.presentation_currency or erpnext.get_company_currency(filters.company) if not accounts: return None accounts, accounts_by_name, parent_children_map = filter_accounts(accounts) min_lft, max_rgt = frappe.db.sql( """select min(lft), max(rgt) from `tabAccount` where company=%s""", (filters.company,), )[0] gl_entries_by_account = {} opening_balances = get_opening_balances(filters) # add filter inside list so that the query in financial_statements.py doesn't break if filters.project: filters.project = [filters.project] set_gl_entries_by_account( filters.company, filters.from_date, filters.to_date, min_lft, max_rgt, filters, gl_entries_by_account, ignore_closing_entries=not flt(filters.with_period_closing_entry), ) calculate_values(accounts, gl_entries_by_account, opening_balances) accumulate_values_into_parents(accounts, accounts_by_name) data = prepare_data(accounts, filters, parent_children_map, company_currency) data = filter_out_zero_value_rows( data, parent_children_map, show_zero_values=filters.get("show_zero_values") ) return data def get_opening_balances(filters): balance_sheet_opening = get_rootwise_opening_balances(filters, "Balance Sheet") pl_opening = get_rootwise_opening_balances(filters, "Profit and Loss") balance_sheet_opening.update(pl_opening) return balance_sheet_opening def get_rootwise_opening_balances(filters, report_type): gle = [] last_period_closing_voucher = frappe.db.get_all( "Period Closing Voucher", filters={"docstatus": 1, "company": filters.company, "posting_date": ("<", filters.from_date)}, fields=["posting_date", "name"], order_by="posting_date desc", limit=1, ) accounting_dimensions = get_accounting_dimensions(as_list=False) if last_period_closing_voucher: gle = get_opening_balance( "Account Closing Balance", filters, report_type, accounting_dimensions, period_closing_voucher=last_period_closing_voucher[0].name, ) if getdate(last_period_closing_voucher[0].posting_date) < getdate( add_days(filters.from_date, -1) ): start_date = add_days(last_period_closing_voucher[0].posting_date, 1) gle += get_opening_balance( "GL Entry", filters, report_type, accounting_dimensions, start_date=start_date ) else: gle = get_opening_balance("GL Entry", filters, report_type, accounting_dimensions) opening = frappe._dict() for d in gle: opening.setdefault( d.account, { "account": d.account, "opening_debit": 0.0, "opening_credit": 0.0, }, ) opening[d.account]["opening_debit"] += flt(d.opening_debit) opening[d.account]["opening_credit"] += flt(d.opening_credit) return opening def get_opening_balance( doctype, filters, report_type, accounting_dimensions, period_closing_voucher=None, start_date=None ): closing_balance = frappe.qb.DocType(doctype) account = frappe.qb.DocType("Account") opening_balance = ( frappe.qb.from_(closing_balance) .select( closing_balance.account, Sum(closing_balance.debit).as_("opening_debit"), Sum(closing_balance.credit).as_("opening_credit"), ) .where( (closing_balance.company == filters.company) & ( closing_balance.account.isin( frappe.qb.from_(account).select("name").where(account.report_type == report_type) ) ) ) .groupby(closing_balance.account) ) if period_closing_voucher: opening_balance = opening_balance.where( closing_balance.period_closing_voucher == period_closing_voucher ) else: if start_date: opening_balance = opening_balance.where(closing_balance.posting_date >= start_date) opening_balance = opening_balance.where(closing_balance.is_opening == "No") opening_balance = opening_balance.where(closing_balance.posting_date < filters.from_date) if ( not filters.show_unclosed_fy_pl_balances and report_type == "Profit and Loss" and doctype == "GL Entry" ): opening_balance = opening_balance.where(closing_balance.posting_date >= filters.year_start_date) if not flt(filters.with_period_closing_entry): if doctype == "Account Closing Balance": opening_balance = opening_balance.where(closing_balance.is_period_closing_voucher_entry == 0) else: opening_balance = opening_balance.where( closing_balance.voucher_type != "Period Closing Voucher" ) if filters.cost_center: lft, rgt = frappe.db.get_value("Cost Center", filters.cost_center, ["lft", "rgt"]) cost_center = frappe.qb.DocType("Cost Center") opening_balance = opening_balance.where( closing_balance.cost_center.in_( frappe.qb.from_(cost_center) .select("name") .where((cost_center.lft >= lft) & (cost_center.rgt <= rgt)) ) ) if filters.project: opening_balance = opening_balance.where(closing_balance.project == filters.project) if filters.get("include_default_book_entries"): company_fb = frappe.get_cached_value("Company", filters.company, "default_finance_book") if filters.finance_book and company_fb and cstr(filters.finance_book) != cstr(company_fb): frappe.throw( _("To use a different finance book, please uncheck 'Include Default Book Entries'") ) opening_balance = opening_balance.where( (closing_balance.finance_book.isin([cstr(filters.finance_book), cstr(company_fb)])) | (closing_balance.finance_book.isnull()) ) else: opening_balance = opening_balance.where( (closing_balance.finance_book.isin([cstr(filters.finance_book)])) | (closing_balance.finance_book.isnull()) ) if accounting_dimensions: for dimension in accounting_dimensions: if filters.get(dimension.fieldname): if frappe.get_cached_value("DocType", dimension.document_type, "is_tree"): filters[dimension.fieldname] = get_dimension_with_children( dimension.document_type, filters.get(dimension.fieldname) ) opening_balance = opening_balance.where( closing_balance[dimension.fieldname].isin(filters[dimension.fieldname]) ) else: opening_balance = opening_balance.where( closing_balance[dimension.fieldname].isin(filters[dimension.fieldname]) ) gle = opening_balance.run(as_dict=1) return gle def calculate_values(accounts, gl_entries_by_account, opening_balances): init = { "opening_debit": 0.0, "opening_credit": 0.0, "debit": 0.0, "credit": 0.0, "closing_debit": 0.0, "closing_credit": 0.0, } for d in accounts: d.update(init.copy()) # add opening d["opening_debit"] = opening_balances.get(d.name, {}).get("opening_debit", 0) d["opening_credit"] = opening_balances.get(d.name, {}).get("opening_credit", 0) for entry in gl_entries_by_account.get(d.name, []): if cstr(entry.is_opening) != "Yes": d["debit"] += flt(entry.debit) d["credit"] += flt(entry.credit) d["closing_debit"] = d["opening_debit"] + d["debit"] d["closing_credit"] = d["opening_credit"] + d["credit"] prepare_opening_closing(d) def calculate_total_row(accounts, company_currency): total_row = { "account": "'" + _("Total") + "'", "account_name": "'" + _("Total") + "'", "warn_if_negative": True, "opening_debit": 0.0, "opening_credit": 0.0, "debit": 0.0, "credit": 0.0, "closing_debit": 0.0, "closing_credit": 0.0, "parent_account": None, "indent": 0, "has_value": True, "currency": company_currency, } for d in accounts: if not d.parent_account: for field in value_fields: total_row[field] += d[field] return total_row def accumulate_values_into_parents(accounts, accounts_by_name): for d in reversed(accounts): if d.parent_account: for key in value_fields: accounts_by_name[d.parent_account][key] += d[key] def prepare_data(accounts, filters, parent_children_map, company_currency): data = [] for d in accounts: # Prepare opening closing for group account if parent_children_map.get(d.account): prepare_opening_closing(d) has_value = False row = { "account": d.name, "parent_account": d.parent_account, "indent": d.indent, "from_date": filters.from_date, "to_date": filters.to_date, "currency": company_currency, "account_name": ( "{} - {}".format(d.account_number, d.account_name) if d.account_number else d.account_name ), } for key in value_fields: row[key] = flt(d.get(key, 0.0), 3) if abs(row[key]) >= 0.005: # ignore zero values has_value = True row["has_value"] = has_value data.append(row) total_row = calculate_total_row(accounts, company_currency) data.extend([{}, total_row]) return data def get_columns(): return [ { "fieldname": "account", "label": _("Account"), "fieldtype": "Link", "options": "Account", "width": 300, }, { "fieldname": "currency", "label": _("Currency"), "fieldtype": "Link", "options": "Currency", "hidden": 1, }, { "fieldname": "opening_debit", "label": _("Opening (Dr)"), "fieldtype": "Currency", "options": "currency", "width": 120, }, { "fieldname": "opening_credit", "label": _("Opening (Cr)"), "fieldtype": "Currency", "options": "currency", "width": 120, }, { "fieldname": "debit", "label": _("Debit"), "fieldtype": "Currency", "options": "currency", "width": 120, }, { "fieldname": "credit", "label": _("Credit"), "fieldtype": "Currency", "options": "currency", "width": 120, }, { "fieldname": "closing_debit", "label": _("Closing (Dr)"), "fieldtype": "Currency", "options": "currency", "width": 120, }, { "fieldname": "closing_credit", "label": _("Closing (Cr)"), "fieldtype": "Currency", "options": "currency", "width": 120, }, ] def prepare_opening_closing(row): dr_or_cr = "debit" if row["root_type"] in ["Asset", "Equity", "Expense"] else "credit" reverse_dr_or_cr = "credit" if dr_or_cr == "debit" else "debit" for col_type in ["opening", "closing"]: valid_col = col_type + "_" + dr_or_cr reverse_col = col_type + "_" + reverse_dr_or_cr row[valid_col] -= row[reverse_col] if row[valid_col] < 0: row[reverse_col] = abs(row[valid_col]) row[valid_col] = 0.0 else: row[reverse_col] = 0.0