# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors # License: GNU General Public License v3. See license.txt from json import loads from typing import TYPE_CHECKING, List, Optional, Tuple import frappe import frappe.defaults from frappe import _, qb, throw from frappe.model.meta import get_field_precision from frappe.query_builder import AliasedQuery, Criterion, Table from frappe.query_builder.functions import Sum from frappe.query_builder.utils import DocType from frappe.utils import ( cint, create_batch, cstr, flt, formatdate, get_number_format_info, getdate, now, nowdate, ) from pypika import Order from pypika.terms import ExistsCriterion import erpnext # imported to enable erpnext.accounts.utils.get_account_currency from erpnext.accounts.doctype.account.account import get_account_currency # noqa from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import get_dimensions from erpnext.stock import get_warehouse_account_map from erpnext.stock.utils import get_stock_value_on if TYPE_CHECKING: from erpnext.stock.doctype.repost_item_valuation.repost_item_valuation import RepostItemValuation class FiscalYearError(frappe.ValidationError): pass class PaymentEntryUnlinkError(frappe.ValidationError): pass GL_REPOSTING_CHUNK = 100 @frappe.whitelist() def get_fiscal_year( date=None, fiscal_year=None, label="Date", verbose=1, company=None, as_dict=False, boolean=False ): fiscal_years = get_fiscal_years( date, fiscal_year, label, verbose, company, as_dict=as_dict, boolean=boolean ) if boolean: return fiscal_years else: return fiscal_years[0] def get_fiscal_years( transaction_date=None, fiscal_year=None, label="Date", verbose=1, company=None, as_dict=False, boolean=False, ): fiscal_years = frappe.cache().hget("fiscal_years", company) or [] if not fiscal_years: # if year start date is 2012-04-01, year end date should be 2013-03-31 (hence subdate) FY = DocType("Fiscal Year") query = ( frappe.qb.from_(FY) .select(FY.name, FY.year_start_date, FY.year_end_date) .where(FY.disabled == 0) ) if fiscal_year: query = query.where(FY.name == fiscal_year) if company: FYC = DocType("Fiscal Year Company") query = query.where( ExistsCriterion(frappe.qb.from_(FYC).select(FYC.name).where(FYC.parent == FY.name)).negate() | ExistsCriterion( frappe.qb.from_(FYC) .select(FYC.company) .where(FYC.parent == FY.name) .where(FYC.company == company) ) ) query = query.orderby(FY.year_start_date, order=Order.desc) fiscal_years = query.run(as_dict=True) frappe.cache().hset("fiscal_years", company, fiscal_years) if not transaction_date and not fiscal_year: return fiscal_years if transaction_date: transaction_date = getdate(transaction_date) for fy in fiscal_years: matched = False if fiscal_year and fy.name == fiscal_year: matched = True if ( transaction_date and getdate(fy.year_start_date) <= transaction_date and getdate(fy.year_end_date) >= transaction_date ): matched = True if matched: if as_dict: return (fy,) else: return ((fy.name, fy.year_start_date, fy.year_end_date),) error_msg = _("""{0} {1} is not in any active Fiscal Year""").format( label, formatdate(transaction_date) ) if company: error_msg = _("""{0} for {1}""").format(error_msg, frappe.bold(company)) if boolean: return False if verbose == 1: frappe.msgprint(error_msg) raise FiscalYearError(error_msg) @frappe.whitelist() def get_fiscal_year_filter_field(company=None): field = {"fieldtype": "Select", "options": [], "operator": "Between", "query_value": True} fiscal_years = get_fiscal_years(company=company) for fiscal_year in fiscal_years: field["options"].append( { "label": fiscal_year.name, "value": fiscal_year.name, "query_value": [ fiscal_year.year_start_date.strftime("%Y-%m-%d"), fiscal_year.year_end_date.strftime("%Y-%m-%d"), ], } ) return field def validate_fiscal_year(date, fiscal_year, company, label="Date", doc=None): years = [f[0] for f in get_fiscal_years(date, label=_(label), company=company)] if fiscal_year not in years: if doc: doc.fiscal_year = years[0] else: throw(_("{0} '{1}' not in Fiscal Year {2}").format(label, formatdate(date), fiscal_year)) @frappe.whitelist() def get_balance_on( account=None, date=None, party_type=None, party=None, company=None, in_account_currency=True, cost_center=None, ignore_account_permission=False, account_type=None, ): if not account and frappe.form_dict.get("account"): account = frappe.form_dict.get("account") if not date and frappe.form_dict.get("date"): date = frappe.form_dict.get("date") if not party_type and frappe.form_dict.get("party_type"): party_type = frappe.form_dict.get("party_type") if not party and frappe.form_dict.get("party"): party = frappe.form_dict.get("party") if not cost_center and frappe.form_dict.get("cost_center"): cost_center = frappe.form_dict.get("cost_center") cond = ["is_cancelled=0"] if date: cond.append("posting_date <= %s" % frappe.db.escape(cstr(date))) else: # get balance of all entries that exist date = nowdate() if account: acc = frappe.get_doc("Account", account) try: year_start_date = get_fiscal_year(date, company=company, verbose=0)[1] except FiscalYearError: if getdate(date) > getdate(nowdate()): # if fiscal year not found and the date is greater than today # get fiscal year for today's date and its corresponding year start date year_start_date = get_fiscal_year(nowdate(), verbose=1)[1] else: # this indicates that it is a date older than any existing fiscal year. # hence, assuming balance as 0.0 return 0.0 if account: report_type = acc.report_type else: report_type = "" if cost_center and report_type == "Profit and Loss": cc = frappe.get_doc("Cost Center", cost_center) if cc.is_group: cond.append( """ exists ( select 1 from `tabCost Center` cc where cc.name = gle.cost_center and cc.lft >= %s and cc.rgt <= %s )""" % (cc.lft, cc.rgt) ) else: cond.append("""gle.cost_center = %s """ % (frappe.db.escape(cost_center, percent=False),)) if account: if not (frappe.flags.ignore_account_permission or ignore_account_permission): acc.check_permission("read") # different filter for group and ledger - improved performance if acc.is_group: cond.append( """exists ( select name from `tabAccount` ac where ac.name = gle.account and ac.lft >= %s and ac.rgt <= %s )""" % (acc.lft, acc.rgt) ) # If group and currency same as company, # always return balance based on debit and credit in company currency if acc.account_currency == frappe.get_cached_value("Company", acc.company, "default_currency"): in_account_currency = False else: cond.append("""gle.account = %s """ % (frappe.db.escape(account, percent=False),)) if account_type: accounts = frappe.db.get_all( "Account", filters={"company": company, "account_type": account_type, "is_group": 0}, pluck="name", order_by="lft", ) cond.append( """ gle.account in (%s) """ % (", ".join([frappe.db.escape(account) for account in accounts])) ) if party_type and party: cond.append( """gle.party_type = %s and gle.party = %s """ % (frappe.db.escape(party_type), frappe.db.escape(party, percent=False)) ) if company: cond.append("""gle.company = %s """ % (frappe.db.escape(company, percent=False))) if account or (party_type and party) or account_type: if in_account_currency: select_field = "sum(debit_in_account_currency) - sum(credit_in_account_currency)" else: select_field = "sum(debit) - sum(credit)" bal = frappe.db.sql( """ SELECT {0} FROM `tabGL Entry` gle WHERE {1}""".format( select_field, " and ".join(cond) ) )[0][0] # if bal is None, return 0 return flt(bal) def get_count_on(account, fieldname, date): cond = ["is_cancelled=0"] if date: cond.append("posting_date <= %s" % frappe.db.escape(cstr(date))) else: # get balance of all entries that exist date = nowdate() try: year_start_date = get_fiscal_year(date, verbose=0)[1] except FiscalYearError: if getdate(date) > getdate(nowdate()): # if fiscal year not found and the date is greater than today # get fiscal year for today's date and its corresponding year start date year_start_date = get_fiscal_year(nowdate(), verbose=1)[1] else: # this indicates that it is a date older than any existing fiscal year. # hence, assuming balance as 0.0 return 0.0 if account: acc = frappe.get_doc("Account", account) if not frappe.flags.ignore_account_permission: acc.check_permission("read") # for pl accounts, get balance within a fiscal year if acc.report_type == "Profit and Loss": cond.append( "posting_date >= '%s' and voucher_type != 'Period Closing Voucher'" % year_start_date ) # different filter for group and ledger - improved performance if acc.is_group: cond.append( """exists ( select name from `tabAccount` ac where ac.name = gle.account and ac.lft >= %s and ac.rgt <= %s )""" % (acc.lft, acc.rgt) ) else: cond.append("""gle.account = %s """ % (frappe.db.escape(account, percent=False),)) entries = frappe.db.sql( """ SELECT name, posting_date, account, party_type, party,debit,credit, voucher_type, voucher_no, against_voucher_type, against_voucher FROM `tabGL Entry` gle WHERE {0}""".format( " and ".join(cond) ), as_dict=True, ) count = 0 for gle in entries: if fieldname not in ("invoiced_amount", "payables"): count += 1 else: dr_or_cr = "debit" if fieldname == "invoiced_amount" else "credit" cr_or_dr = "credit" if fieldname == "invoiced_amount" else "debit" select_fields = ( "ifnull(sum(credit-debit),0)" if fieldname == "invoiced_amount" else "ifnull(sum(debit-credit),0)" ) if ( (not gle.against_voucher) or (gle.against_voucher_type in ["Sales Order", "Purchase Order"]) or (gle.against_voucher == gle.voucher_no and gle.get(dr_or_cr) > 0) ): payment_amount = frappe.db.sql( """ SELECT {0} FROM `tabGL Entry` gle WHERE docstatus < 2 and posting_date <= %(date)s and against_voucher = %(voucher_no)s and party = %(party)s and name != %(name)s""".format( select_fields ), {"date": date, "voucher_no": gle.voucher_no, "party": gle.party, "name": gle.name}, )[0][0] outstanding_amount = flt(gle.get(dr_or_cr)) - flt(gle.get(cr_or_dr)) - payment_amount currency_precision = get_currency_precision() or 2 if abs(flt(outstanding_amount)) > 0.1 / 10**currency_precision: count += 1 return count @frappe.whitelist() def add_ac(args=None): from frappe.desk.treeview import make_tree_args if not args: args = frappe.local.form_dict args.doctype = "Account" args = make_tree_args(**args) ac = frappe.new_doc("Account") if args.get("ignore_permissions"): ac.flags.ignore_permissions = True args.pop("ignore_permissions") ac.update(args) if not ac.parent_account: ac.parent_account = args.get("parent") ac.old_parent = "" ac.freeze_account = "No" if cint(ac.get("is_root")): ac.parent_account = None ac.flags.ignore_mandatory = True ac.insert() return ac.name @frappe.whitelist() def add_cc(args=None): from frappe.desk.treeview import make_tree_args if not args: args = frappe.local.form_dict args.doctype = "Cost Center" args = make_tree_args(**args) if args.parent_cost_center == args.company: args.parent_cost_center = "{0} - {1}".format( args.parent_cost_center, frappe.get_cached_value("Company", args.company, "abbr") ) cc = frappe.new_doc("Cost Center") cc.update(args) if not cc.parent_cost_center: cc.parent_cost_center = args.get("parent") cc.old_parent = "" cc.insert() return cc.name def reconcile_against_document(args, skip_ref_details_update_for_pe=False): # nosemgrep """ Cancel PE or JV, Update against document, split if required and resubmit """ # To optimize making GL Entry for PE or JV with multiple references reconciled_entries = {} for row in args: if not reconciled_entries.get((row.voucher_type, row.voucher_no)): reconciled_entries[(row.voucher_type, row.voucher_no)] = [] reconciled_entries[(row.voucher_type, row.voucher_no)].append(row) for key, entries in reconciled_entries.items(): voucher_type = key[0] voucher_no = key[1] # cancel advance entry doc = frappe.get_doc(voucher_type, voucher_no) frappe.flags.ignore_party_validation = True _delete_pl_entries(voucher_type, voucher_no) for entry in entries: check_if_advance_entry_modified(entry) validate_allocated_amount(entry) # update ref in advance entry if voucher_type == "Journal Entry": referenced_row = update_reference_in_journal_entry(entry, doc, do_not_save=False) # advance section in sales/purchase invoice and reconciliation tool,both pass on exchange gain/loss # amount and account in args # referenced_row is used to deduplicate gain/loss journal entry.update({"referenced_row": referenced_row}) doc.make_exchange_gain_loss_journal([entry]) else: update_reference_in_payment_entry( entry, doc, do_not_save=True, skip_ref_details_update_for_pe=skip_ref_details_update_for_pe ) doc.save(ignore_permissions=True) # re-submit advance entry doc = frappe.get_doc(entry.voucher_type, entry.voucher_no) gl_map = doc.build_gl_map() create_payment_ledger_entry(gl_map, update_outstanding="No", cancel=0, adv_adj=1) if voucher_type == "Payment Entry": doc.make_advance_gl_entries() # Only update outstanding for newly linked vouchers for entry in entries: update_voucher_outstanding( entry.against_voucher_type, entry.against_voucher, entry.account, entry.party_type, entry.party ) frappe.flags.ignore_party_validation = False def check_if_advance_entry_modified(args): """ check if there is already a voucher reference check if amount is same check if jv is submitted """ if not args.get("unreconciled_amount"): args.update({"unreconciled_amount": args.get("unadjusted_amount")}) ret = None if args.voucher_type == "Journal Entry": journal_entry = frappe.qb.DocType("Journal Entry") journal_acc = frappe.qb.DocType("Journal Entry Account") q = ( frappe.qb.from_(journal_entry) .inner_join(journal_acc) .on(journal_entry.name == journal_acc.parent) .select(journal_acc[args.get("dr_or_cr")]) .where( (journal_acc.account == args.get("account")) & ((journal_acc.party_type == args.get("party_type"))) & ((journal_acc.party == args.get("party"))) & ( (journal_acc.reference_type.isnull()) | (journal_acc.reference_type.isin(["", "Sales Order", "Purchase Order"])) ) & ((journal_entry.name == args.get("voucher_no"))) & ((journal_acc.name == args.get("voucher_detail_no"))) & ((journal_entry.docstatus == 1)) ) ) else: payment_entry = frappe.qb.DocType("Payment Entry") payment_ref = frappe.qb.DocType("Payment Entry Reference") q = ( frappe.qb.from_(payment_entry) .select(payment_entry.name) .where(payment_entry.name == args.get("voucher_no")) .where(payment_entry.docstatus == 1) .where(payment_entry.party_type == args.get("party_type")) .where(payment_entry.party == args.get("party")) ) if args.voucher_detail_no: q = ( q.inner_join(payment_ref) .on(payment_entry.name == payment_ref.parent) .where(payment_ref.name == args.get("voucher_detail_no")) .where(payment_ref.reference_doctype.isin(("", "Sales Order", "Purchase Order"))) .where(payment_ref.allocated_amount == args.get("unreconciled_amount")) ) else: q = q.where(payment_entry.unallocated_amount == args.get("unreconciled_amount")) ret = q.run(as_dict=True) if not ret: throw(_("""Payment Entry has been modified after you pulled it. Please pull it again.""")) def validate_allocated_amount(args): precision = args.get("precision") or frappe.db.get_single_value( "System Settings", "currency_precision" ) if args.get("allocated_amount") < 0: throw(_("Allocated amount cannot be negative")) elif flt(args.get("allocated_amount"), precision) > flt(args.get("unadjusted_amount"), precision): throw(_("Allocated amount cannot be greater than unadjusted amount")) def update_reference_in_journal_entry(d, journal_entry, do_not_save=False): """ Updates against document, if partial amount splits into rows """ jv_detail = journal_entry.get("accounts", {"name": d["voucher_detail_no"]})[0] if flt(d["unadjusted_amount"]) - flt(d["allocated_amount"]) != 0: # adjust the unreconciled balance amount_in_account_currency = flt(d["unadjusted_amount"]) - flt(d["allocated_amount"]) amount_in_company_currency = amount_in_account_currency * flt(jv_detail.exchange_rate) jv_detail.set(d["dr_or_cr"], amount_in_account_currency) jv_detail.set( "debit" if d["dr_or_cr"] == "debit_in_account_currency" else "credit", amount_in_company_currency, ) else: journal_entry.remove(jv_detail) # new row with references new_row = journal_entry.append("accounts") # Copy field values into new row [ new_row.set(field, jv_detail.get(field)) for field in frappe.get_meta("Journal Entry Account").get_fieldnames_with_value() ] new_row.set(d["dr_or_cr"], d["allocated_amount"]) new_row.set( "debit" if d["dr_or_cr"] == "debit_in_account_currency" else "credit", d["allocated_amount"] * flt(jv_detail.exchange_rate), ) new_row.set( "credit_in_account_currency" if d["dr_or_cr"] == "debit_in_account_currency" else "debit_in_account_currency", 0, ) new_row.set("credit" if d["dr_or_cr"] == "debit_in_account_currency" else "debit", 0) new_row.set("reference_type", d["against_voucher_type"]) new_row.set("reference_name", d["against_voucher"]) new_row.against_account = cstr(jv_detail.against_account) new_row.is_advance = cstr(jv_detail.is_advance) new_row.docstatus = 1 # will work as update after submit journal_entry.flags.ignore_validate_update_after_submit = True if not do_not_save: journal_entry.save(ignore_permissions=True) return new_row.name def update_reference_in_payment_entry( d, payment_entry, do_not_save=False, skip_ref_details_update_for_pe=False ): reference_details = { "reference_doctype": d.against_voucher_type, "reference_name": d.against_voucher, "total_amount": d.grand_total, "outstanding_amount": d.outstanding_amount, "allocated_amount": d.allocated_amount, "exchange_rate": d.exchange_rate if d.exchange_gain_loss else payment_entry.get_exchange_rate(), "exchange_gain_loss": d.exchange_gain_loss, # only populated from invoice in case of advance allocation "account": d.account, } if d.voucher_detail_no: existing_row = payment_entry.get("references", {"name": d["voucher_detail_no"]})[0] original_row = existing_row.as_dict().copy() existing_row.update(reference_details) if d.allocated_amount < original_row.allocated_amount: new_row = payment_entry.append("references") new_row.docstatus = 1 for field in list(reference_details): new_row.set(field, original_row[field]) new_row.allocated_amount = original_row.allocated_amount - d.allocated_amount else: new_row = payment_entry.append("references") new_row.docstatus = 1 new_row.update(reference_details) payment_entry.flags.ignore_validate_update_after_submit = True payment_entry.setup_party_account_field() payment_entry.set_missing_values() if not skip_ref_details_update_for_pe: payment_entry.set_missing_ref_details() payment_entry.set_amounts() payment_entry.make_exchange_gain_loss_journal() if not do_not_save: payment_entry.save(ignore_permissions=True) def cancel_exchange_gain_loss_journal(parent_doc: dict | object) -> None: """ Cancel Exchange Gain/Loss for Sales/Purchase Invoice, if they have any. """ if parent_doc.doctype in ["Sales Invoice", "Purchase Invoice", "Payment Entry", "Journal Entry"]: journals = frappe.db.get_all( "Journal Entry Account", filters={ "reference_type": parent_doc.doctype, "reference_name": parent_doc.name, "docstatus": 1, }, fields=["parent"], as_list=1, ) if journals: gain_loss_journals = frappe.db.get_all( "Journal Entry", filters={ "name": ["in", [x[0] for x in journals]], "voucher_type": "Exchange Gain Or Loss", "docstatus": 1, }, as_list=1, ) for doc in gain_loss_journals: frappe.get_doc("Journal Entry", doc[0]).cancel() def unlink_ref_doc_from_payment_entries(ref_doc): remove_ref_doc_link_from_jv(ref_doc.doctype, ref_doc.name) remove_ref_doc_link_from_pe(ref_doc.doctype, ref_doc.name) frappe.db.sql( """update `tabGL Entry` set against_voucher_type=null, against_voucher=null, modified=%s, modified_by=%s where against_voucher_type=%s and against_voucher=%s and voucher_no != ifnull(against_voucher, '')""", (now(), frappe.session.user, ref_doc.doctype, ref_doc.name), ) ple = qb.DocType("Payment Ledger Entry") qb.update(ple).set(ple.against_voucher_type, ple.voucher_type).set( ple.against_voucher_no, ple.voucher_no ).set(ple.modified, now()).set(ple.modified_by, frappe.session.user).where( (ple.against_voucher_type == ref_doc.doctype) & (ple.against_voucher_no == ref_doc.name) & (ple.delinked == 0) ).run() if ref_doc.doctype in ("Sales Invoice", "Purchase Invoice"): ref_doc.set("advances", []) frappe.db.sql( """delete from `tab{0} Advance` where parent = %s""".format(ref_doc.doctype), ref_doc.name ) def remove_ref_doc_link_from_jv(ref_type, ref_no): linked_jv = frappe.db.sql_list( """select parent from `tabJournal Entry Account` where reference_type=%s and reference_name=%s and docstatus < 2""", (ref_type, ref_no), ) if linked_jv: frappe.db.sql( """update `tabJournal Entry Account` set reference_type=null, reference_name = null, modified=%s, modified_by=%s where reference_type=%s and reference_name=%s and docstatus < 2""", (now(), frappe.session.user, ref_type, ref_no), ) frappe.msgprint(_("Journal Entries {0} are un-linked").format("\n".join(linked_jv))) def remove_ref_doc_link_from_pe(ref_type, ref_no): linked_pe = frappe.db.sql_list( """select parent from `tabPayment Entry Reference` where reference_doctype=%s and reference_name=%s and docstatus < 2""", (ref_type, ref_no), ) if linked_pe: frappe.db.sql( """update `tabPayment Entry Reference` set allocated_amount=0, modified=%s, modified_by=%s where reference_doctype=%s and reference_name=%s and docstatus < 2""", (now(), frappe.session.user, ref_type, ref_no), ) for pe in linked_pe: try: pe_doc = frappe.get_doc("Payment Entry", pe) pe_doc.set_amounts() pe_doc.make_advance_gl_entries(against_voucher_type=ref_type, against_voucher=ref_no, cancel=1) pe_doc.clear_unallocated_reference_document_rows() pe_doc.validate_payment_type_with_outstanding() except Exception as e: msg = _("There were issues unlinking payment entry {0}.").format(pe_doc.name) msg += "
" msg += _("Please cancel payment entry manually first") frappe.throw(msg, exc=PaymentEntryUnlinkError, title=_("Payment Unlink Error")) frappe.db.sql( """update `tabPayment Entry` set total_allocated_amount=%s, base_total_allocated_amount=%s, unallocated_amount=%s, modified=%s, modified_by=%s where name=%s""", ( pe_doc.total_allocated_amount, pe_doc.base_total_allocated_amount, pe_doc.unallocated_amount, now(), frappe.session.user, pe, ), ) frappe.msgprint(_("Payment Entries {0} are un-linked").format("\n".join(linked_pe))) @frappe.whitelist() def get_company_default(company, fieldname, ignore_validation=False): value = frappe.get_cached_value("Company", company, fieldname) if not ignore_validation and not value: throw( _("Please set default {0} in Company {1}").format( frappe.get_meta("Company").get_label(fieldname), company ) ) return value def fix_total_debit_credit(): vouchers = frappe.db.sql( """select voucher_type, voucher_no, sum(debit) - sum(credit) as diff from `tabGL Entry` group by voucher_type, voucher_no having sum(debit) != sum(credit)""", as_dict=1, ) for d in vouchers: if abs(d.diff) > 0: dr_or_cr = d.voucher_type == "Sales Invoice" and "credit" or "debit" frappe.db.sql( """update `tabGL Entry` set %s = %s + %s where voucher_type = %s and voucher_no = %s and %s > 0 limit 1""" % (dr_or_cr, dr_or_cr, "%s", "%s", "%s", dr_or_cr), (d.diff, d.voucher_type, d.voucher_no), ) def get_currency_precision(): precision = cint(frappe.db.get_default("currency_precision")) if not precision: number_format = frappe.db.get_default("number_format") or "#,###.##" precision = get_number_format_info(number_format)[2] return precision def get_stock_rbnb_difference(posting_date, company): stock_items = frappe.db.sql_list( """select distinct item_code from `tabStock Ledger Entry` where company=%s""", company, ) pr_valuation_amount = frappe.db.sql( """ select sum(pr_item.valuation_rate * pr_item.qty * pr_item.conversion_factor) from `tabPurchase Receipt Item` pr_item, `tabPurchase Receipt` pr where pr.name = pr_item.parent and pr.docstatus=1 and pr.company=%s and pr.posting_date <= %s and pr_item.item_code in (%s)""" % ("%s", "%s", ", ".join(["%s"] * len(stock_items))), tuple([company, posting_date] + stock_items), )[0][0] pi_valuation_amount = frappe.db.sql( """ select sum(pi_item.valuation_rate * pi_item.qty * pi_item.conversion_factor) from `tabPurchase Invoice Item` pi_item, `tabPurchase Invoice` pi where pi.name = pi_item.parent and pi.docstatus=1 and pi.company=%s and pi.posting_date <= %s and pi_item.item_code in (%s)""" % ("%s", "%s", ", ".join(["%s"] * len(stock_items))), tuple([company, posting_date] + stock_items), )[0][0] # Balance should be stock_rbnb = flt(pr_valuation_amount, 2) - flt(pi_valuation_amount, 2) # Balance as per system stock_rbnb_account = "Stock Received But Not Billed - " + frappe.get_cached_value( "Company", company, "abbr" ) sys_bal = get_balance_on(stock_rbnb_account, posting_date, in_account_currency=False) # Amount should be credited return flt(stock_rbnb) + flt(sys_bal) def get_held_invoices(party_type, party): """ Returns a list of names Purchase Invoices for the given party that are on hold """ held_invoices = None if party_type == "Supplier": held_invoices = frappe.db.sql( "select name from `tabPurchase Invoice` where on_hold = 1 and release_date IS NOT NULL and release_date > CURDATE()", as_dict=1, ) held_invoices = set(d["name"] for d in held_invoices) return held_invoices def get_outstanding_invoices( party_type, party, account, common_filter=None, posting_date=None, min_outstanding=None, max_outstanding=None, accounting_dimensions=None, vouchers=None, # list of dicts [{'voucher_type': '', 'voucher_no': ''}] for filtering limit=None, # passed by reconciliation tool voucher_no=None, # filter passed by reconciliation tool ): ple = qb.DocType("Payment Ledger Entry") outstanding_invoices = [] precision = frappe.get_precision("Sales Invoice", "outstanding_amount") or 2 if account: root_type, account_type = frappe.get_cached_value( "Account", account, ["root_type", "account_type"] ) party_account_type = "Receivable" if root_type == "Asset" else "Payable" party_account_type = account_type or party_account_type else: party_account_type = erpnext.get_party_account_type(party_type) held_invoices = get_held_invoices(party_type, party) common_filter = common_filter or [] common_filter.append(ple.account_type == party_account_type) common_filter.append(ple.account == account) common_filter.append(ple.party_type == party_type) common_filter.append(ple.party == party) ple_query = QueryPaymentLedger() invoice_list = ple_query.get_voucher_outstandings( vouchers=vouchers, common_filter=common_filter, posting_date=posting_date, min_outstanding=min_outstanding, max_outstanding=max_outstanding, get_invoices=True, accounting_dimensions=accounting_dimensions or [], limit=limit, voucher_no=voucher_no, ) for d in invoice_list: payment_amount = d.invoice_amount_in_account_currency - d.outstanding_in_account_currency outstanding_amount = d.outstanding_in_account_currency if outstanding_amount > 0.5 / (10**precision): if ( min_outstanding and max_outstanding and not (outstanding_amount >= min_outstanding and outstanding_amount <= max_outstanding) ): continue if not d.voucher_type == "Purchase Invoice" or d.voucher_no not in held_invoices: outstanding_invoices.append( frappe._dict( { "voucher_no": d.voucher_no, "voucher_type": d.voucher_type, "posting_date": d.posting_date, "invoice_amount": flt(d.invoice_amount_in_account_currency), "payment_amount": payment_amount, "outstanding_amount": outstanding_amount, "due_date": d.due_date, "currency": d.currency, "account": d.account, } ) ) outstanding_invoices = sorted( outstanding_invoices, key=lambda k: k["due_date"] or getdate(nowdate()) ) return outstanding_invoices def get_account_name( account_type=None, root_type=None, is_group=None, account_currency=None, company=None ): """return account based on matching conditions""" return frappe.db.get_value( "Account", { "account_type": account_type or "", "root_type": root_type or "", "is_group": is_group or 0, "account_currency": account_currency or frappe.defaults.get_defaults().currency, "company": company or frappe.defaults.get_defaults().company, }, "name", ) @frappe.whitelist() def get_companies(): """get a list of companies based on permission""" return [d.name for d in frappe.get_list("Company", fields=["name"], order_by="name")] @frappe.whitelist() def get_children(doctype, parent, company, is_root=False): from erpnext.accounts.report.financial_statements import sort_accounts parent_fieldname = "parent_" + doctype.lower().replace(" ", "_") fields = ["name as value", "is_group as expandable"] filters = [["docstatus", "<", 2]] filters.append(['ifnull(`{0}`,"")'.format(parent_fieldname), "=", "" if is_root else parent]) if is_root: fields += ["root_type", "report_type", "account_currency"] if doctype == "Account" else [] filters.append(["company", "=", company]) else: fields += ["root_type", "account_currency"] if doctype == "Account" else [] fields += [parent_fieldname + " as parent"] acc = frappe.get_list(doctype, fields=fields, filters=filters) if doctype == "Account": sort_accounts(acc, is_root, key="value") return acc @frappe.whitelist() def get_account_balances(accounts, company): if isinstance(accounts, str): accounts = loads(accounts) if not accounts: return [] company_currency = frappe.get_cached_value("Company", company, "default_currency") for account in accounts: account["company_currency"] = company_currency account["balance"] = flt( get_balance_on(account["value"], in_account_currency=False, company=company) ) if account["account_currency"] and account["account_currency"] != company_currency: account["balance_in_account_currency"] = flt(get_balance_on(account["value"], company=company)) return accounts def create_payment_gateway_account(gateway, payment_channel="Email"): from erpnext.setup.setup_wizard.operations.install_fixtures import create_bank_account company = frappe.get_cached_value("Global Defaults", "Global Defaults", "default_company") if not company: return # NOTE: we translate Payment Gateway account name because that is going to be used by the end user bank_account = frappe.db.get_value( "Account", {"account_name": _(gateway), "company": company}, ["name", "account_currency"], as_dict=1, ) if not bank_account: # check for untranslated one bank_account = frappe.db.get_value( "Account", {"account_name": gateway, "company": company}, ["name", "account_currency"], as_dict=1, ) if not bank_account: # try creating one bank_account = create_bank_account({"company_name": company, "bank_account": _(gateway)}) if not bank_account: frappe.msgprint(_("Payment Gateway Account not created, please create one manually.")) return # if payment gateway account exists, return if frappe.db.exists( "Payment Gateway Account", {"payment_gateway": gateway, "currency": bank_account.account_currency}, ): return try: frappe.get_doc( { "doctype": "Payment Gateway Account", "is_default": 1, "payment_gateway": gateway, "payment_account": bank_account.name, "currency": bank_account.account_currency, "payment_channel": payment_channel, } ).insert(ignore_permissions=True, ignore_if_duplicate=True) except frappe.DuplicateEntryError: # already exists, due to a reinstall? pass @frappe.whitelist() def update_cost_center(docname, cost_center_name, cost_center_number, company, merge): """ Renames the document by adding the number as a prefix to the current name and updates all transaction where it was present. """ validate_field_number("Cost Center", docname, cost_center_number, company, "cost_center_number") if cost_center_number: frappe.db.set_value("Cost Center", docname, "cost_center_number", cost_center_number.strip()) else: frappe.db.set_value("Cost Center", docname, "cost_center_number", "") frappe.db.set_value("Cost Center", docname, "cost_center_name", cost_center_name.strip()) new_name = get_autoname_with_number(cost_center_number, cost_center_name, company) if docname != new_name: frappe.rename_doc("Cost Center", docname, new_name, force=1, merge=merge) return new_name def validate_field_number(doctype_name, docname, number_value, company, field_name): """Validate if the number entered isn't already assigned to some other document.""" if number_value: filters = {field_name: number_value, "name": ["!=", docname]} if company: filters["company"] = company doctype_with_same_number = frappe.db.get_value(doctype_name, filters) if doctype_with_same_number: frappe.throw( _("{0} Number {1} is already used in {2} {3}").format( doctype_name, number_value, doctype_name.lower(), doctype_with_same_number ) ) def get_autoname_with_number(number_value, doc_title, company): """append title with prefix as number and suffix as company's abbreviation separated by '-'""" company_abbr = frappe.get_cached_value("Company", company, "abbr") parts = [doc_title.strip(), company_abbr] if cstr(number_value).strip(): parts.insert(0, cstr(number_value).strip()) return " - ".join(parts) def parse_naming_series_variable(doc, variable): if variable == "FY": date = doc.get("posting_date") or doc.get("transaction_date") or getdate() return get_fiscal_year(date=date, company=doc.get("company"))[0] @frappe.whitelist() def get_coa(doctype, parent, is_root=None, chart=None): from erpnext.accounts.doctype.account.chart_of_accounts.chart_of_accounts import ( build_tree_from_json, ) # add chart to flags to retrieve when called from expand all function chart = chart if chart else frappe.flags.chart frappe.flags.chart = chart parent = None if parent == _("All Accounts") else parent accounts = build_tree_from_json(chart) # returns alist of dict in a tree render-able form # filter out to show data for the selected node only accounts = [d for d in accounts if d["parent_account"] == parent] return accounts def update_gl_entries_after( posting_date, posting_time, for_warehouses=None, for_items=None, warehouse_account=None, company=None, ): stock_vouchers = get_future_stock_vouchers( posting_date, posting_time, for_warehouses, for_items, company ) repost_gle_for_stock_vouchers(stock_vouchers, posting_date, company, warehouse_account) def repost_gle_for_stock_vouchers( stock_vouchers: List[Tuple[str, str]], posting_date: str, company: Optional[str] = None, warehouse_account=None, repost_doc: Optional["RepostItemValuation"] = None, ): from erpnext.accounts.general_ledger import toggle_debit_credit_if_negative if not stock_vouchers: return if not warehouse_account: warehouse_account = get_warehouse_account_map(company) stock_vouchers = sort_stock_vouchers_by_posting_date(stock_vouchers) if repost_doc and repost_doc.gl_reposting_index: # Restore progress stock_vouchers = stock_vouchers[cint(repost_doc.gl_reposting_index) :] precision = get_field_precision(frappe.get_meta("GL Entry").get_field("debit")) or 2 for stock_vouchers_chunk in create_batch(stock_vouchers, GL_REPOSTING_CHUNK): gle = get_voucherwise_gl_entries(stock_vouchers_chunk, posting_date) for voucher_type, voucher_no in stock_vouchers_chunk: existing_gle = gle.get((voucher_type, voucher_no), []) voucher_obj = frappe.get_doc(voucher_type, voucher_no) # Some transactions post credit as negative debit, this is handled while posting GLE # but while comparing we need to make sure it's flipped so comparisons are accurate expected_gle = toggle_debit_credit_if_negative(voucher_obj.get_gl_entries(warehouse_account)) if expected_gle: if not existing_gle or not compare_existing_and_expected_gle( existing_gle, expected_gle, precision ): _delete_accounting_ledger_entries(voucher_type, voucher_no) voucher_obj.make_gl_entries(gl_entries=expected_gle, from_repost=True) else: _delete_accounting_ledger_entries(voucher_type, voucher_no) if not frappe.flags.in_test: frappe.db.commit() if repost_doc: repost_doc.db_set( "gl_reposting_index", cint(repost_doc.gl_reposting_index) + len(stock_vouchers_chunk), ) def _delete_pl_entries(voucher_type, voucher_no): ple = qb.DocType("Payment Ledger Entry") qb.from_(ple).delete().where( (ple.voucher_type == voucher_type) & (ple.voucher_no == voucher_no) ).run() def _delete_gl_entries(voucher_type, voucher_no): gle = qb.DocType("GL Entry") qb.from_(gle).delete().where( (gle.voucher_type == voucher_type) & (gle.voucher_no == voucher_no) ).run() def _delete_accounting_ledger_entries(voucher_type, voucher_no): """ Remove entries from both General and Payment Ledger for specified Voucher """ _delete_gl_entries(voucher_type, voucher_no) _delete_pl_entries(voucher_type, voucher_no) def sort_stock_vouchers_by_posting_date( stock_vouchers: List[Tuple[str, str]] ) -> List[Tuple[str, str]]: sle = frappe.qb.DocType("Stock Ledger Entry") voucher_nos = [v[1] for v in stock_vouchers] sles = ( frappe.qb.from_(sle) .select(sle.voucher_type, sle.voucher_no, sle.posting_date, sle.posting_time, sle.creation) .where((sle.is_cancelled == 0) & (sle.voucher_no.isin(voucher_nos))) .groupby(sle.voucher_type, sle.voucher_no) .orderby(sle.posting_date) .orderby(sle.posting_time) .orderby(sle.creation) ).run(as_dict=True) sorted_vouchers = [(sle.voucher_type, sle.voucher_no) for sle in sles] unknown_vouchers = set(stock_vouchers) - set(sorted_vouchers) if unknown_vouchers: sorted_vouchers.extend(unknown_vouchers) return sorted_vouchers def get_future_stock_vouchers( posting_date, posting_time, for_warehouses=None, for_items=None, company=None ): values = [] condition = "" if for_items: condition += " and item_code in ({})".format(", ".join(["%s"] * len(for_items))) values += for_items if for_warehouses: condition += " and warehouse in ({})".format(", ".join(["%s"] * len(for_warehouses))) values += for_warehouses if company: condition += " and company = %s" values.append(company) future_stock_vouchers = frappe.db.sql( """select distinct sle.voucher_type, sle.voucher_no from `tabStock Ledger Entry` sle where timestamp(sle.posting_date, sle.posting_time) >= timestamp(%s, %s) and is_cancelled = 0 {condition} order by timestamp(sle.posting_date, sle.posting_time) asc, creation asc for update""".format( condition=condition ), tuple([posting_date, posting_time] + values), as_dict=True, ) return [(d.voucher_type, d.voucher_no) for d in future_stock_vouchers] def get_voucherwise_gl_entries(future_stock_vouchers, posting_date): """Get voucherwise list of GL entries. Only fetches GLE fields required for comparing with new GLE. Check compare_existing_and_expected_gle function below. returns: Dict[Tuple[voucher_type, voucher_no], List[GL Entries]] """ gl_entries = {} if not future_stock_vouchers: return gl_entries voucher_nos = [d[1] for d in future_stock_vouchers] gles = frappe.db.sql( """ select name, account, credit, debit, cost_center, project, voucher_type, voucher_no from `tabGL Entry` where posting_date >= %s and voucher_no in (%s)""" % ("%s", ", ".join(["%s"] * len(voucher_nos))), tuple([posting_date] + voucher_nos), as_dict=1, ) for d in gles: gl_entries.setdefault((d.voucher_type, d.voucher_no), []).append(d) return gl_entries def compare_existing_and_expected_gle(existing_gle, expected_gle, precision): if len(existing_gle) != len(expected_gle): return False matched = True for entry in expected_gle: account_existed = False for e in existing_gle: if entry.account == e.account: account_existed = True if ( entry.account == e.account and (not entry.cost_center or not e.cost_center or entry.cost_center == e.cost_center) and ( flt(entry.debit, precision) != flt(e.debit, precision) or flt(entry.credit, precision) != flt(e.credit, precision) ) ): matched = False break if not account_existed: matched = False break return matched def get_stock_accounts(company, voucher_type=None, voucher_no=None): stock_accounts = [ d.name for d in frappe.db.get_all( "Account", {"account_type": "Stock", "company": company, "is_group": 0} ) ] if voucher_type and voucher_no: if voucher_type == "Journal Entry": stock_accounts = [ d.account for d in frappe.db.get_all( "Journal Entry Account", {"parent": voucher_no, "account": ["in", stock_accounts]}, "account" ) ] else: stock_accounts = [ d.account for d in frappe.db.get_all( "GL Entry", {"voucher_type": voucher_type, "voucher_no": voucher_no, "account": ["in", stock_accounts]}, "account", ) ] return stock_accounts def get_stock_and_account_balance(account=None, posting_date=None, company=None): if not posting_date: posting_date = nowdate() warehouse_account = get_warehouse_account_map(company) account_balance = get_balance_on( account, posting_date, in_account_currency=False, ignore_account_permission=True ) related_warehouses = [ wh for wh, wh_details in warehouse_account.items() if wh_details.account == account and not wh_details.is_group ] total_stock_value = get_stock_value_on(related_warehouses, posting_date) precision = frappe.get_precision("Journal Entry Account", "debit_in_account_currency") return flt(account_balance, precision), flt(total_stock_value, precision), related_warehouses def get_journal_entry(account, stock_adjustment_account, amount): db_or_cr_warehouse_account = ( "credit_in_account_currency" if amount < 0 else "debit_in_account_currency" ) db_or_cr_stock_adjustment_account = ( "debit_in_account_currency" if amount < 0 else "credit_in_account_currency" ) return { "accounts": [ {"account": account, db_or_cr_warehouse_account: abs(amount)}, {"account": stock_adjustment_account, db_or_cr_stock_adjustment_account: abs(amount)}, ] } def check_and_delete_linked_reports(report): """Check if reports are referenced in Desktop Icon""" icons = frappe.get_all("Desktop Icon", fields=["name"], filters={"_report": report}) if icons: for icon in icons: frappe.delete_doc("Desktop Icon", icon) def create_err_and_its_journals(companies: list = None) -> None: if companies: for company in companies: err = frappe.new_doc("Exchange Rate Revaluation") err.company = company.name err.posting_date = nowdate() err.rounding_loss_allowance = 0.0 err.fetch_and_calculate_accounts_data() if err.accounts: err.save().submit() response = err.make_jv_entries() if company.submit_err_jv: jv = response.get("revaluation_jv", None) jv and frappe.get_doc("Journal Entry", jv).submit() jv = response.get("zero_balance_jv", None) jv and frappe.get_doc("Journal Entry", jv).submit() def auto_create_exchange_rate_revaluation_daily() -> None: """ Executed by background job """ companies = frappe.db.get_all( "Company", filters={"auto_exchange_rate_revaluation": 1, "auto_err_frequency": "Daily"}, fields=["name", "submit_err_jv"], ) create_err_and_its_journals(companies) def auto_create_exchange_rate_revaluation_weekly() -> None: """ Executed by background job """ companies = frappe.db.get_all( "Company", filters={"auto_exchange_rate_revaluation": 1, "auto_err_frequency": "Weekly"}, fields=["name", "submit_err_jv"], ) create_err_and_its_journals(companies) def get_payment_ledger_entries(gl_entries, cancel=0): ple_map = [] if gl_entries: ple = None # companies account = qb.DocType("Account") companies = list(set([x.company for x in gl_entries])) # receivable/payable account accounts_with_types = ( qb.from_(account) .select(account.name, account.account_type) .where( (account.account_type.isin(["Receivable", "Payable"]) & (account.company.isin(companies))) ) .run(as_dict=True) ) receivable_or_payable_accounts = [y.name for y in accounts_with_types] def get_account_type(account): for entry in accounts_with_types: if entry.name == account: return entry.account_type dr_or_cr = 0 account_type = None for gle in gl_entries: if gle.account in receivable_or_payable_accounts: account_type = get_account_type(gle.account) if account_type == "Receivable": dr_or_cr = gle.debit - gle.credit dr_or_cr_account_currency = gle.debit_in_account_currency - gle.credit_in_account_currency elif account_type == "Payable": dr_or_cr = gle.credit - gle.debit dr_or_cr_account_currency = gle.credit_in_account_currency - gle.debit_in_account_currency if cancel: dr_or_cr *= -1 dr_or_cr_account_currency *= -1 ple = frappe._dict( doctype="Payment Ledger Entry", posting_date=gle.posting_date, company=gle.company, account_type=account_type, account=gle.account, party_type=gle.party_type, party=gle.party, cost_center=gle.cost_center, finance_book=gle.finance_book, due_date=gle.due_date, voucher_type=gle.voucher_type, voucher_no=gle.voucher_no, voucher_detail_no=gle.voucher_detail_no, against_voucher_type=gle.against_voucher_type if gle.against_voucher_type else gle.voucher_type, against_voucher_no=gle.against_voucher if gle.against_voucher else gle.voucher_no, account_currency=gle.account_currency, amount=dr_or_cr, amount_in_account_currency=dr_or_cr_account_currency, delinked=True if cancel else False, remarks=gle.remarks, ) dimensions_and_defaults = get_dimensions() if dimensions_and_defaults: for dimension in dimensions_and_defaults[0]: ple[dimension.fieldname] = gle.get(dimension.fieldname) ple_map.append(ple) return ple_map def create_payment_ledger_entry( gl_entries, cancel=0, adv_adj=0, update_outstanding="Yes", from_repost=0, partial_cancel=False ): if gl_entries: ple_map = get_payment_ledger_entries(gl_entries, cancel=cancel) for entry in ple_map: ple = frappe.get_doc(entry) if cancel: delink_original_entry(ple, partial_cancel=partial_cancel) ple.flags.ignore_permissions = 1 ple.flags.adv_adj = adv_adj ple.flags.from_repost = from_repost ple.flags.update_outstanding = update_outstanding ple.submit() def update_voucher_outstanding(voucher_type, voucher_no, account, party_type, party): ple = frappe.qb.DocType("Payment Ledger Entry") vouchers = [frappe._dict({"voucher_type": voucher_type, "voucher_no": voucher_no})] common_filter = [] if account: common_filter.append(ple.account == account) if party_type: common_filter.append(ple.party_type == party_type) if party: common_filter.append(ple.party == party) ple_query = QueryPaymentLedger() # on cancellation outstanding can be an empty list voucher_outstanding = ple_query.get_voucher_outstandings(vouchers, common_filter=common_filter) if ( voucher_type in ["Sales Invoice", "Purchase Invoice", "Fees"] and party_type and party and voucher_outstanding ): outstanding = voucher_outstanding[0] ref_doc = frappe.get_doc(voucher_type, voucher_no) # Didn't use db_set for optimisation purpose ref_doc.outstanding_amount = outstanding["outstanding_in_account_currency"] or 0.0 frappe.db.set_value( voucher_type, voucher_no, "outstanding_amount", outstanding["outstanding_in_account_currency"] or 0.0, ) ref_doc.set_status(update=True) def delink_original_entry(pl_entry, partial_cancel=False): if pl_entry: ple = qb.DocType("Payment Ledger Entry") query = ( qb.update(ple) .set(ple.delinked, True) .set(ple.modified, now()) .set(ple.modified_by, frappe.session.user) .where( (ple.company == pl_entry.company) & (ple.account_type == pl_entry.account_type) & (ple.account == pl_entry.account) & (ple.party_type == pl_entry.party_type) & (ple.party == pl_entry.party) & (ple.voucher_type == pl_entry.voucher_type) & (ple.voucher_no == pl_entry.voucher_no) & (ple.against_voucher_type == pl_entry.against_voucher_type) & (ple.against_voucher_no == pl_entry.against_voucher_no) ) ) if partial_cancel: query = query.where(ple.voucher_detail_no == pl_entry.voucher_detail_no) query.run() class QueryPaymentLedger(object): """ Helper Class for Querying Payment Ledger Entry """ def __init__(self): self.ple = qb.DocType("Payment Ledger Entry") # query result self.voucher_outstandings = [] # query filters self.vouchers = [] self.common_filter = [] self.voucher_posting_date = [] self.min_outstanding = None self.max_outstanding = None self.limit = self.voucher_no = None def reset(self): # clear filters self.vouchers.clear() self.common_filter.clear() self.min_outstanding = self.max_outstanding = self.limit = None # clear result self.voucher_outstandings.clear() def query_for_outstanding(self): """ Database query to fetch voucher amount and voucher outstanding using Common Table Expression """ ple = self.ple filter_on_voucher_no = [] filter_on_against_voucher_no = [] if self.vouchers: voucher_types = set([x.voucher_type for x in self.vouchers]) voucher_nos = set([x.voucher_no for x in self.vouchers]) filter_on_voucher_no.append(ple.voucher_type.isin(voucher_types)) filter_on_voucher_no.append(ple.voucher_no.isin(voucher_nos)) filter_on_against_voucher_no.append(ple.against_voucher_type.isin(voucher_types)) filter_on_against_voucher_no.append(ple.against_voucher_no.isin(voucher_nos)) if self.voucher_no: filter_on_voucher_no.append(ple.voucher_no.like(f"%{self.voucher_no}%")) filter_on_against_voucher_no.append(ple.against_voucher_no.like(f"%{self.voucher_no}%")) # build outstanding amount filter filter_on_outstanding_amount = [] if self.min_outstanding: if self.min_outstanding > 0: filter_on_outstanding_amount.append( Table("outstanding").amount_in_account_currency >= self.min_outstanding ) else: filter_on_outstanding_amount.append( Table("outstanding").amount_in_account_currency <= self.min_outstanding ) if self.max_outstanding: if self.max_outstanding > 0: filter_on_outstanding_amount.append( Table("outstanding").amount_in_account_currency <= self.max_outstanding ) else: filter_on_outstanding_amount.append( Table("outstanding").amount_in_account_currency >= self.max_outstanding ) # build query for voucher amount query_voucher_amount = ( qb.from_(ple) .select( ple.account, ple.voucher_type, ple.voucher_no, ple.party_type, ple.party, ple.posting_date, ple.due_date, ple.account_currency.as_("currency"), ple.cost_center.as_("cost_center"), Sum(ple.amount).as_("amount"), Sum(ple.amount_in_account_currency).as_("amount_in_account_currency"), ) .where(ple.delinked == 0) .where(Criterion.all(filter_on_voucher_no)) .where(Criterion.all(self.common_filter)) .where(Criterion.all(self.dimensions_filter)) .where(Criterion.all(self.voucher_posting_date)) .groupby(ple.voucher_type, ple.voucher_no, ple.party_type, ple.party) ) # build query for voucher outstanding query_voucher_outstanding = ( qb.from_(ple) .select( ple.account, ple.against_voucher_type.as_("voucher_type"), ple.against_voucher_no.as_("voucher_no"), ple.party_type, ple.party, ple.posting_date, ple.due_date, ple.account_currency.as_("currency"), Sum(ple.amount).as_("amount"), Sum(ple.amount_in_account_currency).as_("amount_in_account_currency"), ) .where(ple.delinked == 0) .where(Criterion.all(filter_on_against_voucher_no)) .where(Criterion.all(self.common_filter)) .groupby(ple.against_voucher_type, ple.against_voucher_no, ple.party_type, ple.party) ) # build CTE for combining voucher amount and outstanding self.cte_query_voucher_amount_and_outstanding = ( qb.with_(query_voucher_amount, "vouchers") .with_(query_voucher_outstanding, "outstanding") .from_(AliasedQuery("vouchers")) .left_join(AliasedQuery("outstanding")) .on( (AliasedQuery("vouchers").account == AliasedQuery("outstanding").account) & (AliasedQuery("vouchers").voucher_type == AliasedQuery("outstanding").voucher_type) & (AliasedQuery("vouchers").voucher_no == AliasedQuery("outstanding").voucher_no) & (AliasedQuery("vouchers").party_type == AliasedQuery("outstanding").party_type) & (AliasedQuery("vouchers").party == AliasedQuery("outstanding").party) ) .select( Table("vouchers").account, Table("vouchers").voucher_type, Table("vouchers").voucher_no, Table("vouchers").party_type, Table("vouchers").party, Table("vouchers").posting_date, Table("vouchers").amount.as_("invoice_amount"), Table("vouchers").amount_in_account_currency.as_("invoice_amount_in_account_currency"), Table("outstanding").amount.as_("outstanding"), Table("outstanding").amount_in_account_currency.as_("outstanding_in_account_currency"), (Table("vouchers").amount - Table("outstanding").amount).as_("paid_amount"), ( Table("vouchers").amount_in_account_currency - Table("outstanding").amount_in_account_currency ).as_("paid_amount_in_account_currency"), Table("vouchers").due_date, Table("vouchers").currency, Table("vouchers").cost_center.as_("cost_center"), ) .where(Criterion.all(filter_on_outstanding_amount)) ) # build CTE filter # only fetch invoices if self.get_invoices: self.cte_query_voucher_amount_and_outstanding = ( self.cte_query_voucher_amount_and_outstanding.having( qb.Field("outstanding_in_account_currency") > 0 ) ) # only fetch payments elif self.get_payments: self.cte_query_voucher_amount_and_outstanding = ( self.cte_query_voucher_amount_and_outstanding.having( qb.Field("outstanding_in_account_currency") < 0 ) ) if self.limit: self.cte_query_voucher_amount_and_outstanding = ( self.cte_query_voucher_amount_and_outstanding.limit(self.limit) ) # execute SQL self.voucher_outstandings = self.cte_query_voucher_amount_and_outstanding.run(as_dict=True) def get_voucher_outstandings( self, vouchers=None, common_filter=None, posting_date=None, min_outstanding=None, max_outstanding=None, get_payments=False, get_invoices=False, accounting_dimensions=None, limit=None, voucher_no=None, ): """ Fetch voucher amount and outstanding amount from Payment Ledger using Database CTE vouchers - dict of vouchers to get common_filter - array of criterions min_outstanding - filter on minimum total outstanding amount max_outstanding - filter on maximum total outstanding amount get_invoices - only fetch vouchers(ledger entries with +ve outstanding) get_payments - only fetch payments(ledger entries with -ve outstanding) """ self.reset() self.vouchers = vouchers self.common_filter = common_filter or [] self.dimensions_filter = accounting_dimensions or [] self.voucher_posting_date = posting_date or [] self.min_outstanding = min_outstanding self.max_outstanding = max_outstanding self.get_payments = get_payments self.get_invoices = get_invoices self.limit = limit self.voucher_no = voucher_no self.query_for_outstanding() return self.voucher_outstandings def create_gain_loss_journal( company, posting_date, party_type, party, party_account, gain_loss_account, exc_gain_loss, dr_or_cr, reverse_dr_or_cr, ref1_dt, ref1_dn, ref1_detail_no, ref2_dt, ref2_dn, ref2_detail_no, cost_center, ) -> str: journal_entry = frappe.new_doc("Journal Entry") journal_entry.voucher_type = "Exchange Gain Or Loss" journal_entry.company = company journal_entry.posting_date = posting_date or nowdate() journal_entry.multi_currency = 1 journal_entry.is_system_generated = True party_account_currency = frappe.get_cached_value("Account", party_account, "account_currency") if not gain_loss_account: frappe.throw(_("Please set default Exchange Gain/Loss Account in Company {}").format(company)) gain_loss_account_currency = get_account_currency(gain_loss_account) company_currency = frappe.get_cached_value("Company", company, "default_currency") if gain_loss_account_currency != company_currency: frappe.throw(_("Currency for {0} must be {1}").format(gain_loss_account, company_currency)) journal_account = frappe._dict( { "account": party_account, "party_type": party_type, "party": party, "account_currency": party_account_currency, "exchange_rate": 0, "cost_center": cost_center or erpnext.get_default_cost_center(company), "reference_type": ref1_dt, "reference_name": ref1_dn, "reference_detail_no": ref1_detail_no, dr_or_cr: abs(exc_gain_loss), dr_or_cr + "_in_account_currency": 0, } ) journal_entry.append("accounts", journal_account) journal_account = frappe._dict( { "account": gain_loss_account, "account_currency": gain_loss_account_currency, "exchange_rate": 1, "cost_center": cost_center or erpnext.get_default_cost_center(company), "reference_type": ref2_dt, "reference_name": ref2_dn, "reference_detail_no": ref2_detail_no, reverse_dr_or_cr + "_in_account_currency": 0, reverse_dr_or_cr: abs(exc_gain_loss), } ) journal_entry.append("accounts", journal_account) journal_entry.save() journal_entry.submit() return journal_entry.name