brotherton-erpnext/erpnext/accounts/report/trial_balance/trial_balance.py
Anand Baburajan 5a3acab110
fix: handle empty FBs properly in TB and GL [develop] (#35190)
fix: handle empty FBs properly in TB and GL
2023-05-08 16:54:00 +05:30

459 lines
12 KiB
Python

# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import frappe
from frappe import _
from frappe.query_builder.functions import Sum
from frappe.utils import add_days, cstr, flt, formatdate, getdate
import erpnext
from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import (
get_accounting_dimensions,
get_dimension_with_children,
)
from erpnext.accounts.report.financial_statements import (
filter_accounts,
filter_out_zero_value_rows,
set_gl_entries_by_account,
)
# Amount keys summed and copied for every report row, in this order:
# opening_debit, opening_credit, debit, credit, closing_debit, closing_credit.
value_fields = tuple(
    prefix + side for prefix in ("opening_", "", "closing_") for side in ("debit", "credit")
)
def execute(filters=None):
    """Report entry point: validate ``filters`` and return ``(columns, data)``.

    ``filters`` is normalised in place by ``validate_filters`` before the rows
    are computed by ``get_data``.
    """
    validate_filters(filters)
    rows = get_data(filters)
    return get_columns(), rows
def validate_filters(filters):
    """Validate and normalise the report filters in place.

    Steps performed on ``filters``:

    * require ``fiscal_year`` and load its start/end dates into
      ``year_start_date`` / ``year_end_date``;
    * default ``from_date`` / ``to_date`` to the fiscal year bounds and convert
      both with ``getdate``;
    * reject a ``from_date`` later than ``to_date``;
    * reset (with a message to the user) any date lying outside the fiscal year
      to the corresponding fiscal year bound.

    Errors are reported through ``frappe.throw``.
    """
    if not filters.fiscal_year:
        frappe.throw(_("Fiscal Year {0} is required").format(filters.fiscal_year))

    fiscal_year = frappe.get_cached_value(
        "Fiscal Year", filters.fiscal_year, ["year_start_date", "year_end_date"], as_dict=True
    )
    if fiscal_year:
        filters.year_start_date = getdate(fiscal_year.year_start_date)
        filters.year_end_date = getdate(fiscal_year.year_end_date)
    else:
        frappe.throw(_("Fiscal Year {0} does not exist").format(filters.fiscal_year))

    year_start = filters.year_start_date
    year_end = filters.year_end_date

    # Missing dates fall back to the fiscal year bounds before normalisation.
    filters.from_date = getdate(filters.from_date or year_start)
    filters.to_date = getdate(filters.to_date or year_end)

    if filters.from_date > filters.to_date:
        frappe.throw(_("From Date cannot be greater than To Date"))

    if not (year_start <= filters.from_date <= year_end):
        frappe.msgprint(
            _("From Date should be within the Fiscal Year. Assuming From Date = {0}").format(
                formatdate(year_start)
            )
        )
        filters.from_date = year_start

    if not (year_start <= filters.to_date <= year_end):
        frappe.msgprint(
            _("To Date should be within the Fiscal Year. Assuming To Date = {0}").format(
                formatdate(year_end)
            )
        )
        filters.to_date = year_end
def get_data(filters):
    """Compute the trial balance rows for ``filters.company``.

    Loads the company's accounts, gathers opening balances and the GL entries
    between ``filters.from_date`` and ``filters.to_date``, computes per-account
    values, rolls them up into parent accounts and finally drops zero-value
    rows (unless ``show_zero_values`` is set).

    Returns ``None`` when the company has no accounts, otherwise the list of
    row dicts produced by ``prepare_data`` after zero-row filtering.

    Side effect: a truthy ``filters.project`` is replaced by a one-element list.
    """
    account_rows = frappe.db.sql(
        """select name, account_number, parent_account, account_name, root_type, report_type, lft, rgt
from `tabAccount` where company=%s order by lft""",
        filters.company,
        as_dict=True,
    )
    company_currency = filters.presentation_currency or erpnext.get_company_currency(filters.company)

    if not account_rows:
        return None

    accounts, accounts_by_name, parent_children_map = filter_accounts(account_rows)

    tree_bounds = frappe.db.sql(
        """select min(lft), max(rgt) from `tabAccount`
where company=%s""",
        (filters.company,),
    )
    min_lft, max_rgt = tree_bounds[0]

    gl_entries_by_account = {}

    # Opening balances are fetched while filters.project is still a scalar.
    opening_balances = get_opening_balances(filters)

    # add filter inside list so that the query in financial_statements.py doesn't break
    if filters.project:
        filters.project = [filters.project]

    include_closing_entries = flt(filters.with_period_closing_entry)
    set_gl_entries_by_account(
        filters.company,
        filters.from_date,
        filters.to_date,
        min_lft,
        max_rgt,
        filters,
        gl_entries_by_account,
        ignore_closing_entries=not include_closing_entries,
    )

    calculate_values(accounts, gl_entries_by_account, opening_balances)
    accumulate_values_into_parents(accounts, accounts_by_name)

    rows = prepare_data(accounts, filters, parent_children_map, company_currency)
    return filter_out_zero_value_rows(
        rows, parent_children_map, show_zero_values=filters.get("show_zero_values")
    )
def get_opening_balances(filters):
    """Return opening balances keyed by account for both report types.

    The "Balance Sheet" mapping is fetched first and then updated with the
    "Profit and Loss" mapping; the merged mapping is returned.
    """
    opening = get_rootwise_opening_balances(filters, "Balance Sheet")
    opening.update(get_rootwise_opening_balances(filters, "Profit and Loss"))
    return opening
def get_rootwise_opening_balances(filters, report_type):
    """Return ``{account: {"account", "opening_debit", "opening_credit"}}``.

    If a submitted Period Closing Voucher exists for the company before
    ``filters.from_date``, balances are read from "Account Closing Balance" for
    the latest such voucher, plus "GL Entry" rows dated after it when the
    voucher was posted earlier than the day before ``from_date``. Without any
    such voucher, balances come from "GL Entry" alone.
    """
    closing_vouchers = frappe.db.get_all(
        "Period Closing Voucher",
        filters={"docstatus": 1, "company": filters.company, "posting_date": ("<", filters.from_date)},
        fields=["posting_date", "name"],
        order_by="posting_date desc",
        limit=1,
    )

    accounting_dimensions = get_accounting_dimensions(as_list=False)

    if not closing_vouchers:
        entries = get_opening_balance("GL Entry", filters, report_type, accounting_dimensions)
    else:
        latest_voucher = closing_vouchers[0]
        entries = get_opening_balance(
            "Account Closing Balance",
            filters,
            report_type,
            accounting_dimensions,
            period_closing_voucher=latest_voucher.name,
        )
        # Entries between the voucher and the report start are not part of the
        # closing balance, so they are added from the ledger.
        if getdate(latest_voucher.posting_date) < getdate(add_days(filters.from_date, -1)):
            entries += get_opening_balance(
                "GL Entry",
                filters,
                report_type,
                accounting_dimensions,
                start_date=add_days(latest_voucher.posting_date, 1),
            )

    opening = frappe._dict()
    for entry in entries:
        totals = opening.setdefault(
            entry.account,
            {
                "account": entry.account,
                "opening_debit": 0.0,
                "opening_credit": 0.0,
            },
        )
        totals["opening_debit"] += flt(entry.opening_debit)
        totals["opening_credit"] += flt(entry.opening_credit)

    return opening
def get_opening_balance(
    doctype, filters, report_type, accounting_dimensions, period_closing_voucher=None, start_date=None
):
    """Fetch per-account opening debit/credit totals from ``doctype``.

    Args:
        doctype: Table to aggregate. Callers in this module pass
            "Account Closing Balance" or "GL Entry".
        filters: Report filters. Reads ``company``, ``from_date``,
            ``year_start_date``, ``show_unclosed_fy_pl_balances``,
            ``with_period_closing_entry``, ``cost_center``, ``project``,
            ``finance_book``, ``include_default_book_entries`` and one key per
            accounting dimension. For tree dimensions the filter value is
            replaced in place with the result of ``get_dimension_with_children``.
        report_type: Only accounts whose ``report_type`` equals this value are
            included.
        accounting_dimensions: Iterable of dimension records exposing
            ``fieldname`` and ``document_type``; may be empty.
        period_closing_voucher: When given, rows are restricted to this voucher
            and the ``start_date`` / ``from_date`` restrictions are skipped.
        start_date: When given (and no voucher is passed), only rows posted on
            or after this date with ``is_opening == "No"`` are included.

    Returns:
        The query result (``run(as_dict=1)``): one row per account with
        ``account``, ``opening_debit`` and ``opening_credit``.

    Raises:
        Whatever ``frappe.throw`` raises when ``include_default_book_entries``
        is set and ``finance_book`` differs from the company's default book.
    """
    closing_balance = frappe.qb.DocType(doctype)
    account = frappe.qb.DocType("Account")

    opening_balance = (
        frappe.qb.from_(closing_balance)
        .select(
            closing_balance.account,
            Sum(closing_balance.debit).as_("opening_debit"),
            Sum(closing_balance.credit).as_("opening_credit"),
        )
        .where(
            (closing_balance.company == filters.company)
            & (
                closing_balance.account.isin(
                    frappe.qb.from_(account).select("name").where(account.report_type == report_type)
                )
            )
        )
        .groupby(closing_balance.account)
    )

    if period_closing_voucher:
        opening_balance = opening_balance.where(
            closing_balance.period_closing_voucher == period_closing_voucher
        )
    else:
        if start_date:
            opening_balance = opening_balance.where(closing_balance.posting_date >= start_date)
            opening_balance = opening_balance.where(closing_balance.is_opening == "No")
        opening_balance = opening_balance.where(closing_balance.posting_date < filters.from_date)

    if (
        not filters.show_unclosed_fy_pl_balances
        and report_type == "Profit and Loss"
        and doctype == "GL Entry"
    ):
        opening_balance = opening_balance.where(closing_balance.posting_date >= filters.year_start_date)

    if not flt(filters.with_period_closing_entry):
        # The two tables flag period-closing rows through different columns.
        if doctype == "Account Closing Balance":
            opening_balance = opening_balance.where(closing_balance.is_period_closing_voucher_entry == 0)
        else:
            opening_balance = opening_balance.where(
                closing_balance.voucher_type != "Period Closing Voucher"
            )

    if filters.cost_center:
        lft, rgt = frappe.db.get_value("Cost Center", filters.cost_center, ["lft", "rgt"])
        cost_center = frappe.qb.DocType("Cost Center")
        # Match the selected cost center and everything nested inside its lft/rgt range.
        # Uses ``isin`` like every other membership test in this query.
        opening_balance = opening_balance.where(
            closing_balance.cost_center.isin(
                frappe.qb.from_(cost_center)
                .select("name")
                .where((cost_center.lft >= lft) & (cost_center.rgt <= rgt))
            )
        )

    if filters.project:
        opening_balance = opening_balance.where(closing_balance.project == filters.project)

    if filters.get("include_default_book_entries"):
        company_fb = frappe.get_cached_value("Company", filters.company, "default_finance_book")

        if filters.finance_book and company_fb and cstr(filters.finance_book) != cstr(company_fb):
            frappe.throw(
                _("To use a different finance book, please uncheck 'Include Default Book Entries'")
            )

        opening_balance = opening_balance.where(
            (closing_balance.finance_book.isin([cstr(filters.finance_book), cstr(company_fb), ""]))
            | (closing_balance.finance_book.isnull())
        )
    else:
        opening_balance = opening_balance.where(
            (closing_balance.finance_book.isin([cstr(filters.finance_book), ""]))
            | (closing_balance.finance_book.isnull())
        )

    for dimension in accounting_dimensions or []:
        if not filters.get(dimension.fieldname):
            continue

        # Tree dimensions are expanded to include children; the expanded value
        # is written back to filters, as before.
        if frappe.get_cached_value("DocType", dimension.document_type, "is_tree"):
            filters[dimension.fieldname] = get_dimension_with_children(
                dimension.document_type, filters.get(dimension.fieldname)
            )

        opening_balance = opening_balance.where(
            closing_balance[dimension.fieldname].isin(filters[dimension.fieldname])
        )

    return opening_balance.run(as_dict=1)
def calculate_values(accounts, gl_entries_by_account, opening_balances):
    """Populate opening, period and closing amounts on every account in place.

    For each account: reset all amount keys, copy its opening debit/credit from
    ``opening_balances`` (0 when absent), add debit/credit of its GL entries
    whose ``is_opening`` is not "Yes", derive closing amounts as opening plus
    period, and finally net opening/closing via ``prepare_opening_closing``.
    """
    zero_amounts = {
        "opening_debit": 0.0,
        "opening_credit": 0.0,
        "debit": 0.0,
        "credit": 0.0,
        "closing_debit": 0.0,
        "closing_credit": 0.0,
    }

    for account in accounts:
        account.update(dict(zero_amounts))

        # add opening
        account_opening = opening_balances.get(account.name, {})
        account["opening_debit"] = account_opening.get("opening_debit", 0)
        account["opening_credit"] = account_opening.get("opening_credit", 0)

        for gl_entry in gl_entries_by_account.get(account.name, []):
            if cstr(gl_entry.is_opening) == "Yes":
                continue
            account["debit"] += flt(gl_entry.debit)
            account["credit"] += flt(gl_entry.credit)

        account["closing_debit"] = account["opening_debit"] + account["debit"]
        account["closing_credit"] = account["opening_credit"] + account["credit"]

        prepare_opening_closing(account)
def calculate_total_row(accounts, company_currency):
    """Return the "Total" row: the sum of all amount fields over root accounts.

    Only accounts without a ``parent_account`` are added, so values already
    rolled up into the roots are not counted twice.
    """
    total_label = "'" + _("Total") + "'"

    total_row = {
        "account": total_label,
        "account_name": total_label,
        "warn_if_negative": True,
    }
    total_row.update(dict.fromkeys(value_fields, 0.0))
    total_row.update(
        {
            "parent_account": None,
            "indent": 0,
            "has_value": True,
            "currency": company_currency,
        }
    )

    root_accounts = (account for account in accounts if not account.parent_account)
    for root in root_accounts:
        for field in value_fields:
            total_row[field] += root[field]

    return total_row
def accumulate_values_into_parents(accounts, accounts_by_name):
    """Add every account's amount fields onto its parent account, in place.

    ``accounts`` is walked in reverse order, so with children listed after
    their parents each child is folded in before its parent is processed.
    """
    for account in reversed(accounts):
        if not account.parent_account:
            continue
        for field in value_fields:
            accounts_by_name[account.parent_account][field] += account[field]
def prepare_data(accounts, filters, parent_children_map, company_currency):
    """Convert account records into report row dicts and append the total row.

    Each row carries the account identity, indent, date range, currency, a
    display name ("<number> - <name>" when an account number exists) and the
    amount fields rounded to 3 decimals. ``has_value`` is True when any rounded
    amount is at least 0.005 in absolute value. The returned list ends with an
    empty separator row followed by the total row.
    """
    data = []

    for d in accounts:
        # Prepare opening closing for group account
        # NOTE(review): the rows selected in get_data carry ``name`` rather than
        # ``account``; confirm which key this lookup is meant to use.
        if parent_children_map.get(d.account):
            prepare_opening_closing(d)

        if d.account_number:
            display_name = "{} - {}".format(d.account_number, d.account_name)
        else:
            display_name = d.account_name

        row = {
            "account": d.name,
            "parent_account": d.parent_account,
            "indent": d.indent,
            "from_date": filters.from_date,
            "to_date": filters.to_date,
            "currency": company_currency,
            "account_name": display_name,
        }

        amounts = {field: flt(d.get(field, 0.0), 3) for field in value_fields}
        row.update(amounts)

        # ignore zero values
        row["has_value"] = any(abs(amount) >= 0.005 for amount in amounts.values())

        data.append(row)

    data.append({})
    data.append(calculate_total_row(accounts, company_currency))

    return data
def get_columns():
    """Return the report column definitions.

    The list holds the account link column, a hidden currency column, and six
    currency-typed amount columns (opening, period and closing debit/credit),
    each 120 wide and formatted using the row's ``currency`` value.
    """
    columns = [
        {
            "fieldname": "account",
            "label": _("Account"),
            "fieldtype": "Link",
            "options": "Account",
            "width": 300,
        },
        {
            "fieldname": "currency",
            "label": _("Currency"),
            "fieldtype": "Link",
            "options": "Currency",
            "hidden": 1,
        },
    ]

    amount_columns = [
        ("opening_debit", _("Opening (Dr)")),
        ("opening_credit", _("Opening (Cr)")),
        ("debit", _("Debit")),
        ("credit", _("Credit")),
        ("closing_debit", _("Closing (Dr)")),
        ("closing_credit", _("Closing (Cr)")),
    ]
    for fieldname, label in amount_columns:
        columns.append(
            {
                "fieldname": fieldname,
                "label": label,
                "fieldtype": "Currency",
                "options": "currency",
                "width": 120,
            }
        )

    return columns
def prepare_opening_closing(row):
    """Net the opening and closing debit/credit pairs of ``row`` in place.

    For both "opening" and "closing", the column on the row's own side (debit
    for root types Asset, Equity and Expense, credit otherwise) is reduced by
    the opposite column. A negative result is moved, as a positive amount, to
    the opposite column; otherwise the opposite column is set to zero. After
    the call at most one column of each pair is non-zero.
    """
    if row["root_type"] in ("Asset", "Equity", "Expense"):
        own_side, other_side = "debit", "credit"
    else:
        own_side, other_side = "credit", "debit"

    for stage in ("opening", "closing"):
        own_col = stage + "_" + own_side
        other_col = stage + "_" + other_side

        net = row[own_col] - row[other_col]
        if net < 0:
            row[own_col] = 0.0
            row[other_col] = abs(net)
        else:
            row[own_col] = net
            row[other_col] = 0.0