refactor: use qb to fetch PE JV and Inv

This commit is contained in:
Gursheen Anand 2023-07-12 14:43:18 +05:30
parent 1e8b8b5b29
commit 6c11ca1b75
3 changed files with 206 additions and 218 deletions

View File

@ -4,13 +4,15 @@
import frappe
from frappe import _, msgprint
from frappe.query_builder.custom import ConstantColumn
from frappe.utils import flt
from pypika import Order
from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import (
get_accounting_dimensions,
get_dimension_with_children,
)
from erpnext.accounts.report.utils import (
get_conditions,
get_journal_entries,
get_party_details,
get_payment_entries,
@ -203,99 +205,53 @@ def get_columns(invoice_list, additional_table_columns, include_payments=False):
return columns, expense_accounts, tax_accounts, unrealized_profit_loss_accounts
def get_conditions(filters, payments=False):
conditions = ""
if filters.get("company"):
conditions += " and company=%(company)s"
if filters.get("supplier"):
conditions += " and party = %(supplier)s" if payments else " and supplier = %(supplier)s"
if filters.get("from_date"):
conditions += " and posting_date>=%(from_date)s"
if filters.get("to_date"):
conditions += " and posting_date<=%(to_date)s"
if filters.get("mode_of_payment"):
conditions += " and ifnull(mode_of_payment, '') = %(mode_of_payment)s"
if filters.get("cost_center"):
if payments:
conditions += " and cost_center = %(cost_center)s"
else:
conditions += """ and exists(select name from `tabPurchase Invoice Item`
where parent=`tabPurchase Invoice`.name
and ifnull(`tabPurchase Invoice Item`.cost_center, '') = %(cost_center)s)"""
if filters.get("warehouse") and not payments:
conditions += """ and exists(select name from `tabPurchase Invoice Item`
where parent=`tabPurchase Invoice`.name
and ifnull(`tabPurchase Invoice Item`.warehouse, '') = %(warehouse)s)"""
if filters.get("item_group") and not payments:
conditions += """ and exists(select name from `tabPurchase Invoice Item`
where parent=`tabPurchase Invoice`.name
and ifnull(`tabPurchase Invoice Item`.item_group, '') = %(item_group)s)"""
accounting_dimensions = get_accounting_dimensions(as_list=False)
if accounting_dimensions:
common_condition = """
and exists(select name from `tabPurchase Invoice Item`
where parent=`tabPurchase Invoice`.name
"""
for dimension in accounting_dimensions:
if filters.get(dimension.fieldname):
if frappe.get_cached_value("DocType", dimension.document_type, "is_tree"):
filters[dimension.fieldname] = get_dimension_with_children(
dimension.document_type, filters.get(dimension.fieldname)
)
conditions += (
common_condition
+ "and ifnull(`tabPurchase Invoice`.{0}, '') in %({0})s)".format(dimension.fieldname)
)
else:
conditions += (
common_condition
+ "and ifnull(`tabPurchase Invoice`.{0}, '') in %({0})s)".format(dimension.fieldname)
)
return conditions
def get_invoices(filters, additional_query_columns):
conditions = get_conditions(filters)
return frappe.db.sql(
"""
select
'Purchase Invoice' as doctype, name, posting_date, credit_to, supplier, supplier_name, tax_id, bill_no, bill_date,
remarks, base_net_total, base_grand_total, outstanding_amount,
mode_of_payment {0}
from `tabPurchase Invoice`
where docstatus = 1 {1}
order by posting_date desc, name desc""".format(
additional_query_columns, conditions
),
filters,
as_dict=1,
accounting_dimensions = get_accounting_dimensions(as_list=False)
pi = frappe.qb.DocType("Purchase Invoice")
invoice_item = frappe.qb.DocType("Purchase Invoice Item")
query = (
frappe.qb.from_(pi)
.inner_join(invoice_item)
.on(pi.name == invoice_item.parent)
.select(
ConstantColumn("Purchase Invoice").as_("doctype"),
pi.name,
pi.posting_date,
pi.credit_to,
pi.supplier,
pi.supplier_name,
pi.tax_id,
pi.bill_no,
pi.bill_date,
pi.remarks,
pi.base_net_total,
pi.base_grand_total,
pi.outstanding_amount,
pi.mode_of_payment,
)
.where((pi.docstatus == 1))
.orderby(pi.posting_date, pi.name, order=Order.desc)
)
if filters.get("supplier"):
query = query.where(pi.supplier == filters.supplier)
query = get_conditions(filters, query, [pi, invoice_item], accounting_dimensions)
invoices = query.run(as_dict=True, debug=True)
return invoices
def get_payments(filters, additional_query_columns):
if additional_query_columns:
additional_query_columns = ", " + ", ".join(additional_query_columns)
conditions = get_conditions(filters, payments=True)
args = frappe._dict(
account="credit_to",
party="supplier",
party_name="supplier_name",
additional_query_columns="" if not additional_query_columns else additional_query_columns,
conditions=conditions,
)
payment_entries = get_payment_entries(filters, args)
journal_entries = get_journal_entries(filters, args)
accounting_dimensions = get_accounting_dimensions(as_list=False)
payment_entries = get_payment_entries(filters, accounting_dimensions, args)
journal_entries = get_journal_entries(filters, accounting_dimensions, args)
return payment_entries + journal_entries

View File

@ -5,13 +5,15 @@
import frappe
from frappe import _, msgprint
from frappe.model.meta import get_field_precision
from frappe.query_builder.custom import ConstantColumn
from frappe.utils import flt
from pypika import Order
from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import (
get_accounting_dimensions,
get_dimension_with_children,
)
from erpnext.accounts.report.utils import (
get_conditions,
get_journal_entries,
get_party_details,
get_payment_entries,
@ -359,96 +361,61 @@ def get_columns(invoice_list, additional_table_columns, include_payments=False):
return columns, income_accounts, tax_accounts, unrealized_profit_loss_accounts
def get_conditions(filters, payments=False):
conditions = ""
accounting_dimensions = get_accounting_dimensions(as_list=False) or []
accounting_dimensions_list = [d.fieldname for d in accounting_dimensions]
if filters.get("company"):
conditions += " and company=%(company)s"
if filters.get("customer") and "customer" not in accounting_dimensions_list:
conditions += " and party = %(customer)s" if payments else " and customer = %(customer)s"
if filters.get("from_date"):
conditions += " and posting_date >= %(from_date)s"
if filters.get("to_date"):
conditions += " and posting_date <= %(to_date)s"
if filters.get("owner"):
conditions += " and owner = %(owner)s"
def get_sales_invoice_item_field_condition(field, table="Sales Invoice Item") -> str:
if not filters.get(field) or field in accounting_dimensions_list:
return ""
return f""" and exists(select name from `tab{table}`
where parent=`tabSales Invoice`.name
and ifnull(`tab{table}`.{field}, '') = %({field})s)"""
conditions += get_sales_invoice_item_field_condition("mode_of_payment", "Sales Invoice Payment")
conditions += get_sales_invoice_item_field_condition("cost_center")
conditions += get_sales_invoice_item_field_condition("warehouse")
conditions += get_sales_invoice_item_field_condition("brand")
conditions += get_sales_invoice_item_field_condition("item_group")
if accounting_dimensions:
common_condition = """
and exists(select name from `tabSales Invoice Item`
where parent=`tabSales Invoice`.name
"""
for dimension in accounting_dimensions:
if filters.get(dimension.fieldname):
if frappe.get_cached_value("DocType", dimension.document_type, "is_tree"):
filters[dimension.fieldname] = get_dimension_with_children(
dimension.document_type, filters.get(dimension.fieldname)
)
conditions += (
common_condition
+ "and ifnull(`tabSales Invoice`.{0}, '') in %({0})s)".format(dimension.fieldname)
)
else:
conditions += (
common_condition
+ "and ifnull(`tabSales Invoice`.{0}, '') in %({0})s)".format(dimension.fieldname)
)
return conditions
def get_invoices(filters, additional_query_columns):
conditions = get_conditions(filters)
return frappe.db.sql(
"""
select 'Sales Invoice' as doctype, name, posting_date, debit_to, project, customer,
customer_name, owner, remarks, territory, tax_id, customer_group,
base_net_total, base_grand_total, base_rounded_total, outstanding_amount,
is_internal_customer, represents_company, company {0}
from `tabSales Invoice`
where docstatus = 1 {1}
order by posting_date desc, name desc""".format(
additional_query_columns, conditions
),
filters,
as_dict=1,
accounting_dimensions = get_accounting_dimensions(as_list=False)
si = frappe.qb.DocType("Sales Invoice")
invoice_item = frappe.qb.DocType("Sales Invoice Item")
invoice_payment = frappe.qb.DocType("Sales Invoice Payment")
query = (
frappe.qb.from_(si)
.inner_join(invoice_item)
.on(si.name == invoice_item.parent)
.left_join(invoice_payment)
.on(si.name == invoice_payment.parent)
.select(
ConstantColumn("Sales Invoice").as_("doctype"),
si.name,
si.posting_date,
si.debit_to,
si.project,
si.customer,
si.customer_name,
si.owner,
si.remarks,
si.territory,
si.tax_id,
si.customer_group,
si.base_net_total,
si.base_grand_total,
si.base_rounded_total,
si.outstanding_amount,
si.is_internal_customer,
si.represents_company,
si.company,
)
.where((si.docstatus == 1))
.orderby(si.posting_date, si.name, order=Order.desc)
)
if filters.get("customer"):
query = query.where(si.customer == filters.customer)
query = get_conditions(filters, query, [si, invoice_item, invoice_payment], accounting_dimensions)
invoices = query.run(as_dict=True, debug=True)
return invoices
def get_payments(filters, additional_query_columns):
if additional_query_columns:
additional_query_columns = ", " + ", ".join(additional_query_columns)
conditions = get_conditions(filters, payments=True)
args = frappe._dict(
account="debit_to",
party="customer",
party_name="customer_name",
additional_query_columns="" if not additional_query_columns else additional_query_columns,
conditions=conditions,
)
payment_entries = get_payment_entries(filters, args)
journal_entries = get_journal_entries(filters, args)
accounting_dimensions = get_accounting_dimensions(as_list=False)
payment_entries = get_payment_entries(filters, accounting_dimensions, args)
journal_entries = get_journal_entries(filters, accounting_dimensions, args)
return payment_entries + journal_entries

View File

@ -1,7 +1,12 @@
import frappe
from frappe.query_builder.custom import ConstantColumn
from frappe.utils import flt, formatdate, get_datetime_str, get_table_name
from pypika import Order
from erpnext import get_company_currency, get_default_company
from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import (
get_dimension_with_children,
)
from erpnext.accounts.doctype.fiscal_year.fiscal_year import get_from_and_to_date
from erpnext.setup.utils import get_exchange_rate
@ -152,6 +157,35 @@ def get_invoiced_item_gross_margin(
return result
def get_query_columns(report_columns):
if not report_columns:
return ""
columns = []
for column in report_columns:
fieldname = column["fieldname"]
if doctype := column.get("_doctype"):
columns.append(f"`{get_table_name(doctype)}`.`{fieldname}`")
else:
columns.append(fieldname)
return ", " + ", ".join(columns)
def get_values_for_columns(report_columns, report_row):
values = {}
if not report_columns:
return values
for column in report_columns:
fieldname = column["fieldname"]
values[fieldname] = report_row.get(fieldname)
return values
def get_party_details(party_type, party_list):
party_details = {}
party = frappe.qb.DocType(party_type)
@ -190,74 +224,105 @@ def get_taxes_query(invoice_list, doctype, parenttype):
return query.where(taxes.charge_type.isin(["On Paid Amount", "Actual"]))
def get_journal_entries(filters, args):
return frappe.db.sql(
"""
select je.voucher_type as doctype, je.name, je.posting_date,
jea.account as {0}, jea.party as {1}, jea.party as {2},
je.bill_no, je.bill_date, je.remark, je.total_amount as base_net_total,
je.total_amount as base_grand_total, je.mode_of_payment, jea.project {3}
from `tabJournal Entry` je left join `tabJournal Entry Account` jea on jea.parent=je.name
where je.voucher_type='Journal Entry' and jea.party='{4}' {5}
order by je.posting_date desc, je.name desc""".format(
args.account,
args.party,
args.party_name,
args.additional_query_columns,
filters.get(args.party),
args.conditions,
),
filters,
as_dict=1,
def get_journal_entries(filters, accounting_dimensions, args):
je = frappe.qb.DocType("Journal Entry")
journal_account = frappe.qb.DocType("Journal Entry Account")
query = (
frappe.qb.from_(je)
.inner_join(journal_account)
.on(je.name == journal_account.parent)
.select(
je.voucher_type.as_("doctype"),
je.name,
je.posting_date,
journal_account.account.as_(args.account),
journal_account.party.as_(args.party),
journal_account.party.as_(args.party_name),
je.bill_no,
je.bill_date,
je.remark.as_("remarks"),
je.total_amount.as_("base_net_total"),
je.total_amount.as_("base_grand_total"),
je.mode_of_payment,
journal_account.project,
)
.where((je.voucher_type == "Journal Entry") & (journal_account.party == filters.get(args.party)))
.orderby(je.posting_date, je.name, order=Order.desc)
)
query = get_conditions(filters, query, [je], accounting_dimensions, payments=True)
journal_entries = query.run(as_dict=True, debug=True)
return journal_entries
def get_payment_entries(filters, args):
return frappe.db.sql(
"""
select 'Payment Entry' as doctype, name, posting_date, paid_to as {0},
party as {1}, party_name as {2}, remarks,
paid_amount as base_net_total, paid_amount_after_tax as base_grand_total,
mode_of_payment, project, cost_center {3}
from `tabPayment Entry`
where party='{4}' {5}
order by posting_date desc, name desc""".format(
args.account,
args.party,
args.party_name,
args.additional_query_columns,
filters.get(args.party),
args.conditions,
),
filters,
as_dict=1,
def get_payment_entries(filters, accounting_dimensions, args):
pe = frappe.qb.DocType("Payment Entry")
query = (
frappe.qb.from_(pe)
.select(
ConstantColumn("Payment Entry").as_("doctype"),
pe.name,
pe.posting_date,
pe.paid_to.as_(args.account),
pe.party.as_(args.party),
pe.party_name.as_(args.party_name),
pe.remarks,
pe.paid_amount.as_("base_net_total"),
pe.paid_amount_after_tax.as_("base_grand_total"),
pe.mode_of_payment,
pe.project,
pe.cost_center,
)
.where((pe.party == filters.get(args.party)))
.orderby(pe.posting_date, pe.name, order=Order.desc)
)
query = get_conditions(filters, query, [pe], accounting_dimensions, payments=True)
payment_entries = query.run(as_dict=True, debug=True)
return payment_entries
def get_query_columns(report_columns):
if not report_columns:
return ""
def get_conditions(filters, query, docs, accounting_dimensions, payments=False):
parent_doc = docs[0]
if not payments:
child_doc = docs[1]
columns = []
for column in report_columns:
fieldname = column["fieldname"]
if parent_doc.get_table_name() == "tabSales Invoice":
if filters.get("owner"):
query = query.where(parent_doc.owner == filters.owner)
if filters.get("mode_of_payment"):
payment_doc = docs[2]
query = query.where(payment_doc.mode_of_payment == filters.mode_of_payment)
if not payments:
if filters.get("brand"):
query = query.where(child_doc.brand == filters.brand)
else:
if filters.get("mode_of_payment"):
query = query.where(parent_doc.mode_of_payment == filters.mode_of_payment)
if doctype := column.get("_doctype"):
columns.append(f"`{get_table_name(doctype)}`.`{fieldname}`")
else:
columns.append(fieldname)
if filters.get("company"):
query = query.where(parent_doc.company == filters.company)
if filters.get("from_date"):
query = query.where(parent_doc.posting_date >= filters.from_date)
if filters.get("to_date"):
query = query.where(parent_doc.posting_date <= filters.to_date)
return ", " + ", ".join(columns)
if payments:
if filters.get("cost_center"):
query = query.where(parent_doc.cost_center == filters.cost_center)
else:
if filters.get("cost_center"):
query = query.where(child_doc.cost_center == filters.cost_center)
if filters.get("warehouse"):
query = query.where(child_doc.warehouse == filters.warehouse)
if filters.get("item_group"):
query = query.where(child_doc.item_group == filters.item_group)
def get_values_for_columns(report_columns, report_row):
values = {}
if not report_columns:
return values
for column in report_columns:
fieldname = column["fieldname"]
values[fieldname] = report_row.get(fieldname)
return values
if accounting_dimensions:
for dimension in accounting_dimensions:
if filters.get(dimension.fieldname):
if frappe.get_cached_value("DocType", dimension.document_type, "is_tree"):
filters[dimension.fieldname] = get_dimension_with_children(
dimension.document_type, filters.get(dimension.fieldname)
)
fieldname = dimension.fieldname
query = query.where(parent_doc.fieldname.isin(filters.fieldname))
return query