brotherton-erpnext/erpnext/accounts/report/utils.py

387 lines
12 KiB
Python
Raw Normal View History

import frappe
from frappe.query_builder.custom import ConstantColumn
from frappe.query_builder.functions import Sum
from frappe.utils import flt, formatdate, get_datetime_str, get_table_name
from pypika import Order
from erpnext import get_company_currency, get_default_company
from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import (
get_accounting_dimensions,
get_dimension_with_children,
)
from erpnext.accounts.doctype.fiscal_year.fiscal_year import get_from_and_to_date
from erpnext.accounts.party import get_party_account
from erpnext.setup.utils import get_exchange_rate
__exchange_rates = {}
2022-03-28 13:22:46 +00:00
def get_currency(filters):
"""
Returns a dictionary containing currency information. The keys of the dict are
- company: The company for which we are fetching currency information. if no
company is specified, it will fallback to the default company.
- company currency: The functional currency of the said company.
- presentation currency: The presentation currency to use. Only currencies that
have been used for transactions will be allowed.
- report date: The report date.
:param filters: Report filters
:type filters: dict
:return: str - Currency
"""
company = get_appropriate_company(filters)
company_currency = get_company_currency(company)
2022-03-28 13:22:46 +00:00
presentation_currency = (
filters["presentation_currency"] if filters.get("presentation_currency") else company_currency
)
report_date = filters.get("to_date") or filters.get("period_end_date")
if not report_date:
2022-03-28 13:22:46 +00:00
fiscal_year_to_date = get_from_and_to_date(filters.get("to_fiscal_year"))["to_date"]
report_date = formatdate(get_datetime_str(fiscal_year_to_date), "dd-MM-yyyy")
2022-03-28 13:22:46 +00:00
currency_map = dict(
company=company,
company_currency=company_currency,
presentation_currency=presentation_currency,
report_date=report_date,
)
return currency_map
def convert(value, from_, to, date):
"""
convert `value` from `from_` to `to` on `date`
:param value: Amount to be converted
:param from_: Currency of `value`
:param to: Currency to convert to
:param date: exchange rate as at this date
:return: Result of converting `value`
"""
rate = get_rate_as_at(date, from_, to)
converted_value = flt(value) / (rate or 1)
return converted_value
def get_rate_as_at(date, from_currency, to_currency):
"""
Gets exchange rate as at `date` for `from_currency` - `to_currency` exchange rate.
This calls `get_exchange_rate` so that we can get the correct exchange rate as per
the user's Accounts Settings.
It is made efficient by memoising results to `__exchange_rates`
:param date: exchange rate as at this date
:param from_currency: Base currency
:param to_currency: Quote currency
:return: Retrieved exchange rate
"""
2022-03-28 13:22:46 +00:00
rate = __exchange_rates.get("{0}-{1}@{2}".format(from_currency, to_currency, date))
if not rate:
rate = get_exchange_rate(from_currency, to_currency, date) or 1
2022-03-28 13:22:46 +00:00
__exchange_rates["{0}-{1}@{2}".format(from_currency, to_currency, date)] = rate
return rate
2022-03-28 13:22:46 +00:00
def convert_to_presentation_currency(gl_entries, currency_info):
"""
Take a list of GL Entries and change the 'debit' and 'credit' values to currencies
in `currency_info`.
:param gl_entries:
:param currency_info:
:return:
"""
converted_gl_list = []
2022-03-28 13:22:46 +00:00
presentation_currency = currency_info["presentation_currency"]
company_currency = currency_info["company_currency"]
2022-03-28 13:22:46 +00:00
account_currencies = list(set(entry["account_currency"] for entry in gl_entries))
for entry in gl_entries:
2022-03-28 13:22:46 +00:00
debit = flt(entry["debit"])
credit = flt(entry["credit"])
debit_in_account_currency = flt(entry["debit_in_account_currency"])
credit_in_account_currency = flt(entry["credit_in_account_currency"])
account_currency = entry["account_currency"]
if len(account_currencies) == 1 and account_currency == presentation_currency:
entry["debit"] = debit_in_account_currency
entry["credit"] = credit_in_account_currency
else:
2022-03-28 13:22:46 +00:00
date = currency_info["report_date"]
converted_debit_value = convert(debit, presentation_currency, company_currency, date)
converted_credit_value = convert(credit, presentation_currency, company_currency, date)
2022-03-28 13:22:46 +00:00
if entry.get("debit"):
entry["debit"] = converted_debit_value
2022-03-28 13:22:46 +00:00
if entry.get("credit"):
entry["credit"] = converted_credit_value
converted_gl_list.append(entry)
return converted_gl_list
def get_appropriate_company(filters):
2022-03-28 13:22:46 +00:00
if filters.get("company"):
company = filters["company"]
else:
company = get_default_company()
return company
2022-03-28 13:22:46 +00:00
@frappe.whitelist()
2022-03-28 13:22:46 +00:00
def get_invoiced_item_gross_margin(
sales_invoice=None, item_code=None, company=None, with_item_data=False
):
from erpnext.accounts.report.gross_profit.gross_profit import GrossProfitGenerator
2022-03-28 13:22:46 +00:00
sales_invoice = sales_invoice or frappe.form_dict.get("sales_invoice")
item_code = item_code or frappe.form_dict.get("item_code")
company = company or frappe.get_cached_value("Sales Invoice", sales_invoice, "company")
filters = {
2022-03-28 13:22:46 +00:00
"sales_invoice": sales_invoice,
"item_code": item_code,
"company": company,
"group_by": "Invoice",
}
gross_profit_data = GrossProfitGenerator(filters)
result = gross_profit_data.grouped_data
if not with_item_data:
result = sum(d.gross_profit for d in result)
return result
def get_query_columns(report_columns):
if not report_columns:
return ""
columns = []
for column in report_columns:
fieldname = column["fieldname"]
if doctype := column.get("_doctype"):
columns.append(f"`{get_table_name(doctype)}`.`{fieldname}`")
else:
columns.append(fieldname)
return columns
def get_values_for_columns(report_columns, report_row):
values = {}
if not report_columns:
return values
for column in report_columns:
fieldname = column["fieldname"]
values[fieldname] = report_row.get(fieldname)
return values
def get_party_details(party_type, party_list):
party_details = {}
party = frappe.qb.DocType(party_type)
query = frappe.qb.from_(party).select(party.name, party.tax_id).where(party.name.isin(party_list))
if party_type == "Supplier":
query = query.select(party.supplier_group)
else:
query = query.select(party.customer_group, party.territory)
party_detail_list = query.run(as_dict=True)
for party_dict in party_detail_list:
party_details[party_dict.name] = party_dict
return party_details
def get_taxes_query(invoice_list, doctype, parenttype):
taxes = frappe.qb.DocType(doctype)
query = (
frappe.qb.from_(taxes)
.select(taxes.account_head)
.distinct()
.where(
(taxes.parenttype == parenttype)
& (taxes.docstatus == 1)
& (taxes.account_head.isnotnull())
& (taxes.parent.isin([inv.name for inv in invoice_list]))
)
.orderby(taxes.account_head)
)
if doctype == "Purchase Taxes and Charges":
return query.where(taxes.category.isin(["Total", "Valuation and Total"]))
elif doctype == "Sales Taxes and Charges":
return query
return query.where(taxes.charge_type.isin(["On Paid Amount", "Actual"]))
2023-07-11 09:17:23 +00:00
def get_journal_entries(filters, args):
je = frappe.qb.DocType("Journal Entry")
journal_account = frappe.qb.DocType("Journal Entry Account")
query = (
frappe.qb.from_(je)
.inner_join(journal_account)
.on(je.name == journal_account.parent)
.select(
je.voucher_type.as_("doctype"),
je.name,
je.posting_date,
journal_account.account.as_(args.account),
journal_account.party.as_(args.party),
journal_account.party.as_(args.party_name),
je.bill_no,
je.bill_date,
je.remark.as_("remarks"),
je.total_amount.as_("base_net_total"),
je.total_amount.as_("base_grand_total"),
je.mode_of_payment,
journal_account.project,
)
.where(
(je.voucher_type == "Journal Entry")
& (journal_account.party == filters.get(args.party))
& (journal_account.account.isin(args.party_account))
)
.orderby(je.posting_date, je.name, order=Order.desc)
2023-07-11 09:17:23 +00:00
)
query = get_conditions(filters, query, doctype="Journal Entry", payments=True)
journal_entries = query.run(as_dict=True)
return journal_entries
2023-07-11 09:17:23 +00:00
def get_payment_entries(filters, args):
pe = frappe.qb.DocType("Payment Entry")
query = (
frappe.qb.from_(pe)
.select(
ConstantColumn("Payment Entry").as_("doctype"),
pe.name,
pe.posting_date,
2023-07-27 04:38:26 +00:00
pe[args.account_fieldname].as_(args.account),
pe.party.as_(args.party),
pe.party_name.as_(args.party_name),
pe.remarks,
pe.paid_amount.as_("base_net_total"),
pe.paid_amount_after_tax.as_("base_grand_total"),
pe.mode_of_payment,
pe.project,
pe.cost_center,
)
2023-07-27 04:38:26 +00:00
.where(
(pe.party == filters.get(args.party)) & (pe[args.account_fieldname].isin(args.party_account))
)
.orderby(pe.posting_date, pe.name, order=Order.desc)
2023-07-11 09:17:23 +00:00
)
query = get_conditions(filters, query, doctype="Payment Entry", payments=True)
payment_entries = query.run(as_dict=True)
return payment_entries
def get_conditions(filters, query, doctype, child_doctype=None, payments=False):
parent_doc = frappe.qb.DocType(doctype)
if child_doctype:
child_doc = frappe.qb.DocType(child_doctype)
if parent_doc.get_table_name() == "tabSales Invoice":
if filters.get("owner"):
query = query.where(parent_doc.owner == filters.owner)
if filters.get("mode_of_payment"):
payment_doc = frappe.qb.DocType("Sales Invoice Payment")
query = query.where(payment_doc.mode_of_payment == filters.mode_of_payment)
if not payments:
if filters.get("brand"):
query = query.where(child_doc.brand == filters.brand)
else:
if filters.get("mode_of_payment"):
query = query.where(parent_doc.mode_of_payment == filters.mode_of_payment)
if filters.get("company"):
query = query.where(parent_doc.company == filters.company)
if filters.get("from_date"):
query = query.where(parent_doc.posting_date >= filters.from_date)
if filters.get("to_date"):
query = query.where(parent_doc.posting_date <= filters.to_date)
if payments:
if filters.get("cost_center"):
query = query.where(parent_doc.cost_center == filters.cost_center)
else:
if filters.get("cost_center"):
query = query.where(child_doc.cost_center == filters.cost_center)
if filters.get("warehouse"):
query = query.where(child_doc.warehouse == filters.warehouse)
if filters.get("item_group"):
query = query.where(child_doc.item_group == filters.item_group)
if parent_doc.get_table_name() != "tabJournal Entry":
query = filter_invoices_based_on_dimensions(filters, query, parent_doc)
return query
def get_advance_taxes_and_charges(invoice_list):
adv_taxes = frappe.qb.DocType("Advance Taxes and Charges")
return (
frappe.qb.from_(adv_taxes)
.select(
adv_taxes.parent,
adv_taxes.account_head,
(
frappe.qb.terms.Case()
.when(adv_taxes.add_deduct_tax == "Add", Sum(adv_taxes.base_tax_amount))
.else_(Sum(adv_taxes.base_tax_amount) * -1)
).as_("tax_amount"),
)
.where(
(adv_taxes.parent.isin([inv.name for inv in invoice_list]))
& (adv_taxes.charge_type.isin(["On Paid Amount", "Actual"]))
& (adv_taxes.base_tax_amount != 0)
)
.groupby(adv_taxes.parent, adv_taxes.account_head, adv_taxes.add_deduct_tax)
).run(as_dict=True)
def filter_invoices_based_on_dimensions(filters, query, parent_doc):
accounting_dimensions = get_accounting_dimensions(as_list=False)
if accounting_dimensions:
for dimension in accounting_dimensions:
if filters.get(dimension.fieldname):
if frappe.get_cached_value("DocType", dimension.document_type, "is_tree"):
filters[dimension.fieldname] = get_dimension_with_children(
dimension.document_type, filters.get(dimension.fieldname)
)
fieldname = dimension.fieldname
query = query.where(parent_doc[fieldname] == filters.fieldname)
return query
def get_opening_row(party_type, party, from_date, company):
party_account = get_party_account(party_type, party, company, include_advance=True)
gle = frappe.qb.DocType("GL Entry")
return (
frappe.qb.from_(gle)
.select(
ConstantColumn("Opening").as_("account"),
Sum(gle.debit).as_("debit"),
Sum(gle.credit).as_("credit"),
(Sum(gle.debit) - Sum(gle.credit)).as_("balance"),
)
.where(
(gle.account.isin(party_account))
& (gle.party == party)
& (gle.posting_date < from_date)
& (gle.is_cancelled == 0)
)
).run(as_dict=True)