From 6f718a31f0a04a75018a408e285a3092db10b064 Mon Sep 17 00:00:00 2001 From: Saqib Ansari Date: Wed, 27 Jan 2021 16:59:07 +0530 Subject: [PATCH] refactor: tax withholding category against customer --- .../tax_withholding_category.py | 296 ++++++++++-------- .../test_tax_withholding_category.py | 11 + 2 files changed, 175 insertions(+), 132 deletions(-) diff --git a/erpnext/accounts/doctype/tax_withholding_category/tax_withholding_category.py b/erpnext/accounts/doctype/tax_withholding_category/tax_withholding_category.py index 32ad4cb03a..3e0ba9ac6a 100644 --- a/erpnext/accounts/doctype/tax_withholding_category/tax_withholding_category.py +++ b/erpnext/accounts/doctype/tax_withholding_category/tax_withholding_category.py @@ -12,37 +12,54 @@ from erpnext.accounts.utils import get_fiscal_year class TaxWithholdingCategory(Document): pass -def get_party_tax_withholding_details(ref_doc, tax_withholding_category=None): +def get_party_details(ref_doc): + party_type, party = '', '' + if ref_doc.doctype == 'Sales Invoice': + party_type = 'Customer' + party = ref_doc.customer + else: + party_type = 'Supplier' + party = ref_doc.supplier + + return party_type, party + +def get_party_tax_withholding_details(ref_doc, tax_withholding_category=None): pan_no = '' - suppliers = [] + parties = [] + party_type, party = get_party_details(ref_doc) if not tax_withholding_category: - tax_withholding_category, pan_no = frappe.db.get_value('Supplier', ref_doc.supplier, ['tax_withholding_category', 'pan']) + tax_withholding_category, pan_no = frappe.db.get_value(party_type, party, ['tax_withholding_category', 'pan']) if not tax_withholding_category: return + # if tax_withholding_category passed as an argument but not pan_no if not pan_no: - pan_no = frappe.db.get_value('Supplier', ref_doc.supplier, 'pan') + pan_no = frappe.db.get_value(party_type, party, 'pan') # Get others suppliers with the same PAN No if pan_no: - suppliers = [d.name for d in frappe.get_all('Supplier', fields=['name'], filters={'pan': pan_no})] + parties = frappe.get_all(party_type, filters={ 'pan': pan_no }, pluck='name') - if not suppliers: - suppliers.append(ref_doc.supplier) + if not parties: + parties.append(party) + + fiscal_year = get_fiscal_year(ref_doc.posting_date, company=ref_doc.company) + tax_details = get_tax_withholding_details(tax_withholding_category, fiscal_year[0], ref_doc.company) - fy = get_fiscal_year(ref_doc.posting_date, company=ref_doc.company) - tax_details = get_tax_withholding_details(tax_withholding_category, fy[0], ref_doc.company) if not tax_details: frappe.throw(_('Please set associated account in Tax Withholding Category {0} against Company {1}') .format(tax_withholding_category, ref_doc.company)) - tds_amount = get_tds_amount(suppliers, ref_doc.net_total, ref_doc.company, - tax_details, fy, ref_doc.posting_date, pan_no) + tax_amount = get_tax_amount( + party_type, parties, + ref_doc, tax_details, + fiscal_year, pan_no + ) - tax_row = get_tax_row(tax_details, tds_amount) + tax_row = get_tax_row(tax_details, tax_amount) return tax_row @@ -69,147 +86,162 @@ def get_tax_withholding_rates(tax_withholding, fiscal_year): frappe.throw(_("No Tax Withholding data found for the current Fiscal Year.")) -def get_tax_row(tax_details, tds_amount): - +def get_tax_row(tax_details, tax_amount): return { "category": "Total", "add_deduct_tax": "Deduct", "charge_type": "Actual", "account_head": tax_details.account_head, "description": tax_details.description, - "tax_amount": tds_amount + "tax_amount": tax_amount } -def get_tds_amount(suppliers, net_total, company, tax_details, fiscal_year_details, posting_date, pan_no=None): - fiscal_year, year_start_date, year_end_date = fiscal_year_details - tds_amount = 0 - tds_deducted = 0 - - def _get_tds(amount, rate): - if amount <= 0: - return 0 - - return amount * rate / 100 - - ldc_name = frappe.db.get_value('Lower Deduction Certificate', - { - 'pan_no': pan_no, - 'fiscal_year': fiscal_year - }, 'name') - ldc = '' - +def get_lower_deduction_certificate(fiscal_year, pan_no): + ldc_name = frappe.db.get_value('Lower Deduction Certificate', { 'pan_no': pan_no, 'fiscal_year': fiscal_year }, 'name') if ldc_name: - ldc = frappe.get_doc('Lower Deduction Certificate', ldc_name) + return frappe.get_doc('Lower Deduction Certificate', ldc_name) - entries = frappe.db.sql(""" - select voucher_no, credit - from `tabGL Entry` - where company = %s and - party in %s and fiscal_year=%s and credit > 0 - and is_opening = 'No' - """, (company, tuple(suppliers), fiscal_year), as_dict=1) +def get_tax_amount(party_type, parties, ref_doc, tax_details, fiscal_year_details, pan_no=None): + fiscal_year = fiscal_year_details[0] - vouchers = [d.voucher_no for d in entries] - advance_vouchers = get_advance_vouchers(suppliers, fiscal_year=fiscal_year, company=company) + vouchers = get_invoice_vouchers(parties, fiscal_year, ref_doc.company, party_type=party_type) or [""] + advance_vouchers = get_advance_vouchers(parties, fiscal_year, ref_doc.company, party_type=party_type) + tax_vouchers = vouchers + advance_vouchers - tds_vouchers = vouchers + advance_vouchers + tax_deducted = 0 + dr_or_cr = 'credit' if party_type == 'Supplier' else 'debit' + if tax_vouchers: + filters = { + dr_or_cr: ['>', 0], + 'account': tax_details.account_head, + 'fiscal_year': fiscal_year, + 'voucher_no': ['in', tax_vouchers], + 'is_cancelled': 0 + } + field = "sum({})".format(dr_or_cr) - if tds_vouchers: - tds_deducted = frappe.db.sql(""" - SELECT sum(credit) FROM `tabGL Entry` - WHERE - account=%s and fiscal_year=%s and credit > 0 - and voucher_no in ({0})""". format(','.join(['%s'] * len(tds_vouchers))), - ((tax_details.account_head, fiscal_year) + tuple(tds_vouchers))) + tax_deducted = frappe.db.get_value('GL Entry', filters, field) or 0.0 - tds_deducted = tds_deducted[0][0] if tds_deducted and tds_deducted[0][0] else 0 + tax_amount = 0 + if party_type == 'Supplier': + net_total = ref_doc.net_total + posting_date = ref_doc.posting_date + ldc = get_lower_deduction_certificate(fiscal_year, pan_no) - if tds_deducted: - if ldc: - limit_consumed = frappe.db.get_value('Purchase Invoice', - { - 'supplier': ('in', suppliers), - 'apply_tds': 1, - 'docstatus': 1 - }, 'sum(net_total)') - - if ldc and is_valid_certificate(ldc.valid_from, ldc.valid_upto, posting_date, limit_consumed, net_total, - ldc.certificate_limit): - - tds_amount = get_ltds_amount(net_total, limit_consumed, ldc.certificate_limit, ldc.rate, tax_details) - else: - tds_amount = _get_tds(net_total, tax_details.rate) - else: - supplier_credit_amount = frappe.get_all('Purchase Invoice', - fields = ['sum(net_total)'], - filters = {'name': ('in', vouchers), 'docstatus': 1, "apply_tds": 1}, as_list=1) - - supplier_credit_amount = (supplier_credit_amount[0][0] - if supplier_credit_amount and supplier_credit_amount[0][0] else 0) - - jv_supplier_credit_amt = frappe.get_all('Journal Entry Account', - fields = ['sum(credit_in_account_currency)'], - filters = { - 'parent': ('in', vouchers), 'docstatus': 1, - 'party': ('in', suppliers), - 'reference_type': ('not in', ['Purchase Invoice']) - }, as_list=1) - - supplier_credit_amount += (jv_supplier_credit_amt[0][0] - if jv_supplier_credit_amt and jv_supplier_credit_amt[0][0] else 0) - - supplier_credit_amount += net_total - - debit_note_amount = get_debit_note_amount(suppliers, year_start_date, year_end_date) - supplier_credit_amount -= debit_note_amount - - if ((tax_details.get('threshold', 0) and supplier_credit_amount >= tax_details.threshold) - or (tax_details.get('cumulative_threshold', 0) and supplier_credit_amount >= tax_details.cumulative_threshold)): - - if ldc and is_valid_certificate(ldc.valid_from, ldc.valid_upto, posting_date, tds_deducted, net_total, - ldc.certificate_limit): - tds_amount = get_ltds_amount(supplier_credit_amount, 0, ldc.certificate_limit, ldc.rate, - tax_details) + if tax_deducted: + if ldc: + tax_amount = get_tds_amount_from_ldc(ldc, parties, fiscal_year, pan_no, tax_details, posting_date, net_total) else: - tds_amount = _get_tds(supplier_credit_amount, tax_details.rate) + tax_amount = net_total * tax_details.rate / 100 if net_total > 0 else 0 + else: + tax_amount = get_tds_amount( + ldc, parties, ref_doc, tax_details, + fiscal_year_details, vouchers + ) + + return tax_amount + +def get_invoice_vouchers(parties, fiscal_year, company, party_type='Supplier'): + dr_or_cr = 'credit' if party_type == 'Supplier' else 'debit' + + filters = { + dr_or_cr: ['>', 0], + 'company': company, + 'party_type': party_type, + 'party': ['in', parties], + 'fiscal_year': fiscal_year, + 'is_opening': 'No', + 'is_cancelled': 0 + } + + return frappe.get_all('GL Entry', filters=filters, distinct=1, pluck="voucher_no") + +def get_advance_vouchers(parties, fiscal_year=None, company=None, from_date=None, to_date=None, party_type='Supplier'): + # for advance vouchers, debit and credit is reversed + dr_or_cr = 'debit' if party_type == 'Supplier' else 'credit' + + filters = { + dr_or_cr: ['>', 0], + 'party_type': party_type, + 'party': ['in', parties], + 'is_opening': 'No', + 'is_cancelled': 0 + } + + if fiscal_year: + filters['fiscal_year'] = fiscal_year + if company: + filters['company'] = company + if from_date and to_date: + filters['posting_date'] = ['between', (from_date, to_date)] + + return frappe.get_all('GL Entry', filters=filters, distinct=1, pluck='voucher_no') + +def get_tds_amount(ldc, parties, ref_doc, tax_details, fiscal_year_details, vouchers): + tds_amount = 0 + + supp_credit_amt = frappe.db.get_value('Purchase Invoice', { + 'name': ('in', vouchers), 'docstatus': 1, 'apply_tds': 1 + }, 'sum(net_total)') or 0.0 + + supp_jv_credit_amt = frappe.db.get_value('Journal Entry Account', { + 'parent': ('in', vouchers), 'docstatus': 1, + 'party': ('in', parties), 'reference_type': ('!=', 'Purchase Invoice') + }, 'sum(credit_in_account_currency)') or 0.0 + + supp_credit_amt += supp_jv_credit_amt + supp_credit_amt += ref_doc.net_total + + debit_note_amount = get_debit_note_amount(parties, fiscal_year_details, ref_doc.company) + supp_credit_amt -= debit_note_amount + + threshold = tax_details.get('threshold', 0) + cumulative_threshold = tax_details.get('cumulative_threshold', 0) + + if ((threshold and supp_credit_amt >= threshold) or (cumulative_threshold and supp_credit_amt >= cumulative_threshold)): + if ldc and is_valid_certificate( + ldc.valid_from, ldc.valid_upto, + ref_doc.posting_date, tax_deducted, + net_total, ldc.certificate_limit + ): + tds_amount = get_ltds_amount(supp_credit_amt, 0, ldc.certificate_limit, ldc.rate, tax_details) + else: + tds_amount = supp_credit_amt * tax_details.rate / 100 if supp_credit_amt > 0 else 0 return tds_amount -def get_advance_vouchers(suppliers, fiscal_year=None, company=None, from_date=None, to_date=None): - condition = "fiscal_year=%s" % fiscal_year +def get_tds_amount_from_ldc(ldc, parties, fiscal_year, pan_no, tax_details, posting_date, net_total): + tds_amount = 0 + limit_consumed = frappe.db.get_value('Purchase Invoice', { + 'supplier': ('in', parties), + 'apply_tds': 1, + 'docstatus': 1 + }, 'sum(net_total)') + + if is_valid_certificate( + ldc.valid_from, ldc.valid_upto, + posting_date, limit_consumed, + net_total, ldc.certificate_limit + ): + tds_amount = get_ltds_amount(net_total, limit_consumed, ldc.certificate_limit, ldc.rate, tax_details) + + return tds_amount + +def get_debit_note_amount(suppliers, fiscal_year_details, company=None): + _, year_start_date, year_end_date = fiscal_year_details + + filters = { + 'supplier': ['in', suppliers], + 'is_return': 1, + 'docstatus': 1, + 'posting_date': ['between', (year_start_date, year_end_date)] + } + fields = ['abs(sum(net_total)) as net_total'] if company: - condition += "and company =%s" % (company) - if from_date and to_date: - condition += "and posting_date between %s and %s" % (from_date, to_date) + filters['company'] = company - ## Appending the same supplier again if length of suppliers list is 1 - ## since tuple of single element list contains None, For example ('Test Supplier 1', ) - ## and the below query fails - if len(suppliers) == 1: - suppliers.append(suppliers[0]) - - return frappe.db.sql_list(""" - select distinct voucher_no - from `tabGL Entry` - where party in %s and %s and debit > 0 - and is_opening = 'No' - """, (tuple(suppliers), condition)) or [] - -def get_debit_note_amount(suppliers, year_start_date, year_end_date, company=None): - condition = "and 1=1" - if company: - condition = " and company=%s " % company - - if len(suppliers) == 1: - suppliers.append(suppliers[0]) - - return flt(frappe.db.sql(""" - select abs(sum(net_total)) - from `tabPurchase Invoice` - where supplier in %s and is_return=1 and docstatus=1 - and posting_date between %s and %s %s - """, (tuple(suppliers), year_start_date, year_end_date, condition))) + return frappe.get_all('Purchase Invoice', filters, fields)[0].get('net_total') or 0.0 def get_ltds_amount(current_amount, deducted_amount, certificate_limit, rate, tax_details): if current_amount < (certificate_limit - deducted_amount): @@ -227,4 +259,4 @@ def is_valid_certificate(valid_from, valid_upto, posting_date, deducted_amount, certificate_limit > deducted_amount): valid = True - return valid \ No newline at end of file + return valid diff --git a/erpnext/accounts/doctype/tax_withholding_category/test_tax_withholding_category.py b/erpnext/accounts/doctype/tax_withholding_category/test_tax_withholding_category.py index ef77674372..2b387f965a 100644 --- a/erpnext/accounts/doctype/tax_withholding_category/test_tax_withholding_category.py +++ b/erpnext/accounts/doctype/tax_withholding_category/test_tax_withholding_category.py @@ -18,6 +18,17 @@ class TestTaxWithholdingCategory(unittest.TestCase): create_records() create_tax_with_holding_category() + def tearDown(self): + frappe.db.sql('delete from `tabPurchase Invoice` where supplier = "Test TDS Supplier"') + frappe.db.sql('delete from `tabPurchase Invoice` where supplier = "Test TDS Supplier1"') + frappe.db.sql('delete from `tabPurchase Invoice` where supplier = "Test TDS Supplier2"') + frappe.db.sql('delete from `tabPurchase Invoice` where supplier = "Test TDS Supplier ABC"') + + frappe.db.sql('delete from `tabGL Entry` where party = "Test TDS Supplier"') + frappe.db.sql('delete from `tabGL Entry` where party = "Test TDS Supplier1"') + frappe.db.sql('delete from `tabGL Entry` where party = "Test TDS Supplier2"') + frappe.db.sql('delete from `tabGL Entry` where party = "Test TDS Supplier ABC"') + def test_cumulative_threshold_tds(self): frappe.db.set_value("Supplier", "Test TDS Supplier", "tax_withholding_category", "Cumulative Threshold TDS") invoices = []