refactor: tax withholding category against customer

This commit is contained in:
Saqib Ansari 2021-01-27 16:59:07 +05:30
parent 5ba2f58355
commit 6f718a31f0
2 changed files with 175 additions and 132 deletions

View File

@ -12,37 +12,54 @@ from erpnext.accounts.utils import get_fiscal_year
class TaxWithholdingCategory(Document):
pass
def get_party_tax_withholding_details(ref_doc, tax_withholding_category=None):
def get_party_details(ref_doc):
party_type, party = '', ''
if ref_doc.doctype == 'Sales Invoice':
party_type = 'Customer'
party = ref_doc.customer
else:
party_type = 'Supplier'
party = ref_doc.supplier
return party_type, party
def get_party_tax_withholding_details(ref_doc, tax_withholding_category=None):
pan_no = ''
suppliers = []
parties = []
party_type, party = get_party_details(ref_doc)
if not tax_withholding_category:
tax_withholding_category, pan_no = frappe.db.get_value('Supplier', ref_doc.supplier, ['tax_withholding_category', 'pan'])
tax_withholding_category, pan_no = frappe.db.get_value(party_type, party, ['tax_withholding_category', 'pan'])
if not tax_withholding_category:
return
# if tax_withholding_category passed as an argument but not pan_no
if not pan_no:
pan_no = frappe.db.get_value('Supplier', ref_doc.supplier, 'pan')
pan_no = frappe.db.get_value(party_type, party, 'pan')
# Get others suppliers with the same PAN No
if pan_no:
suppliers = [d.name for d in frappe.get_all('Supplier', fields=['name'], filters={'pan': pan_no})]
parties = frappe.get_all(party_type, filters={ 'pan': pan_no }, pluck='name')
if not suppliers:
suppliers.append(ref_doc.supplier)
if not parties:
parties.append(party)
fiscal_year = get_fiscal_year(ref_doc.posting_date, company=ref_doc.company)
tax_details = get_tax_withholding_details(tax_withholding_category, fiscal_year[0], ref_doc.company)
fy = get_fiscal_year(ref_doc.posting_date, company=ref_doc.company)
tax_details = get_tax_withholding_details(tax_withholding_category, fy[0], ref_doc.company)
if not tax_details:
frappe.throw(_('Please set associated account in Tax Withholding Category {0} against Company {1}')
.format(tax_withholding_category, ref_doc.company))
tds_amount = get_tds_amount(suppliers, ref_doc.net_total, ref_doc.company,
tax_details, fy, ref_doc.posting_date, pan_no)
tax_amount = get_tax_amount(
party_type, parties,
ref_doc, tax_details,
fiscal_year, pan_no
)
tax_row = get_tax_row(tax_details, tds_amount)
tax_row = get_tax_row(tax_details, tax_amount)
return tax_row
@ -69,147 +86,162 @@ def get_tax_withholding_rates(tax_withholding, fiscal_year):
frappe.throw(_("No Tax Withholding data found for the current Fiscal Year."))
def get_tax_row(tax_details, tds_amount):
def get_tax_row(tax_details, tax_amount):
return {
"category": "Total",
"add_deduct_tax": "Deduct",
"charge_type": "Actual",
"account_head": tax_details.account_head,
"description": tax_details.description,
"tax_amount": tds_amount
"tax_amount": tax_amount
}
def get_tds_amount(suppliers, net_total, company, tax_details, fiscal_year_details, posting_date, pan_no=None):
fiscal_year, year_start_date, year_end_date = fiscal_year_details
tds_amount = 0
tds_deducted = 0
def _get_tds(amount, rate):
if amount <= 0:
return 0
return amount * rate / 100
ldc_name = frappe.db.get_value('Lower Deduction Certificate',
{
'pan_no': pan_no,
'fiscal_year': fiscal_year
}, 'name')
ldc = ''
def get_lower_deduction_certificate(fiscal_year, pan_no):
ldc_name = frappe.db.get_value('Lower Deduction Certificate', { 'pan_no': pan_no, 'fiscal_year': fiscal_year }, 'name')
if ldc_name:
ldc = frappe.get_doc('Lower Deduction Certificate', ldc_name)
return frappe.get_doc('Lower Deduction Certificate', ldc_name)
entries = frappe.db.sql("""
select voucher_no, credit
from `tabGL Entry`
where company = %s and
party in %s and fiscal_year=%s and credit > 0
and is_opening = 'No'
""", (company, tuple(suppliers), fiscal_year), as_dict=1)
def get_tax_amount(party_type, parties, ref_doc, tax_details, fiscal_year_details, pan_no=None):
fiscal_year = fiscal_year_details[0]
vouchers = [d.voucher_no for d in entries]
advance_vouchers = get_advance_vouchers(suppliers, fiscal_year=fiscal_year, company=company)
vouchers = get_invoice_vouchers(parties, fiscal_year, ref_doc.company, party_type=party_type) or [""]
advance_vouchers = get_advance_vouchers(parties, fiscal_year, ref_doc.company, party_type=party_type)
tax_vouchers = vouchers + advance_vouchers
tds_vouchers = vouchers + advance_vouchers
tax_deducted = 0
dr_or_cr = 'credit' if party_type == 'Supplier' else 'debit'
if tax_vouchers:
filters = {
dr_or_cr: ['>', 0],
'account': tax_details.account_head,
'fiscal_year': fiscal_year,
'voucher_no': ['in', tax_vouchers],
'is_cancelled': 0
}
field = "sum({})".format(dr_or_cr)
if tds_vouchers:
tds_deducted = frappe.db.sql("""
SELECT sum(credit) FROM `tabGL Entry`
WHERE
account=%s and fiscal_year=%s and credit > 0
and voucher_no in ({0})""". format(','.join(['%s'] * len(tds_vouchers))),
((tax_details.account_head, fiscal_year) + tuple(tds_vouchers)))
tax_deducted = frappe.db.get_value('GL Entry', filters, field) or 0.0
tds_deducted = tds_deducted[0][0] if tds_deducted and tds_deducted[0][0] else 0
tax_amount = 0
if party_type == 'Supplier':
net_total = ref_doc.net_total
posting_date = ref_doc.posting_date
ldc = get_lower_deduction_certificate(fiscal_year, pan_no)
if tds_deducted:
if ldc:
limit_consumed = frappe.db.get_value('Purchase Invoice',
{
'supplier': ('in', suppliers),
'apply_tds': 1,
'docstatus': 1
}, 'sum(net_total)')
if ldc and is_valid_certificate(ldc.valid_from, ldc.valid_upto, posting_date, limit_consumed, net_total,
ldc.certificate_limit):
tds_amount = get_ltds_amount(net_total, limit_consumed, ldc.certificate_limit, ldc.rate, tax_details)
else:
tds_amount = _get_tds(net_total, tax_details.rate)
else:
supplier_credit_amount = frappe.get_all('Purchase Invoice',
fields = ['sum(net_total)'],
filters = {'name': ('in', vouchers), 'docstatus': 1, "apply_tds": 1}, as_list=1)
supplier_credit_amount = (supplier_credit_amount[0][0]
if supplier_credit_amount and supplier_credit_amount[0][0] else 0)
jv_supplier_credit_amt = frappe.get_all('Journal Entry Account',
fields = ['sum(credit_in_account_currency)'],
filters = {
'parent': ('in', vouchers), 'docstatus': 1,
'party': ('in', suppliers),
'reference_type': ('not in', ['Purchase Invoice'])
}, as_list=1)
supplier_credit_amount += (jv_supplier_credit_amt[0][0]
if jv_supplier_credit_amt and jv_supplier_credit_amt[0][0] else 0)
supplier_credit_amount += net_total
debit_note_amount = get_debit_note_amount(suppliers, year_start_date, year_end_date)
supplier_credit_amount -= debit_note_amount
if ((tax_details.get('threshold', 0) and supplier_credit_amount >= tax_details.threshold)
or (tax_details.get('cumulative_threshold', 0) and supplier_credit_amount >= tax_details.cumulative_threshold)):
if ldc and is_valid_certificate(ldc.valid_from, ldc.valid_upto, posting_date, tds_deducted, net_total,
ldc.certificate_limit):
tds_amount = get_ltds_amount(supplier_credit_amount, 0, ldc.certificate_limit, ldc.rate,
tax_details)
if tax_deducted:
if ldc:
tax_amount = get_tds_amount_from_ldc(ldc, parties, fiscal_year, pan_no, tax_details, posting_date, net_total)
else:
tds_amount = _get_tds(supplier_credit_amount, tax_details.rate)
tax_amount = net_total * tax_details.rate / 100 if net_total > 0 else 0
else:
tax_amount = get_tds_amount(
ldc, parties, ref_doc, tax_details,
fiscal_year_details, vouchers
)
return tax_amount
def get_invoice_vouchers(parties, fiscal_year, company, party_type='Supplier'):
dr_or_cr = 'credit' if party_type == 'Supplier' else 'debit'
filters = {
dr_or_cr: ['>', 0],
'company': company,
'party_type': party_type,
'party': ['in', parties],
'fiscal_year': fiscal_year,
'is_opening': 'No',
'is_cancelled': 0
}
return frappe.get_all('GL Entry', filters=filters, distinct=1, pluck="voucher_no")
def get_advance_vouchers(parties, fiscal_year=None, company=None, from_date=None, to_date=None, party_type='Supplier'):
# for advance vouchers, debit and credit is reversed
dr_or_cr = 'debit' if party_type == 'Supplier' else 'credit'
filters = {
dr_or_cr: ['>', 0],
'party_type': party_type,
'party': ['in', parties],
'is_opening': 'No',
'is_cancelled': 0
}
if fiscal_year:
filters['fiscal_year'] = fiscal_year
if company:
filters['company'] = company
if from_date and to_date:
filters['posting_date'] = ['between', (from_date, to_date)]
return frappe.get_all('GL Entry', filters=filters, distinct=1, pluck='voucher_no')
def get_tds_amount(ldc, parties, ref_doc, tax_details, fiscal_year_details, vouchers):
tds_amount = 0
supp_credit_amt = frappe.db.get_value('Purchase Invoice', {
'name': ('in', vouchers), 'docstatus': 1, 'apply_tds': 1
}, 'sum(net_total)') or 0.0
supp_jv_credit_amt = frappe.db.get_value('Journal Entry Account', {
'parent': ('in', vouchers), 'docstatus': 1,
'party': ('in', parties), 'reference_type': ('!=', 'Purchase Invoice')
}, 'sum(credit_in_account_currency)') or 0.0
supp_credit_amt += supp_jv_credit_amt
supp_credit_amt += ref_doc.net_total
debit_note_amount = get_debit_note_amount(parties, fiscal_year_details, ref_doc.company)
supp_credit_amt -= debit_note_amount
threshold = tax_details.get('threshold', 0)
cumulative_threshold = tax_details.get('cumulative_threshold', 0)
if ((threshold and supp_credit_amt >= threshold) or (cumulative_threshold and supp_credit_amt >= cumulative_threshold)):
if ldc and is_valid_certificate(
ldc.valid_from, ldc.valid_upto,
ref_doc.posting_date, tax_deducted,
net_total, ldc.certificate_limit
):
tds_amount = get_ltds_amount(supp_credit_amt, 0, ldc.certificate_limit, ldc.rate, tax_details)
else:
tds_amount = supp_credit_amt * tax_details.rate / 100 if supp_credit_amt > 0 else 0
return tds_amount
def get_advance_vouchers(suppliers, fiscal_year=None, company=None, from_date=None, to_date=None):
condition = "fiscal_year=%s" % fiscal_year
def get_tds_amount_from_ldc(ldc, parties, fiscal_year, pan_no, tax_details, posting_date, net_total):
tds_amount = 0
limit_consumed = frappe.db.get_value('Purchase Invoice', {
'supplier': ('in', parties),
'apply_tds': 1,
'docstatus': 1
}, 'sum(net_total)')
if is_valid_certificate(
ldc.valid_from, ldc.valid_upto,
posting_date, limit_consumed,
net_total, ldc.certificate_limit
):
tds_amount = get_ltds_amount(net_total, limit_consumed, ldc.certificate_limit, ldc.rate, tax_details)
return tds_amount
def get_debit_note_amount(suppliers, fiscal_year_details, company=None):
_, year_start_date, year_end_date = fiscal_year_details
filters = {
'supplier': ['in', suppliers],
'is_return': 1,
'docstatus': 1,
'posting_date': ['between', (year_start_date, year_end_date)]
}
fields = ['abs(sum(net_total)) as net_total']
if company:
condition += "and company =%s" % (company)
if from_date and to_date:
condition += "and posting_date between %s and %s" % (from_date, to_date)
filters['company'] = company
## Appending the same supplier again if length of suppliers list is 1
## since tuple of single element list contains None, For example ('Test Supplier 1', )
## and the below query fails
if len(suppliers) == 1:
suppliers.append(suppliers[0])
return frappe.db.sql_list("""
select distinct voucher_no
from `tabGL Entry`
where party in %s and %s and debit > 0
and is_opening = 'No'
""", (tuple(suppliers), condition)) or []
def get_debit_note_amount(suppliers, year_start_date, year_end_date, company=None):
condition = "and 1=1"
if company:
condition = " and company=%s " % company
if len(suppliers) == 1:
suppliers.append(suppliers[0])
return flt(frappe.db.sql("""
select abs(sum(net_total))
from `tabPurchase Invoice`
where supplier in %s and is_return=1 and docstatus=1
and posting_date between %s and %s %s
""", (tuple(suppliers), year_start_date, year_end_date, condition)))
return frappe.get_all('Purchase Invoice', filters, fields)[0].get('net_total') or 0.0
def get_ltds_amount(current_amount, deducted_amount, certificate_limit, rate, tax_details):
if current_amount < (certificate_limit - deducted_amount):
@ -227,4 +259,4 @@ def is_valid_certificate(valid_from, valid_upto, posting_date, deducted_amount,
certificate_limit > deducted_amount):
valid = True
return valid
return valid

View File

@ -18,6 +18,17 @@ class TestTaxWithholdingCategory(unittest.TestCase):
create_records()
create_tax_with_holding_category()
def tearDown(self):
frappe.db.sql('delete from `tabPurchase Invoice` where supplier = "Test TDS Supplier"')
frappe.db.sql('delete from `tabPurchase Invoice` where supplier = "Test TDS Supplier1"')
frappe.db.sql('delete from `tabPurchase Invoice` where supplier = "Test TDS Supplier2"')
frappe.db.sql('delete from `tabPurchase Invoice` where supplier = "Test TDS Supplier ABC"')
frappe.db.sql('delete from `tabGL Entry` where party = "Test TDS Supplier"')
frappe.db.sql('delete from `tabGL Entry` where party = "Test TDS Supplier1"')
frappe.db.sql('delete from `tabGL Entry` where party = "Test TDS Supplier2"')
frappe.db.sql('delete from `tabGL Entry` where party = "Test TDS Supplier ABC"')
def test_cumulative_threshold_tds(self):
frappe.db.set_value("Supplier", "Test TDS Supplier", "tax_withholding_category", "Cumulative Threshold TDS")
invoices = []