brotherton-erpnext/erpnext/regional/india/utils.py
2019-12-12 14:55:57 +05:30

644 lines
23 KiB
Python

from __future__ import unicode_literals
import frappe, re, json
from frappe import _
from frappe.utils import cstr, flt, date_diff, nowdate
from erpnext.regional.india import states, state_numbers
from erpnext.controllers.taxes_and_totals import get_itemised_tax, get_itemised_taxable_amount
from erpnext.controllers.accounts_controller import get_taxes_and_charges
from erpnext.hr.utils import get_salary_assignment
from erpnext.hr.doctype.salary_structure.salary_structure import make_salary_slip
from erpnext.regional.india import number_state_mapping
from six import string_types
def validate_gstin_for_india(doc, method):
    '''Normalise and validate the GSTIN stored on ``doc``.

    When ``doc.gst_state`` is set, ``doc.gst_state_number`` is refreshed from
    ``state_numbers``. If ``doc.gstin`` is missing or empty nothing else
    happens. Otherwise the GSTIN is upper-cased and stripped, and (unless it
    is empty or ``NA``) checked for length and format. For parties that are
    not 'UIN Holders' the check digit and the state-number prefix are
    verified as well.

    :param doc: document exposing ``gstin``, ``links`` and state fields.
    :param method: accepted but not used by this function.
    :raises: via ``frappe.throw`` when any validation fails.
    '''
    if getattr(doc, 'gst_state', None):
        doc.gst_state_number = state_numbers[doc.gst_state]

    if not getattr(doc, 'gstin', None):
        return

    # Only the first dynamic link is consulted for the party's GST category.
    gst_category = []
    if len(doc.links):
        first_link = doc.links[0]
        link_doctype = first_link.get("link_doctype")
        link_name = first_link.get("link_name")
        if link_doctype in ["Customer", "Supplier"]:
            gst_category = frappe.db.get_value(link_doctype, {'name': link_name}, ['gst_category'])

    doc.gstin = doc.gstin.upper().strip()
    if doc.gstin in ('', 'NA'):
        return

    if len(doc.gstin) != 15:
        frappe.throw(_("Invalid GSTIN! A GSTIN must have 15 characters."))

    if gst_category == 'UIN Holders':
        if not re.match("^[0-9]{4}[A-Z]{3}[0-9]{5}[0-9A-Z]{3}", doc.gstin):
            frappe.throw(_("Invalid GSTIN! The input you've entered doesn't match the GSTIN format for UIN Holders or Non-Resident OIDAR Service Providers"))
        return

    # NOTE(review): indentation was lost in the reviewed source; the check
    # digit and state-number checks below are assumed to apply only to the
    # regular (non UIN Holder) format - confirm against the repository.
    if not re.match("^[0-9]{2}[A-Z]{4}[0-9A-Z]{1}[0-9]{4}[A-Z]{1}[1-9A-Z]{1}[1-9A-Z]{1}[0-9A-Z]{1}$", doc.gstin):
        frappe.throw(_("Invalid GSTIN! The input you've entered doesn't match the format of GSTIN."))

    validate_gstin_check_digit(doc.gstin)
    set_gst_state_and_state_number(doc)

    if doc.gst_state_number != doc.gstin[:2]:
        frappe.throw(_("Invalid GSTIN! First 2 digits of GSTIN should match with State number {0}.")
            .format(doc.gst_state_number))
def update_gst_category(doc, method):
    '''Mark linked parties as "Registered Regular" once ``doc`` has a GSTIN.

    For every Customer or Supplier link on ``doc``, and only when ``doc``
    carries a ``gstin``, the linked party's ``gst_category`` is updated, but
    solely for rows whose current category is 'Unregistered'.

    :param doc: document exposing ``links`` and ``get('gstin')``.
    :param method: accepted but not used by this function.
    '''
    party_doctypes = ('Customer', 'Supplier')
    for link in doc.links:
        if link.link_doctype not in party_doctypes:
            continue
        if not doc.get('gstin'):
            continue
        # The table name comes from the whitelist above; values are bound.
        frappe.db.sql("""
UPDATE `tab{0}` SET gst_category = %s WHERE name = %s AND gst_category = 'Unregistered'
""".format(link.link_doctype), ("Registered Regular", link.link_name)) #nosec
def set_gst_state_and_state_number(doc):
    '''Fill ``doc.gst_state`` (if empty) and ``doc.gst_state_number``.

    When ``doc.gst_state`` is empty it is derived from ``doc.state`` using a
    case-insensitive match against ``states``. If there is no ``doc.state``
    or no match, the document is left untouched. Otherwise the state number
    is looked up in ``state_numbers``.
    '''
    if not doc.gst_state:
        if not doc.state:
            return

        wanted = doc.state.lower()
        by_lowercase = dict((name.lower(), name) for name in states)
        if wanted not in by_lowercase:
            return
        doc.gst_state = by_lowercase[wanted]

    doc.gst_state_number = state_numbers[doc.gst_state]
def validate_gstin_check_digit(gstin, label='GSTIN'):
    '''Validate the trailing check digit of ``gstin``.

    Every character except the last is mapped to its index in the 36-symbol
    alphabet (digits then upper-case letters), weighted alternately by 1 and
    2, and folded into a running total. The last character must equal the
    alphabet symbol derived from that total.

    :param gstin: identifier whose last character is the check digit.
    :param label: name used for the identifier in the error message.
    :raises: via ``frappe.throw`` when the check digit does not match.
    '''
    code_point_chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    mod = len(code_point_chars)

    factor = 1
    total = 0
    for char in gstin[:-1]:
        digit = factor * code_point_chars.find(char)
        # Fold the weighted value back into range: quotient plus remainder.
        digit = (digit // mod) + (digit % mod)
        total += digit
        factor = 2 if factor == 1 else 1

    expected = code_point_chars[(mod - (total % mod)) % mod]
    if gstin[-1] != expected:
        # Translate the template first and interpolate afterwards; formatting
        # inside _() makes the translation lookup miss every time.
        frappe.throw(_("""Invalid {0}! The check digit validation has failed.
Please ensure you've typed the {0} correctly.""").format(label))
def get_itemised_tax_breakup_header(item_doctype, tax_accounts):
    '''Return the header row for the itemised tax breakup table.

    The first column is labelled "HSN/SAC" when ``item_doctype`` has a
    ``gst_hsn_code`` field and "Item" otherwise, followed by
    "Taxable Amount" and then ``tax_accounts``.
    '''
    has_hsn_field = frappe.get_meta(item_doctype).has_field('gst_hsn_code')
    first_column = _("HSN/SAC") if has_hsn_field else _("Item")
    return [first_column, _("Taxable Amount")] + tax_accounts
def get_itemised_tax_breakup_data(doc, account_wise=False):
    '''Return ``(tax breakup, taxable amounts)`` for ``doc``.

    If the item doctype (``doc.doctype + " Item"``) has no ``gst_hsn_code``
    field, the per-item results are returned unchanged. Otherwise both are
    regrouped by HSN code: tax amounts are summed per tax key and taxable
    amounts are summed per HSN code.

    :param doc: transaction exposing ``taxes``, ``items`` and ``doctype``.
    :param account_wise: when truthy, group taxes by ``tax_account``
        instead of by tax description.
    '''
    itemised_tax = get_itemised_tax(doc.taxes, with_tax_account=account_wise)
    itemised_taxable_amount = get_itemised_taxable_amount(doc.items)

    if not frappe.get_meta(doc.doctype + " Item").has_field('gst_hsn_code'):
        return itemised_tax, itemised_taxable_amount

    # The first row seen for an item decides the HSN code of that item.
    item_hsn_map = frappe._dict()
    for row in doc.items:
        item_hsn_map.setdefault(row.item_code or row.item_name, row.get("gst_hsn_code"))

    hsn_tax = {}
    for item, taxes in itemised_tax.items():
        hsn_bucket = hsn_tax.setdefault(item_hsn_map.get(item), frappe._dict())
        for tax_desc, tax_detail in taxes.items():
            key = tax_detail.get('tax_account') if account_wise else tax_desc
            entry = hsn_bucket.setdefault(key, {"tax_rate": 0, "tax_amount": 0})
            # The rate is overwritten by the latest item; the amount accumulates.
            entry["tax_rate"] = tax_detail.get("tax_rate")
            entry["tax_amount"] += tax_detail.get("tax_amount")

    hsn_taxable_amount = frappe._dict()
    for item in itemised_taxable_amount:
        hsn_code = item_hsn_map.get(item)
        running_total = hsn_taxable_amount.setdefault(hsn_code, 0)
        hsn_taxable_amount[hsn_code] = running_total + itemised_taxable_amount.get(item)

    return hsn_tax, hsn_taxable_amount
def set_place_of_supply(doc, method=None):
    '''Store the place of supply computed for ``doc`` on the document.

    :param doc: transaction document; its ``doctype`` selects the address.
    :param method: accepted but not used by this function.
    '''
    place_of_supply = get_place_of_supply(doc, doc.doctype)
    doc.place_of_supply = place_of_supply
# Do not remove this function: it is used in tests.
def test_method():
    '''Return the constant marker string expected by the tests.'''
    marker = 'overridden'
    return marker
def get_place_of_supply(party_details, doctype):
    '''Return the place of supply as "<state number>-<state>", or None.

    The address is picked from ``party_details`` according to ``doctype``:
    shipping address first, then the customer address (sales documents) or
    the supplier address (purchase documents). ``None`` is returned when the
    Address doctype has no ``gst_state`` field, when ``doctype`` is not one
    of the handled sales/purchase doctypes, when no address is set, or when
    the address lacks a GST state or state number.

    :param party_details: object exposing the address fields as attributes.
    :param doctype: name of the transaction doctype.
    '''
    if not frappe.get_meta('Address').has_field('gst_state'):
        return

    # Bound up front so an unhandled doctype yields None instead of raising
    # UnboundLocalError at the check below.
    address_name = None
    if doctype in ("Sales Invoice", "Delivery Note", "Sales Order"):
        address_name = party_details.shipping_address_name or party_details.customer_address
    elif doctype in ("Purchase Invoice", "Purchase Order", "Purchase Receipt"):
        address_name = party_details.shipping_address or party_details.supplier_address

    if address_name:
        address = frappe.db.get_value("Address", address_name, ["gst_state", "gst_state_number"], as_dict=1)
        if address and address.gst_state and address.gst_state_number:
            return cstr(address.gst_state_number) + "-" + cstr(address.gst_state)
@frappe.whitelist()
def get_regional_address_details(party_details, doctype, company, return_taxes=None):
    '''Set place of supply and a default tax template on ``party_details``.

    ``party_details`` is updated in place (``place_of_supply``,
    ``taxes_and_charges``, ``taxes``). It is returned only when
    ``return_taxes`` is truthy and a template was resolved; every other path
    returns ``None``.

    :param party_details: mapping of party fields, or its JSON string.
    :param doctype: transaction doctype; selects the sales or purchase
        tax template doctype.
    :param company: company used when looking up templates.
    :param return_taxes: when truthy, return ``party_details``.
    '''
    if isinstance(party_details, string_types):
        party_details = json.loads(party_details)
        # NOTE(review): indentation was lost in the reviewed source; this
        # conversion is assumed to apply only to the JSON-string case so that
        # callers passing a dict keep seeing the in-place updates - confirm.
        party_details = frappe._dict(party_details)

    party_details.place_of_supply = get_place_of_supply(party_details, doctype)

    if doctype in ("Sales Invoice", "Delivery Note", "Sales Order"):
        master_doctype = "Sales Taxes and Charges Template"

        # Both helpers may set taxes_and_charges/taxes on party_details.
        get_tax_template_for_sez(party_details, master_doctype, company, 'Customer')
        get_tax_template_based_on_category(master_doctype, company, party_details)

        if party_details.get('taxes_and_charges') and return_taxes:
            return party_details

        if not party_details.company_gstin:
            return

    elif doctype in ("Purchase Invoice", "Purchase Order", "Purchase Receipt"):
        master_doctype = "Purchase Taxes and Charges Template"

        get_tax_template_for_sez(party_details, master_doctype, company, 'Supplier')
        get_tax_template_based_on_category(master_doctype, company, party_details)

        if party_details.get('taxes_and_charges') and return_taxes:
            return party_details

        if not party_details.supplier_gstin:
            return

    # master_doctype is only bound in the two branches above; for any other
    # doctype the code below relies on these early returns.
    if not party_details.place_of_supply: return

    if not party_details.company_gstin: return

    # Inter-state (flag 1) when the relevant GSTIN's first two characters
    # differ from those of the place of supply; intra-state (flag 0) otherwise.
    if ((doctype in ("Sales Invoice", "Delivery Note", "Sales Order") and party_details.company_gstin
        and party_details.company_gstin[:2] != party_details.place_of_supply[:2]) or (doctype in ("Purchase Invoice",
        "Purchase Order", "Purchase Receipt") and party_details.supplier_gstin and party_details.supplier_gstin[:2] != party_details.place_of_supply[:2])):
        default_tax = get_tax_template(master_doctype, company, 1, party_details.company_gstin[:2])
    else:
        default_tax = get_tax_template(master_doctype, company, 0, party_details.company_gstin[:2])

    if not default_tax:
        return
    party_details["taxes_and_charges"] = default_tax
    party_details.taxes = get_taxes_and_charges(master_doctype, default_tax)

    if return_taxes:
        return party_details
def get_tax_template_based_on_category(master_doctype, company, party_details):
    '''Apply the tax template matching the tax category of ``party_details``.

    Does nothing when ``party_details`` has no ``tax_category`` or when no
    template of ``master_doctype`` exists for that company and category.
    Otherwise sets ``taxes_and_charges`` and ``taxes`` on ``party_details``.
    '''
    tax_category = party_details.get('tax_category')
    if not tax_category:
        return

    filters = {'company': company, 'tax_category': tax_category}
    template = frappe.db.get_value(master_doctype, filters, 'name')
    if not template:
        return

    party_details["taxes_and_charges"] = template
    party_details.taxes = get_taxes_and_charges(master_doctype, template)
def get_tax_template(master_doctype, company, is_inter_state, state_code):
    '''Return the name of the default tax template for a state, or ''.

    Tax categories with the given ``is_inter_state`` flag are scanned in
    the order returned by the database. A category whose ``gst_state``
    matches the state for ``state_code`` always (re)sets the result; a
    category without a ``gst_state`` is used only while no template has
    been found yet.

    :param master_doctype: tax template doctype to search.
    :param company: company the template must belong to.
    :param is_inter_state: flag value to filter tax categories on.
    :param state_code: key into ``number_state_mapping`` (the callers in
        this file pass the first two characters of a GSTIN).
    :returns: template name; may be ``None`` when the last matching
        category has no enabled template, or '' when nothing matched.
    '''
    tax_categories = frappe.get_all('Tax Category', fields = ['name', 'is_inter_state', 'gst_state'],
        filters = {'is_inter_state': is_inter_state})

    default_tax = ''

    for tax_category in tax_categories:
        if tax_category.gst_state == number_state_mapping[state_code] or \
            (not default_tax and not tax_category.gst_state):
            # Restrict to this company's templates; without the company
            # filter a template owned by another company could be returned.
            default_tax = frappe.db.get_value(master_doctype,
                {'company': company, 'disabled': 0, 'tax_category': tax_category.name}, 'name')

    return default_tax
def get_tax_template_for_sez(party_details, master_doctype, company, party_type):
    '''Apply the inter-state template for SEZ parties charged with tax.

    Acts only when the party (looked up by the scrubbed ``party_type`` key
    in ``party_details``) has GST category 'SEZ' and export type
    'With Payment of Tax'. In that case ``taxes_and_charges`` and ``taxes``
    on ``party_details`` are overwritten with the template found for the
    state of the company GSTIN.
    '''
    party_name = party_details.get(frappe.scrub(party_type))
    gst_details = frappe.db.get_value(party_type, {'name': party_name},
        ['gst_category', 'export_type'], as_dict=1)

    if not gst_details:
        return
    if gst_details.gst_category != 'SEZ' or gst_details.export_type != 'With Payment of Tax':
        return

    # The state is derived from the first two characters of the company GSTIN.
    company_state = number_state_mapping[party_details.company_gstin[:2]]
    default_tax = frappe.db.get_value(master_doctype, {"company": company, "is_inter_state":1, "disabled":0,
        "gst_state": company_state})

    party_details["taxes_and_charges"] = default_tax
    party_details.taxes = get_taxes_and_charges(master_doctype, default_tax)
def calculate_annual_eligible_hra_exemption(doc):
    '''Return HRA amount plus annual and monthly HRA exemption for ``doc``.

    The Basic and HRA salary components are read from the Company record;
    both must be configured. Amounts come from a preview salary slip for
    the employee's current salary structure assignment, and the exemption
    is only computed when the structure has the HRA component as an
    earning, the HRA amount is non-zero and ``doc.monthly_house_rent`` is
    set.

    :returns: ``frappe._dict`` with ``hra_amount``, ``annual_exemption``
        and ``monthly_exemption`` (each 0 when not applicable).
    :raises: via ``frappe.throw`` when the components are not configured,
        or when there is no assignment and ``doc`` is submitted.
    '''
    company = doc.company
    basic_component = frappe.get_cached_value('Company', company, "basic_component")
    hra_component = frappe.get_cached_value('Company', company, "hra_component")
    if not (basic_component and hra_component):
        frappe.throw(_("Please mention Basic and HRA component in Company"))

    annual_exemption = monthly_exemption = hra_amount = 0

    if hra_component and basic_component:
        assignment = get_salary_assignment(doc.employee, nowdate())
        if not assignment:
            if doc.docstatus == 1:
                frappe.throw(_("Salary Structure must be submitted before submission of Tax Ememption Declaration"))
        elif frappe.db.exists("Salary Detail", {
            "parent": assignment.salary_structure,
            "salary_component": hra_component,
            "parentfield": "earnings",
            "parenttype": "Salary Structure"
        }):
            basic_amount, hra_amount = get_component_amt_from_salary_slip(doc.employee,
                assignment.salary_structure, basic_component, hra_component)
            if hra_amount and doc.monthly_house_rent:
                annual_exemption = calculate_hra_exemption(assignment.salary_structure,
                    basic_amount, hra_amount, doc.monthly_house_rent, doc.rented_in_metro_city)
                if annual_exemption > 0:
                    monthly_exemption = annual_exemption / 12
                else:
                    # A non-positive result means no exemption at all.
                    annual_exemption = 0

    return frappe._dict({
        "hra_amount": hra_amount,
        "annual_exemption": annual_exemption,
        "monthly_exemption": monthly_exemption
    })
def get_component_amt_from_salary_slip(employee, salary_structure, basic_component, hra_component):
    '''Return ``(basic amount, HRA amount)`` from a preview salary slip.

    A salary slip is built with ``for_preview=1`` and its earnings are
    scanned; scanning stops as soon as both amounts are non-zero. A
    component that is not found (or has a zero amount) is reported as 0.
    '''
    preview_slip = make_salary_slip(salary_structure, employee=employee, for_preview=1)

    basic_amt = hra_amt = 0
    for row in preview_slip.earnings:
        component = row.salary_component
        if component == basic_component:
            basic_amt = row.amount
        elif component == hra_component:
            hra_amt = row.amount

        if basic_amt and hra_amt:
            break

    return basic_amt, hra_amt
def calculate_hra_exemption(salary_structure, basic, monthly_hra, monthly_house_rent, rented_in_metro_city):
    '''Return the annual HRA exemption: the smallest of three candidates.

    The candidates are the annualised HRA, the annual rent minus 10% of the
    annualised basic pay, and 50% (metro) or 40% (non-metro) of the
    annualised basic pay. Amounts are annualised using the payroll
    frequency of ``salary_structure``.
    '''
    # TODO make this configurable
    frequency = frappe.get_value("Salary Structure", salary_structure, "payroll_frequency")

    # Candidate 1: HRA actually allotted by the employer.
    allotted_hra = get_annual_component_pay(frequency, monthly_hra)

    actual_annual_rent = monthly_house_rent * 12
    annual_basic = get_annual_component_pay(frequency, basic)

    # Candidate 2: rent paid in excess of 10% of basic pay.
    rent_above_basic_share = flt(actual_annual_rent) - flt(annual_basic * 0.1)

    # Candidate 3: fixed share of basic pay, depending on the city type.
    if rented_in_metro_city:
        basic_share = annual_basic * 0.5
    else:
        basic_share = annual_basic * 0.4

    return min([allotted_hra, rent_above_basic_share, basic_share])
def get_annual_component_pay(frequency, amount):
    '''Scale a per-period ``amount`` to a yearly figure.

    :param frequency: one of "Daily", "Weekly", "Fortnightly", "Monthly"
        or "Bimonthly".
    :param amount: pay for a single period of that frequency.
    :returns: ``amount`` times the periods per year, or ``None`` for any
        other frequency.
    '''
    periods_per_year = {
        "Daily": 365,
        "Weekly": 52,
        "Fortnightly": 26,
        "Monthly": 12,
        "Bimonthly": 6,
    }
    multiplier = periods_per_year.get(frequency)
    if multiplier is None:
        return None
    return amount * multiplier
def validate_house_rent_dates(doc):
    '''Validate the rented period on an HRA proof submission.

    Both dates are required, the period must span at least 15 days
    (``date_diff`` of at least 14), and it must not overlap the rented
    period of any submitted proof of the same employee and payroll period.

    :param doc: document exposing ``rented_from_date``, ``rented_to_date``,
        ``employee`` and ``payroll_period``.
    :raises: via ``frappe.throw`` on any violation.
    '''
    if not doc.rented_to_date or not doc.rented_from_date:
        frappe.throw(_("House rented dates required for exemption calculation"))

    if date_diff(doc.rented_to_date, doc.rented_from_date) < 14:
        frappe.throw(_("House rented dates should be atleast 15 days apart"))

    # Two periods overlap exactly when each starts on or before the other
    # ends. Testing only whether an existing endpoint falls inside the new
    # period misses an existing period that fully contains the new one.
    proofs = frappe.db.sql("""
        select name
        from `tabEmployee Tax Exemption Proof Submission`
        where
            docstatus=1 and employee=%(employee)s and payroll_period=%(payroll_period)s
            and rented_from_date <= %(to_date)s and rented_to_date >= %(from_date)s
    """, {
        "employee": doc.employee,
        "payroll_period": doc.payroll_period,
        "from_date": doc.rented_from_date,
        "to_date": doc.rented_to_date
    })

    if proofs:
        frappe.throw(_("House rent paid days overlapping with {0}").format(proofs[0][0]))
def calculate_hra_exemption_for_period(doc):
    '''Return HRA exemption figures for the rented period on ``doc``.

    Returns ``None`` when ``doc.house_rent_payment_amount`` is not set.
    Otherwise the rented dates are validated, the rent is converted to a
    monthly figure (also written to ``doc.monthly_house_rent``), and the
    exemption dict is extended with ``monthly_house_rent`` and
    ``total_eligible_hra_exemption``.
    '''
    if not doc.house_rent_payment_amount:
        return None

    validate_house_rent_dates(doc)

    # TODO receive rented months or validate dates are start and end of months?
    # Months covered (30-day months), rounded to the nearest half month.
    rented_days = date_diff(doc.rented_to_date, doc.rented_from_date) + 1
    factor = round(flt(rented_days) / 30 * 2) / 2
    monthly_rent = doc.house_rent_payment_amount / factor

    # calculate_annual_eligible_hra_exemption reads this field from doc.
    doc.monthly_house_rent = monthly_rent
    exemptions = calculate_annual_eligible_hra_exemption(doc)

    monthly_exemption = exemptions["monthly_exemption"]
    eligible_hra = monthly_exemption * factor if monthly_exemption else 0

    exemptions["monthly_house_rent"] = monthly_rent
    exemptions["total_eligible_hra_exemption"] = eligible_hra
    return exemptions
def get_ewb_data(dt, dn):
    '''Build the e-Way Bill JSON payload for one or more Sales Invoices.

    :param dt: doctype; anything other than 'Sales Invoice' is rejected.
    :param dn: comma-separated string of document names.
    :returns: dict with a ``version`` string and ``billLists``, one entry
        per invoice.
    :raises: via ``frappe.throw`` for an unsupported doctype or GST
        category, and from the validation helpers called per invoice.
    '''
    if dt != 'Sales Invoice':
        frappe.throw(_('e-Way Bill JSON can only be generated from Sales Invoice'))

    dn = dn.split(',')

    ewaybills = []
    for doc_name in dn:
        doc = frappe.get_doc(dt, doc_name)

        validate_sales_invoice(doc)

        data = frappe._dict({
            "transporterId": "",
            "TotNonAdvolVal": 0,
        })

        data.userGstin = data.fromGstin = doc.company_gstin
        data.supplyType = 'O'

        if doc.gst_category in ['Registered Regular', 'SEZ']:
            data.subSupplyType = 1
        elif doc.gst_category in ['Overseas', 'Deemed Export']:
            data.subSupplyType = 3
        else:
            frappe.throw(_('Unsupported GST Category for e-Way Bill JSON generation'))

        data.docType = 'INV'
        data.docDate = frappe.utils.formatdate(doc.posting_date, 'dd/mm/yyyy')

        company_address = frappe.get_doc('Address', doc.company_address)
        billing_address = frappe.get_doc('Address', doc.customer_address)

        # Used below for the "to" address lines and place.
        shipping_address = frappe.get_doc('Address', doc.shipping_address_name)

        data = get_address_details(data, doc, company_address, billing_address)

        data.itemList = []
        data.totalValue = doc.total

        data = get_item_list(data, doc)

        disable_rounded = frappe.db.get_single_value('Global Defaults', 'disable_rounded_total')
        data.totInvValue = doc.grand_total if disable_rounded else doc.rounded_total

        data = get_transport_details(data, doc)

        # Text fields grouped by the punctuation they may keep: each key is
        # the set of characters allowed in addition to \w.
        fields = {
            "/. -": {
                'docNo': doc.name,
                'fromTrdName': doc.company,
                'toTrdName': doc.customer_name,
                'transDocNo': doc.lr_no,
            },
            "@#/,&. -": {
                'fromAddr1': company_address.address_line1,
                'fromAddr2': company_address.address_line2,
                'fromPlace': company_address.city,
                'toAddr1': shipping_address.address_line1,
                'toAddr2': shipping_address.address_line2,
                'toPlace': shipping_address.city,
                'transporterName': doc.transporter_name
            }
        }

        # Strip every disallowed character; empty values become ''. This
        # also replaces the transDocNo set by get_transport_details.
        for allowed_chars, field_map in fields.items():
            for key, value in field_map.items():
                if not value:
                    data[key] = ''
                else:
                    data[key] = re.sub(r'[^\w' + allowed_chars + ']', '', value)

        ewaybills.append(data)

    # From here on ``data`` is the top-level payload, not a single bill.
    data = {
        'version': '1.0.1118',
        'billLists': ewaybills
    }

    return data
@frappe.whitelist()
def generate_ewb_json(dt, dn):
    '''Send the e-Way Bill JSON for ``dn`` as a file download.

    The payload from ``get_ewb_data`` is serialised into the response. The
    file name starts with 'Bulk' when more than one bill is included and
    with ``dn`` otherwise, followed by a random five-character suffix.
    '''
    data = get_ewb_data(dt, dn)
    is_bulk = len(data['billLists']) > 1
    doc_name = 'Bulk' if is_bulk else dn

    response = frappe.local.response
    response.filecontent = json.dumps(data, indent=4, sort_keys=True)
    response.type = 'download'
    response.filename = '{0}_e-WayBill_Data_{1}.json'.format(doc_name, frappe.utils.random_string(5))
@frappe.whitelist()
def get_gstins_for_company(company):
    '''Return the distinct GSTINs of the addresses linked to ``company``.

    :param company: company name; a falsy value yields an empty list.
    :returns: rows as returned by ``frappe.db.sql`` (one GSTIN per row),
        or ``[]`` when no company is given.
    '''
    company_gstins = []
    if company:
        # This function is whitelisted, so ``company`` is untrusted input:
        # bind it as a query parameter rather than formatting it into SQL.
        company_gstins = frappe.db.sql("""select
                distinct `tabAddress`.gstin
            from
                `tabAddress`, `tabDynamic Link`
            where
                `tabDynamic Link`.parent = `tabAddress`.name and
                `tabDynamic Link`.parenttype = 'Address' and
                `tabDynamic Link`.link_doctype = 'Company' and
                `tabDynamic Link`.link_name = %s""", (company,))
    return company_gstins
def get_address_details(data, doc, company_address, billing_address):
    '''Fill the from/to address fields of an e-Way Bill entry.

    Sets pincodes and state codes for the company and customer addresses,
    the recipient GSTIN ('URP' when the invoice has no 15-character billing
    GSTIN) and the transaction type: 2 when the shipping address differs
    from the customer address, in which case the pincode and actual state
    code come from the shipping address, and 1 otherwise.

    :returns: the same ``data`` object, updated in place.
    '''
    data.fromPincode = validate_pincode(company_address.pincode, 'Company Address')
    from_state_code = validate_state_code(company_address.gst_state_number, 'Company Address')
    data.fromStateCode = from_state_code
    data.actualFromStateCode = from_state_code

    billing_gstin = doc.billing_address_gstin
    if billing_gstin and len(billing_gstin) >= 15:
        data.toGstin = doc.billing_address_gstin
    else:
        data.toGstin = 'URP'
        set_gst_state_and_state_number(billing_address)

    data.toPincode = validate_pincode(billing_address.pincode, 'Customer Address')
    data.toStateCode = validate_state_code(billing_address.gst_state_number, 'Customer Address')

    if doc.customer_address == doc.shipping_address_name:
        data.transType = 1
        data.actualToStateCode = data.toStateCode
    else:
        data.transType = 2
        shipping_address = frappe.get_doc('Address', doc.shipping_address_name)
        set_gst_state_and_state_number(shipping_address)
        # The shipping pincode replaces the customer pincode set above.
        data.toPincode = validate_pincode(shipping_address.pincode, 'Shipping Address')
        data.actualToStateCode = validate_state_code(shipping_address.gst_state_number, 'Shipping Address')

    return data
def get_item_list(data, doc):
    '''Append one item row per HSN code to ``data.itemList`` and total taxes.

    Each row carries the HSN code, taxable amount and the rate of every
    GST-type tax applied to it. Tax amounts are accumulated on ``data`` per
    tax type; amounts on accounts that are not configured GST accounts go
    to ``data.OthValue``.

    :returns: the same ``data`` object, updated in place.
    :raises: via ``frappe.throw`` when an item has no HSN code.
    '''
    for total_key in ['cgstValue', 'sgstValue', 'igstValue', 'cessValue', 'OthValue']:
        data[total_key] = 0

    gst_accounts = get_gst_accounts(doc.company, account_wise=True)

    # GST account field -> (rate key on the item row, total key on data).
    tax_map = {
        'sgst_account': ['sgstRate', 'sgstValue'],
        'cgst_account': ['cgstRate', 'cgstValue'],
        'igst_account': ['igstRate', 'igstValue'],
        'cess_account': ['cessRate', 'cessValue']
    }
    item_data_attrs = ['sgstRate', 'cgstRate', 'igstRate', 'cessRate', 'cessNonAdvol']

    hsn_wise_charges, hsn_taxable_amount = get_itemised_tax_breakup_data(doc, account_wise=True)

    for hsn_code, taxable_amount in hsn_taxable_amount.items():
        item_data = frappe._dict()
        if not hsn_code:
            frappe.throw(_('GST HSN Code does not exist for one or more items'))

        item_data.hsnCode = int(hsn_code)
        item_data.taxableAmount = taxable_amount
        item_data.qtyUnit = ""
        item_data.update(dict.fromkeys(item_data_attrs, 0))

        for account, tax_detail in hsn_wise_charges.get(hsn_code, {}).items():
            rate_and_total = tax_map.get(gst_accounts.get(account, ''))
            if rate_and_total:
                rate_key, total_key = rate_and_total
                item_data[rate_key] = tax_detail.get('tax_rate')
                data[total_key] += tax_detail.get('tax_amount')
            else:
                data.OthValue += tax_detail.get('tax_amount')

        data.itemList.append(item_data)

    # Tax amounts rounded to 2 decimals to avoid exceeding max character limit
    for total_key in ['sgstValue', 'cgstValue', 'igstValue', 'cessValue']:
        data[total_key] = flt(data[total_key], 2)

    return data
def validate_sales_invoice(doc):
    '''Check that ``doc`` can be exported as e-Way Bill JSON.

    The invoice must be submitted, must not be a return, must not already
    have an e-Way Bill, must have all required address and transport fields
    filled, and the company GSTIN must be at least 15 characters long.

    :raises: via ``frappe.throw`` on the first failed check.
    '''
    if doc.docstatus != 1:
        frappe.throw(_('e-Way Bill JSON can only be generated from submitted document'))

    if doc.is_return:
        frappe.throw(_('e-Way Bill JSON cannot be generated for Sales Return as of now'))

    if doc.ewaybill:
        frappe.throw(_('e-Way Bill already exists for this document'))

    reqd_fields = ['company_gstin', 'company_address', 'customer_address',
        'shipping_address_name', 'mode_of_transport', 'distance']

    for fieldname in reqd_fields:
        if not doc.get(fieldname):
            # Translate the template first and interpolate afterwards;
            # formatting inside _() makes the translation lookup miss.
            frappe.throw(_('{0} is required to generate e-Way Bill JSON').format(
                doc.meta.get_label(fieldname)
            ))

    if len(doc.company_gstin) < 15:
        frappe.throw(_('You must be a registered supplier to generate e-Way Bill'))
def get_transport_details(data, doc):
    '''Fill the transport fields of an e-Way Bill entry from ``doc``.

    Sets the rounded distance and the numeric transport mode. For road
    transport either a GST Transporter ID or a vehicle number is required,
    plus a vehicle type; for every other mode the transport receipt number
    and date are mandatory. Receipt details and the transporter ID are
    copied when present.

    :returns: the same ``data`` object, updated in place.
    :raises: via ``frappe.throw`` when a requirement is not met.
    '''
    if doc.distance > 4000:
        frappe.throw(_('Distance cannot be greater than 4000 kms'))

    data.transDistance = int(round(doc.distance))

    mode_codes = {
        'Road': 1,
        'Rail': 2,
        'Air': 3,
        'Ship': 4
    }
    vehicle_type_codes = {
        'Regular': 'R',
        'Over Dimensional Cargo (ODC)': 'O'
    }

    data.transMode = mode_codes.get(doc.mode_of_transport)

    if doc.mode_of_transport == 'Road':
        if not (doc.gst_transporter_id or doc.vehicle_no):
            frappe.throw(_('Either GST Transporter ID or Vehicle No is required if Mode of Transport is Road'))
        if doc.vehicle_no:
            data.vehicleNo = doc.vehicle_no.replace(' ', '')
        if doc.gst_vehicle_type:
            data.vehicleType = vehicle_type_codes.get(doc.gst_vehicle_type)
        else:
            frappe.throw(_('Vehicle Type is required if Mode of Transport is Road'))
    elif not (doc.lr_no and doc.lr_date):
        frappe.throw(_('Transport Receipt No and Date are mandatory for your chosen Mode of Transport'))

    # NOTE(review): indentation was lost in the reviewed source; the three
    # checks below are assumed to run for every transport mode - confirm.
    if doc.lr_no:
        data.transDocNo = doc.lr_no

    if doc.lr_date:
        data.transDocDate = frappe.utils.formatdate(doc.lr_date, 'dd/mm/yyyy')

    if doc.gst_transporter_id:
        validate_gstin_check_digit(doc.gst_transporter_id, label='GST Transporter ID')
        data.transporterId = doc.gst_transporter_id

    return data
def validate_pincode(pincode, address):
    '''Return ``pincode`` as an int after checking that it has six digits.

    Spaces inside the pincode are ignored.

    :param pincode: pincode string taken from an address.
    :param address: label of the address, used in error messages.
    :raises: via ``frappe.throw`` when the pincode is missing or is not
        exactly six digits once spaces are removed.
    '''
    if not pincode:
        # Translate the template first, then interpolate the address label;
        # formatting inside _() makes the translation lookup miss.
        frappe.throw(_("Pin Code doesn't exist for {0}").format(address))

    pincode = pincode.replace(' ', '')
    if not pincode.isdigit() or len(pincode) != 6:
        frappe.throw(_("Pin Code for {0} is incorrectly formatted. It must be 6 digits (without spaces)").format(address))

    return int(pincode)
def validate_state_code(state_code, address):
    '''Return the GST state code of an address as an int.

    :param state_code: GST state number taken from an address.
    :param address: label of the address, used in the error message.
    :raises: via ``frappe.throw`` when ``state_code`` is empty.
    '''
    if not state_code:
        # Translate the template first, then interpolate the address label;
        # formatting inside _() makes the translation lookup miss.
        frappe.throw(_("GST State Code not found for {0}. Please set GST State in {0}").format(address))

    return int(state_code)
def get_gst_accounts(company, account_wise=False):
    '''Return the GST accounts configured for ``company`` in GST Settings.

    :param company: company whose "GST Account" rows are read.
    :param account_wise: when falsy the result maps each account field
        (``cgst_account``, ``sgst_account``, ``igst_account``,
        ``cess_account``) to the list of its values; when truthy it maps
        each non-empty account to the field it was found in.
    :raises: via ``frappe.throw`` when no rows exist, except while
        ``frappe.flags.in_test`` is set.
    '''
    settings_rows = frappe.get_all("GST Account",
        filters={"parent": "GST Settings", "company": company},
        fields=["cgst_account", "sgst_account", "igst_account", "cess_account"])

    if not (settings_rows or frappe.flags.in_test):
        frappe.throw(_("Please set GST Accounts in GST Settings"))

    gst_accounts = frappe._dict()
    for row in settings_rows:
        for fieldname, account in row.items():
            if account_wise:
                if account:
                    gst_accounts[account] = fieldname
            else:
                gst_accounts.setdefault(fieldname, []).append(account)

    return gst_accounts