Deepesh Garg b1158bcb7c
Merge pull request #30799 from nextchamp-saqib/einv-cess-fix
fix(india): cess value not considered while validating e-invoice totals
2022-04-25 14:39:15 +05:30

1409 lines
40 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
import base64
import io
import json
import os
import re
import sys
import traceback
import frappe
import jwt
import requests
from frappe import _, bold
from frappe.core.page.background_jobs.background_jobs import get_info
from frappe.integrations.utils import make_get_request, make_post_request
from frappe.utils.background_jobs import enqueue
from frappe.utils.data import (
add_to_date,
cint,
cstr,
flt,
format_date,
get_link_to_form,
getdate,
now_datetime,
time_diff_in_hours,
time_diff_in_seconds,
)
from frappe.utils.scheduler import is_scheduler_inactive
from pyqrcode import create as qrcreate
from erpnext.regional.india.utils import get_gst_accounts, get_place_of_supply
@frappe.whitelist()
def validate_eligibility(doc):
if isinstance(doc, str):
doc = json.loads(doc)
invalid_doctype = doc.get("doctype") != "Sales Invoice"
if invalid_doctype:
return False
einvoicing_enabled = cint(frappe.db.get_single_value("E Invoice Settings", "enable"))
if not einvoicing_enabled:
return False
einvoicing_eligible_from = (
frappe.db.get_single_value("E Invoice Settings", "applicable_from") or "2021-04-01"
)
if getdate(doc.get("posting_date")) < getdate(einvoicing_eligible_from):
return False
invalid_company = not frappe.db.get_value("E Invoice User", {"company": doc.get("company")})
invalid_supply_type = doc.get("gst_category") not in [
"Registered Regular",
"SEZ",
"Overseas",
"Deemed Export",
]
company_transaction = doc.get("billing_address_gstin") == doc.get("company_gstin")
# if export invoice, then taxes can be empty
# invoice can only be ineligible if no taxes applied and is not an export invoice
no_taxes_applied = not doc.get("taxes") and not doc.get("gst_category") == "Overseas"
has_non_gst_item = any(d for d in doc.get("items", []) if d.get("is_non_gst"))
if (
invalid_company
or invalid_supply_type
or company_transaction
or no_taxes_applied
or has_non_gst_item
):
return False
return True
def validate_einvoice_fields(doc):
invoice_eligible = validate_eligibility(doc)
if not invoice_eligible:
return
if doc.docstatus == 0 and doc._action == "save":
if doc.irn:
frappe.throw(_("You cannot edit the invoice after generating IRN"), title=_("Edit Not Allowed"))
if len(doc.name) > 16:
raise_document_name_too_long_error()
doc.einvoice_status = "Pending"
elif doc.docstatus == 1 and doc._action == "submit" and not doc.irn:
frappe.throw(_("You must generate IRN before submitting the document."), title=_("Missing IRN"))
elif doc.irn and doc.docstatus == 2 and doc._action == "cancel" and not doc.irn_cancelled:
frappe.throw(
_("You must cancel IRN before cancelling the document."), title=_("Cancel Not Allowed")
)
def raise_document_name_too_long_error():
title = _("Document ID Too Long")
msg = _("As you have E-Invoicing enabled, to be able to generate IRN for this invoice")
msg += ", "
msg += _("document id {} exceed 16 letters.").format(bold(_("should not")))
msg += "<br><br>"
msg += _("You must {} your {} in order to have document id of {} length 16.").format(
bold(_("modify")), bold(_("naming series")), bold(_("maximum"))
)
msg += _("Please account for ammended documents too.")
frappe.throw(msg, title=title)
def read_json(name):
file_path = os.path.join(os.path.dirname(__file__), "{name}.json".format(name=name))
with open(file_path, "r") as f:
return cstr(f.read())
def get_transaction_details(invoice):
supply_type = ""
if invoice.gst_category == "Registered Regular":
supply_type = "B2B"
elif invoice.gst_category == "SEZ":
supply_type = "SEZWOP"
elif invoice.gst_category == "Overseas":
supply_type = "EXPWOP"
elif invoice.gst_category == "Deemed Export":
supply_type = "DEXP"
if not supply_type:
rr, sez, overseas, export = (
bold("Registered Regular"),
bold("SEZ"),
bold("Overseas"),
bold("Deemed Export"),
)
frappe.throw(
_("GST category should be one of {}, {}, {}, {}").format(rr, sez, overseas, export),
title=_("Invalid Supply Type"),
)
return frappe._dict(
dict(tax_scheme="GST", supply_type=supply_type, reverse_charge=invoice.reverse_charge)
)
def get_doc_details(invoice):
if getdate(invoice.posting_date) < getdate("2021-01-01"):
frappe.throw(
_("IRN generation is not allowed for invoices dated before 1st Jan 2021"),
title=_("Not Allowed"),
)
invoice_type = "CRN" if invoice.is_return else "INV"
invoice_name = invoice.name
invoice_date = format_date(invoice.posting_date, "dd/mm/yyyy")
return frappe._dict(
dict(invoice_type=invoice_type, invoice_name=invoice_name, invoice_date=invoice_date)
)
def validate_address_fields(address, skip_gstin_validation):
if (
(not address.gstin and not skip_gstin_validation)
or not address.city
or not address.pincode
or not address.address_title
or not address.address_line1
or not address.gst_state_number
):
frappe.throw(
msg=_(
"Address Lines, City, Pincode, GSTIN are mandatory for address {}. Please set them and try again."
).format(address.name),
title=_("Missing Address Fields"),
)
if address.address_line2 and len(address.address_line2) < 2:
# to prevent "The field Address 2 must be a string with a minimum length of 3 and a maximum length of 100"
address.address_line2 = ""
def get_party_details(address_name, skip_gstin_validation=False):
addr = frappe.get_doc("Address", address_name)
validate_address_fields(addr, skip_gstin_validation)
if addr.gst_state_number == 97:
# according to einvoice standard
addr.pincode = 999999
party_address_details = frappe._dict(
dict(
legal_name=sanitize_for_json(addr.address_title),
location=sanitize_for_json(addr.city),
pincode=addr.pincode,
gstin=addr.gstin,
state_code=addr.gst_state_number,
address_line1=sanitize_for_json(addr.address_line1),
address_line2=sanitize_for_json(addr.address_line2),
)
)
return party_address_details
def get_overseas_address_details(address_name):
address_title, address_line1, address_line2, city = frappe.db.get_value(
"Address", address_name, ["address_title", "address_line1", "address_line2", "city"]
)
if not address_title or not address_line1 or not city:
frappe.throw(
msg=_(
"Address lines and city is mandatory for address {}. Please set them and try again."
).format(get_link_to_form("Address", address_name)),
title=_("Missing Address Fields"),
)
return frappe._dict(
dict(
gstin="URP",
legal_name=sanitize_for_json(address_title),
location=city,
address_line1=sanitize_for_json(address_line1),
address_line2=sanitize_for_json(address_line2),
pincode=999999,
state_code=96,
place_of_supply=96,
)
)
def get_item_list(invoice):
item_list = []
for d in invoice.items:
einvoice_item_schema = read_json("einv_item_template")
item = frappe._dict({})
item.update(d.as_dict())
item.sr_no = d.idx
item.description = sanitize_for_json(d.item_name)
item.qty = abs(item.qty)
if flt(item.qty) != 0.0:
item.unit_rate = abs(item.taxable_value / item.qty)
else:
item.unit_rate = abs(item.taxable_value)
item.gross_amount = abs(item.taxable_value)
item.taxable_value = abs(item.taxable_value)
item.discount_amount = 0
item.is_service_item = "Y" if item.gst_hsn_code and item.gst_hsn_code[:2] == "99" else "N"
item.serial_no = ""
item = update_item_taxes(invoice, item)
item.total_value = abs(
item.taxable_value
+ item.igst_amount
+ item.sgst_amount
+ item.cgst_amount
+ item.cess_amount
+ item.cess_nadv_amount
+ item.other_charges
)
einv_item = einvoice_item_schema.format(item=item)
item_list.append(einv_item)
return ", ".join(item_list)
def update_item_taxes(invoice, item):
gst_accounts = get_gst_accounts(invoice.company)
gst_accounts_list = [d for accounts in gst_accounts.values() for d in accounts if d]
for attr in [
"tax_rate",
"cess_rate",
"cess_nadv_amount",
"cgst_amount",
"sgst_amount",
"igst_amount",
"cess_amount",
"cess_nadv_amount",
"other_charges",
]:
item[attr] = 0
for t in invoice.taxes:
is_applicable = t.tax_amount and t.account_head in gst_accounts_list
if is_applicable:
# this contains item wise tax rate & tax amount (incl. discount)
item_tax_detail = json.loads(t.item_wise_tax_detail).get(item.item_code or item.item_name)
item_tax_rate = item_tax_detail[0]
# item tax amount excluding discount amount
item_tax_amount = (item_tax_rate / 100) * item.taxable_value
if t.account_head in gst_accounts.cess_account:
item_tax_amount_after_discount = item_tax_detail[1]
if t.charge_type == "On Item Quantity":
item.cess_nadv_amount += abs(item_tax_amount_after_discount)
else:
item.cess_rate += item_tax_rate
item.cess_amount += abs(item_tax_amount_after_discount)
for tax_type in ["igst", "cgst", "sgst", "utgst"]:
if t.account_head in gst_accounts[f"{tax_type}_account"]:
item.tax_rate += item_tax_rate
if tax_type == "utgst":
# utgst taxes are reported same as sgst tax
item["sgst_amount"] += abs(item_tax_amount)
else:
item[f"{tax_type}_amount"] += abs(item_tax_amount)
else:
# TODO: other charges per item
pass
return item
def get_invoice_value_details(invoice):
invoice_value_details = frappe._dict(dict())
invoice_value_details.base_total = abs(sum([i.taxable_value for i in invoice.get("items")]))
invoice_value_details.invoice_discount_amt = 0
invoice_value_details.round_off = invoice.base_rounding_adjustment
invoice_value_details.base_grand_total = abs(invoice.base_rounded_total) or abs(
invoice.base_grand_total
)
invoice_value_details.grand_total = abs(invoice.rounded_total) or abs(invoice.grand_total)
invoice_value_details = update_invoice_taxes(invoice, invoice_value_details)
return invoice_value_details
def update_invoice_taxes(invoice, invoice_value_details):
gst_accounts = get_gst_accounts(invoice.company)
gst_accounts_list = [d for accounts in gst_accounts.values() for d in accounts if d]
invoice_value_details.total_cgst_amt = 0
invoice_value_details.total_sgst_amt = 0
invoice_value_details.total_igst_amt = 0
invoice_value_details.total_cess_amt = 0
invoice_value_details.total_other_charges = 0
considered_rows = []
for t in invoice.taxes:
tax_amount = t.base_tax_amount_after_discount_amount
if t.account_head in gst_accounts_list:
if t.account_head in gst_accounts.cess_account:
# using after discount amt since item also uses after discount amt for cess calc
invoice_value_details.total_cess_amt += abs(t.base_tax_amount_after_discount_amount)
for tax_type in ["igst", "cgst", "sgst", "utgst"]:
if t.account_head in gst_accounts[f"{tax_type}_account"]:
if tax_type == "utgst":
invoice_value_details["total_sgst_amt"] += abs(tax_amount)
else:
invoice_value_details[f"total_{tax_type}_amt"] += abs(tax_amount)
update_other_charges(t, invoice_value_details, gst_accounts_list, invoice, considered_rows)
else:
invoice_value_details.total_other_charges += abs(tax_amount)
return invoice_value_details
def update_other_charges(
tax_row, invoice_value_details, gst_accounts_list, invoice, considered_rows
):
prev_row_id = cint(tax_row.row_id) - 1
if tax_row.account_head in gst_accounts_list and prev_row_id not in considered_rows:
if tax_row.charge_type == "On Previous Row Amount":
amount = invoice.get("taxes")[prev_row_id].tax_amount_after_discount_amount
invoice_value_details.total_other_charges -= abs(amount)
considered_rows.append(prev_row_id)
if tax_row.charge_type == "On Previous Row Total":
amount = invoice.get("taxes")[prev_row_id].base_total - invoice.base_net_total
invoice_value_details.total_other_charges -= abs(amount)
considered_rows.append(prev_row_id)
def get_payment_details(invoice):
payee_name = invoice.company
mode_of_payment = ""
paid_amount = invoice.base_paid_amount
outstanding_amount = invoice.outstanding_amount
return frappe._dict(
dict(
payee_name=payee_name,
mode_of_payment=mode_of_payment,
paid_amount=paid_amount,
outstanding_amount=outstanding_amount,
)
)
def get_return_doc_reference(invoice):
invoice_date = frappe.db.get_value("Sales Invoice", invoice.return_against, "posting_date")
return frappe._dict(
dict(invoice_name=invoice.return_against, invoice_date=format_date(invoice_date, "dd/mm/yyyy"))
)
def get_eway_bill_details(invoice):
if invoice.is_return:
frappe.throw(
_(
"E-Way Bill cannot be generated for Credit Notes & Debit Notes. Please clear fields in the Transporter Section of the invoice."
),
title=_("Invalid Fields"),
)
mode_of_transport = {"": "", "Road": "1", "Air": "2", "Rail": "3", "Ship": "4"}
vehicle_type = {"Regular": "R", "Over Dimensional Cargo (ODC)": "O"}
return frappe._dict(
dict(
gstin=invoice.gst_transporter_id,
name=invoice.transporter_name,
mode_of_transport=mode_of_transport[invoice.mode_of_transport],
distance=invoice.distance or 0,
document_name=invoice.lr_no,
document_date=format_date(invoice.lr_date, "dd/mm/yyyy"),
vehicle_no=invoice.vehicle_no,
vehicle_type=vehicle_type[invoice.gst_vehicle_type],
)
)
def validate_mandatory_fields(invoice):
if not invoice.company_address:
frappe.throw(
_(
"Company Address is mandatory to fetch company GSTIN details. Please set Company Address and try again."
),
title=_("Missing Fields"),
)
if not invoice.customer_address:
frappe.throw(
_(
"Customer Address is mandatory to fetch customer GSTIN details. Please set Company Address and try again."
),
title=_("Missing Fields"),
)
if not frappe.db.get_value("Address", invoice.company_address, "gstin"):
frappe.throw(
_(
"GSTIN is mandatory to fetch company GSTIN details. Please enter GSTIN in selected company address."
),
title=_("Missing Fields"),
)
if invoice.gst_category != "Overseas" and not frappe.db.get_value(
"Address", invoice.customer_address, "gstin"
):
frappe.throw(
_(
"GSTIN is mandatory to fetch customer GSTIN details. Please enter GSTIN in selected customer address."
),
title=_("Missing Fields"),
)
def validate_totals(einvoice):
item_list = einvoice["ItemList"]
value_details = einvoice["ValDtls"]
total_item_ass_value = 0
total_item_cgst_value = 0
total_item_sgst_value = 0
total_item_igst_value = 0
total_item_value = 0
for item in item_list:
total_item_ass_value += flt(item["AssAmt"])
total_item_cgst_value += flt(item["CgstAmt"])
total_item_sgst_value += flt(item["SgstAmt"])
total_item_igst_value += flt(item["IgstAmt"])
total_item_value += flt(item["TotItemVal"])
if (
abs(flt(item["AssAmt"]) * flt(item["GstRt"]) / 100)
- (flt(item["CgstAmt"]) + flt(item["SgstAmt"]) + flt(item["IgstAmt"]))
> 1
):
frappe.throw(
_(
"Row #{}: GST rate is invalid. Please remove tax rows with zero tax amount from taxes table."
).format(item.idx)
)
if abs(flt(value_details["AssVal"]) - total_item_ass_value) > 1:
frappe.throw(
_(
"Total Taxable Value of the items is not equal to the Invoice Net Total. Please check item taxes / discounts for any correction."
)
)
if (
abs(
flt(value_details["CgstVal"])
+ flt(value_details["SgstVal"])
- total_item_cgst_value
- total_item_sgst_value
)
> 1
):
frappe.throw(
_(
"CGST + SGST value of the items is not equal to total CGST + SGST value. Please review taxes for any correction."
)
)
if abs(flt(value_details["IgstVal"]) - total_item_igst_value) > 1:
frappe.throw(
_(
"IGST value of all items is not equal to total IGST value. Please review taxes for any correction."
)
)
if (
abs(
flt(value_details["TotInvVal"])
+ flt(value_details["Discount"])
- flt(value_details["OthChrg"])
- flt(value_details["RndOffAmt"])
- total_item_value
)
> 1
):
frappe.throw(
_(
"Total Value of the items is not equal to the Invoice Grand Total. Please check item taxes / discounts for any correction."
)
)
calculated_invoice_value = (
flt(value_details["AssVal"])
+ flt(value_details["CgstVal"])
+ flt(value_details["SgstVal"])
+ flt(value_details["IgstVal"])
+ flt(value_details["CesVal"])
+ flt(value_details["OthChrg"])
+ flt(value_details["RndOffAmt"])
- flt(value_details["Discount"])
)
if abs(flt(value_details["TotInvVal"]) - calculated_invoice_value) > 1:
frappe.throw(
_(
"Total Item Value + Taxes - Discount is not equal to the Invoice Grand Total. Please check taxes / discounts for any correction."
)
)
def make_einvoice(invoice):
validate_mandatory_fields(invoice)
schema = read_json("einv_template")
transaction_details = get_transaction_details(invoice)
item_list = get_item_list(invoice)
doc_details = get_doc_details(invoice)
invoice_value_details = get_invoice_value_details(invoice)
seller_details = get_party_details(invoice.company_address)
if invoice.gst_category == "Overseas":
buyer_details = get_overseas_address_details(invoice.customer_address)
else:
buyer_details = get_party_details(invoice.customer_address)
place_of_supply = get_place_of_supply(invoice, invoice.doctype)
if place_of_supply:
place_of_supply = place_of_supply.split("-")[0]
else:
place_of_supply = sanitize_for_json(invoice.billing_address_gstin)[:2]
buyer_details.update(dict(place_of_supply=place_of_supply))
seller_details.update(dict(legal_name=invoice.company))
buyer_details.update(dict(legal_name=invoice.customer_name or invoice.customer))
shipping_details = payment_details = prev_doc_details = eway_bill_details = frappe._dict({})
if invoice.shipping_address_name and invoice.customer_address != invoice.shipping_address_name:
if invoice.gst_category == "Overseas":
shipping_details = get_overseas_address_details(invoice.shipping_address_name)
else:
shipping_details = get_party_details(invoice.shipping_address_name, skip_gstin_validation=True)
dispatch_details = frappe._dict({})
if invoice.dispatch_address_name:
dispatch_details = get_party_details(invoice.dispatch_address_name, skip_gstin_validation=True)
if invoice.is_pos and invoice.base_paid_amount:
payment_details = get_payment_details(invoice)
if invoice.is_return and invoice.return_against:
prev_doc_details = get_return_doc_reference(invoice)
if invoice.transporter and not invoice.is_return:
eway_bill_details = get_eway_bill_details(invoice)
# not yet implemented
period_details = export_details = frappe._dict({})
einvoice = schema.format(
transaction_details=transaction_details,
doc_details=doc_details,
dispatch_details=dispatch_details,
seller_details=seller_details,
buyer_details=buyer_details,
shipping_details=shipping_details,
item_list=item_list,
invoice_value_details=invoice_value_details,
payment_details=payment_details,
period_details=period_details,
prev_doc_details=prev_doc_details,
export_details=export_details,
eway_bill_details=eway_bill_details,
)
try:
einvoice = safe_json_load(einvoice)
einvoice = santize_einvoice_fields(einvoice)
except Exception:
show_link_to_error_log(invoice, einvoice)
try:
validate_totals(einvoice)
except Exception:
log_error(einvoice)
raise
return einvoice
def show_link_to_error_log(invoice, einvoice):
err_log = log_error(einvoice)
link_to_error_log = get_link_to_form("Error Log", err_log.name, "Error Log")
frappe.throw(
_(
"An error occurred while creating e-invoice for {}. Please check {} for more information."
).format(invoice.name, link_to_error_log),
title=_("E Invoice Creation Failed"),
)
def log_error(data=None):
if isinstance(data, str):
data = json.loads(data)
seperator = "--" * 50
err_tb = traceback.format_exc()
err_msg = str(sys.exc_info()[1])
data = json.dumps(data, indent=4)
message = "\n".join(["Error", err_msg, seperator, "Data:", data, seperator, "Exception:", err_tb])
return frappe.log_error(title=_("E Invoice Request Failed"), message=message)
def santize_einvoice_fields(einvoice):
int_fields = ["Pin", "Distance", "CrDay"]
float_fields = [
"Qty",
"FreeQty",
"UnitPrice",
"TotAmt",
"Discount",
"PreTaxVal",
"AssAmt",
"GstRt",
"IgstAmt",
"CgstAmt",
"SgstAmt",
"CesRt",
"CesAmt",
"CesNonAdvlAmt",
"StateCesRt",
"StateCesAmt",
"StateCesNonAdvlAmt",
"OthChrg",
"TotItemVal",
"AssVal",
"CgstVal",
"SgstVal",
"IgstVal",
"CesVal",
"StCesVal",
"Discount",
"OthChrg",
"RndOffAmt",
"TotInvVal",
"TotInvValFc",
"PaidAmt",
"PaymtDue",
"ExpDuty",
]
copy = einvoice.copy()
for key, value in copy.items():
if isinstance(value, list):
for idx, d in enumerate(value):
santized_dict = santize_einvoice_fields(d)
if santized_dict:
einvoice[key][idx] = santized_dict
else:
einvoice[key].pop(idx)
if not einvoice[key]:
einvoice.pop(key, None)
elif isinstance(value, dict):
santized_dict = santize_einvoice_fields(value)
if santized_dict:
einvoice[key] = santized_dict
else:
einvoice.pop(key, None)
elif not value or value == "None":
einvoice.pop(key, None)
elif key in float_fields:
einvoice[key] = flt(value, 2)
elif key in int_fields:
einvoice[key] = cint(value)
return einvoice
def safe_json_load(json_string):
try:
return json.loads(json_string)
except json.JSONDecodeError as e:
# print a snippet of 40 characters around the location where error occured
pos = e.pos
start, end = max(0, pos - 20), min(len(json_string) - 1, pos + 20)
snippet = json_string[start:end]
frappe.throw(
_(
"Error in input data. Please check for any special characters near following input: <br> {}"
).format(snippet)
)
class RequestFailed(Exception):
pass
class CancellationNotAllowed(Exception):
pass
class GSPConnector:
def __init__(self, doctype=None, docname=None):
self.doctype = doctype
self.docname = docname
self.set_invoice()
self.set_credentials()
# authenticate url is same for sandbox & live
self.authenticate_url = "https://gsp.adaequare.com/gsp/authenticate?grant_type=token"
self.base_url = (
"https://gsp.adaequare.com"
if not self.e_invoice_settings.sandbox_mode
else "https://gsp.adaequare.com/test"
)
self.cancel_irn_url = self.base_url + "/enriched/ei/api/invoice/cancel"
self.irn_details_url = self.base_url + "/enriched/ei/api/invoice/irn"
self.generate_irn_url = self.base_url + "/enriched/ei/api/invoice"
self.gstin_details_url = self.base_url + "/enriched/ei/api/master/gstin"
self.cancel_ewaybill_url = self.base_url + "/enriched/ewb/ewayapi?action=CANEWB"
self.generate_ewaybill_url = self.base_url + "/enriched/ei/api/ewaybill"
def set_invoice(self):
self.invoice = None
if self.doctype and self.docname:
self.invoice = frappe.get_cached_doc(self.doctype, self.docname)
def set_credentials(self):
self.e_invoice_settings = frappe.get_cached_doc("E Invoice Settings")
if not self.e_invoice_settings.enable:
frappe.throw(
_("E-Invoicing is disabled. Please enable it from {} to generate e-invoices.").format(
get_link_to_form("E Invoice Settings", "E Invoice Settings")
)
)
if self.invoice:
gstin = self.get_seller_gstin()
credentials_for_gstin = [d for d in self.e_invoice_settings.credentials if d.gstin == gstin]
if credentials_for_gstin:
self.credentials = credentials_for_gstin[0]
else:
frappe.throw(
_(
"Cannot find e-invoicing credentials for selected Company GSTIN. Please check E-Invoice Settings"
)
)
else:
self.credentials = (
self.e_invoice_settings.credentials[0] if self.e_invoice_settings.credentials else None
)
def get_seller_gstin(self):
gstin = frappe.db.get_value("Address", self.invoice.company_address, "gstin")
if not gstin:
frappe.throw(
_("Cannot retrieve Company GSTIN. Please select company address with valid GSTIN.")
)
return gstin
def get_auth_token(self):
if time_diff_in_seconds(self.e_invoice_settings.token_expiry, now_datetime()) < 150.0:
self.fetch_auth_token()
return self.e_invoice_settings.auth_token
def make_request(self, request_type, url, headers=None, data=None):
try:
if request_type == "post":
res = make_post_request(url, headers=headers, data=data)
else:
res = make_get_request(url, headers=headers, data=data)
except requests.exceptions.HTTPError as e:
if e.response.status_code in [401, 403] and not hasattr(self, "token_auto_refreshed"):
self.auto_refresh_token()
headers = self.get_headers()
return self.make_request(request_type, url, headers, data)
self.log_request(url, headers, data, res)
return res
def auto_refresh_token(self):
self.fetch_auth_token()
self.token_auto_refreshed = True
def log_request(self, url, headers, data, res):
headers.update({"password": self.credentials.password})
request_log = frappe.get_doc(
{
"doctype": "E Invoice Request Log",
"user": frappe.session.user,
"reference_invoice": self.invoice.name if self.invoice else None,
"url": url,
"headers": json.dumps(headers, indent=4) if headers else None,
"data": json.dumps(data, indent=4) if isinstance(data, dict) else data,
"response": json.dumps(res, indent=4) if res else None,
}
)
request_log.save(ignore_permissions=True)
frappe.db.commit()
def get_client_credentials(self):
if self.e_invoice_settings.client_id and self.e_invoice_settings.client_secret:
return self.e_invoice_settings.client_id, self.e_invoice_settings.get_password("client_secret")
return frappe.conf.einvoice_client_id, frappe.conf.einvoice_client_secret
def fetch_auth_token(self):
client_id, client_secret = self.get_client_credentials()
headers = {"gspappid": client_id, "gspappsecret": client_secret}
res = {}
try:
res = self.make_request("post", self.authenticate_url, headers)
self.e_invoice_settings.auth_token = "{} {}".format(
res.get("token_type"), res.get("access_token")
)
self.e_invoice_settings.token_expiry = add_to_date(None, seconds=res.get("expires_in"))
self.e_invoice_settings.save(ignore_permissions=True)
self.e_invoice_settings.reload()
except Exception:
log_error(res)
self.raise_error(True)
def get_headers(self):
return {
"content-type": "application/json",
"user_name": self.credentials.username,
"password": self.credentials.get_password(),
"gstin": self.credentials.gstin,
"authorization": self.get_auth_token(),
"requestid": str(base64.b64encode(os.urandom(18))),
}
def fetch_gstin_details(self, gstin):
headers = self.get_headers()
try:
params = "?gstin={gstin}".format(gstin=gstin)
res = self.make_request("get", self.gstin_details_url + params, headers)
if res.get("success"):
return res.get("result")
else:
log_error(res)
raise RequestFailed
except RequestFailed:
self.raise_error()
except Exception:
log_error()
self.raise_error(True)
@staticmethod
def get_gstin_details(gstin):
"""fetch and cache GSTIN details"""
if not hasattr(frappe.local, "gstin_cache"):
frappe.local.gstin_cache = {}
key = gstin
gsp_connector = GSPConnector()
details = gsp_connector.fetch_gstin_details(gstin)
frappe.local.gstin_cache[key] = details
frappe.cache().hset("gstin_cache", key, details)
return details
def generate_irn(self):
data = {}
try:
headers = self.get_headers()
einvoice = make_einvoice(self.invoice)
data = json.dumps(einvoice, indent=4)
res = self.make_request("post", self.generate_irn_url, headers, data)
if res.get("success"):
self.set_einvoice_data(res.get("result"))
elif "2150" in res.get("message"):
# IRN already generated but not updated in invoice
# Extract the IRN from the response description and fetch irn details
irn = res.get("result")[0].get("Desc").get("Irn")
irn_details = self.get_irn_details(irn)
if irn_details:
self.set_einvoice_data(irn_details)
else:
raise RequestFailed(
"IRN has already been generated for the invoice but cannot fetch details for the it. \
Contact ERPNext support to resolve the issue."
)
else:
raise RequestFailed
except RequestFailed:
errors = self.sanitize_error_message(res.get("message"))
self.set_failed_status(errors=errors)
self.raise_error(errors=errors)
except Exception as e:
self.set_failed_status(errors=str(e))
log_error(data)
self.raise_error(True)
@staticmethod
def bulk_generate_irn(invoices):
gsp_connector = GSPConnector()
gsp_connector.doctype = "Sales Invoice"
failed = []
for invoice in invoices:
try:
gsp_connector.docname = invoice
gsp_connector.set_invoice()
gsp_connector.set_credentials()
gsp_connector.generate_irn()
except Exception as e:
failed.append({"docname": invoice, "message": str(e)})
return failed
def get_irn_details(self, irn):
headers = self.get_headers()
try:
params = "?irn={irn}".format(irn=irn)
res = self.make_request("get", self.irn_details_url + params, headers)
if res.get("success"):
return res.get("result")
else:
raise RequestFailed
except RequestFailed:
errors = self.sanitize_error_message(res.get("message"))
self.raise_error(errors=errors)
except Exception:
log_error()
self.raise_error(True)
def cancel_irn(self, irn, reason, remark):
data, res = {}, {}
try:
# validate cancellation
if time_diff_in_hours(now_datetime(), self.invoice.ack_date) > 24:
frappe.throw(
_("E-Invoice cannot be cancelled after 24 hours of IRN generation."),
title=_("Not Allowed"),
exc=CancellationNotAllowed,
)
if not irn:
frappe.throw(
_("IRN not found. You must generate IRN before cancelling."),
title=_("Not Allowed"),
exc=CancellationNotAllowed,
)
headers = self.get_headers()
data = json.dumps({"Irn": irn, "Cnlrsn": reason, "Cnlrem": remark}, indent=4)
res = self.make_request("post", self.cancel_irn_url, headers, data)
if res.get("success") or "9999" in res.get("message"):
self.invoice.irn_cancelled = 1
self.invoice.irn_cancel_date = res.get("result")["CancelDate"] if res.get("result") else ""
self.invoice.einvoice_status = "Cancelled"
self.invoice.flags.updater_reference = {
"doctype": self.invoice.doctype,
"docname": self.invoice.name,
"label": _("IRN Cancelled - {}").format(remark),
}
self.update_invoice()
else:
raise RequestFailed
except RequestFailed:
errors = self.sanitize_error_message(res.get("message"))
self.set_failed_status(errors=errors)
self.raise_error(errors=errors)
except CancellationNotAllowed as e:
self.set_failed_status(errors=str(e))
self.raise_error(errors=str(e))
except Exception as e:
self.set_failed_status(errors=str(e))
log_error(data)
self.raise_error(True)
@staticmethod
def bulk_cancel_irn(invoices, reason, remark):
gsp_connector = GSPConnector()
gsp_connector.doctype = "Sales Invoice"
failed = []
for invoice in invoices:
try:
gsp_connector.docname = invoice
gsp_connector.set_invoice()
gsp_connector.set_credentials()
irn = gsp_connector.invoice.irn
gsp_connector.cancel_irn(irn, reason, remark)
except Exception as e:
failed.append({"docname": invoice, "message": str(e)})
return failed
def generate_eway_bill(self, **kwargs):
args = frappe._dict(kwargs)
headers = self.get_headers()
eway_bill_details = get_eway_bill_details(args)
data = json.dumps(
{
"Irn": args.irn,
"Distance": cint(eway_bill_details.distance),
"TransMode": eway_bill_details.mode_of_transport,
"TransId": eway_bill_details.gstin,
"TransName": eway_bill_details.name,
"TrnDocDt": eway_bill_details.document_date,
"TrnDocNo": eway_bill_details.document_name,
"VehNo": eway_bill_details.vehicle_no,
"VehType": eway_bill_details.vehicle_type,
},
indent=4,
)
try:
res = self.make_request("post", self.generate_ewaybill_url, headers, data)
if res.get("success"):
self.invoice.ewaybill = res.get("result").get("EwbNo")
self.invoice.eway_bill_validity = res.get("result").get("EwbValidTill")
self.invoice.eway_bill_cancelled = 0
self.invoice.update(args)
self.invoice.flags.updater_reference = {
"doctype": self.invoice.doctype,
"docname": self.invoice.name,
"label": _("E-Way Bill Generated"),
}
self.update_invoice()
else:
raise RequestFailed
except RequestFailed:
errors = self.sanitize_error_message(res.get("message"))
self.raise_error(errors=errors)
except Exception:
log_error(data)
self.raise_error(True)
def cancel_eway_bill(self, eway_bill, reason, remark):
headers = self.get_headers()
data = json.dumps({"ewbNo": eway_bill, "cancelRsnCode": reason, "cancelRmrk": remark}, indent=4)
headers["username"] = headers["user_name"]
del headers["user_name"]
try:
res = self.make_request("post", self.cancel_ewaybill_url, headers, data)
if res.get("success"):
self.invoice.ewaybill = ""
self.invoice.eway_bill_cancelled = 1
self.invoice.flags.updater_reference = {
"doctype": self.invoice.doctype,
"docname": self.invoice.name,
"label": _("E-Way Bill Cancelled - {}").format(remark),
}
self.update_invoice()
else:
raise RequestFailed
except RequestFailed:
errors = self.sanitize_error_message(res.get("message"))
self.raise_error(errors=errors)
except Exception:
log_error(data)
self.raise_error(True)
def sanitize_error_message(self, message):
"""
On validation errors, response message looks something like this:
message = '2174 : For inter-state transaction, CGST and SGST amounts are not applicable; only IGST amount is applicable,
3095 : Supplier GSTIN is inactive'
we search for string between ':' to extract the error messages
errors = [
': For inter-state transaction, CGST and SGST amounts are not applicable; only IGST amount is applicable, 3095 ',
': Test'
]
then we trim down the message by looping over errors
"""
if not message:
return []
errors = re.findall(": [^:]+", message)
for idx, e in enumerate(errors):
# remove colons
errors[idx] = errors[idx].replace(":", "").strip()
# if not last
if idx != len(errors) - 1:
# remove last 7 chars eg: ', 3095 '
errors[idx] = errors[idx][:-6]
return errors
def raise_error(self, raise_exception=False, errors=None):
if errors is None:
errors = []
title = _("E Invoice Request Failed")
if errors:
frappe.throw(errors, title=title, as_list=1)
else:
link_to_error_list = '<a href="/app/error-log" target="_blank">Error Log</a>'
frappe.msgprint(
_(
"An error occurred while making e-invoicing request. Please check {} for more information."
).format(link_to_error_list),
title=title,
raise_exception=raise_exception,
indicator="red",
)
def set_einvoice_data(self, res):
enc_signed_invoice = res.get("SignedInvoice")
dec_signed_invoice = jwt.decode(enc_signed_invoice, options={"verify_signature": False})["data"]
self.invoice.irn = res.get("Irn")
self.invoice.ewaybill = res.get("EwbNo")
self.invoice.eway_bill_validity = res.get("EwbValidTill")
self.invoice.ack_no = res.get("AckNo")
self.invoice.ack_date = res.get("AckDt")
self.invoice.signed_einvoice = dec_signed_invoice
self.invoice.ack_no = res.get("AckNo")
self.invoice.ack_date = res.get("AckDt")
self.invoice.signed_qr_code = res.get("SignedQRCode")
self.invoice.einvoice_status = "Generated"
self.attach_qrcode_image()
self.invoice.flags.updater_reference = {
"doctype": self.invoice.doctype,
"docname": self.invoice.name,
"label": _("IRN Generated"),
}
self.update_invoice()
def attach_qrcode_image(self):
qrcode = self.invoice.signed_qr_code
doctype = self.invoice.doctype
docname = self.invoice.name
filename = "QRCode_{}.png".format(docname).replace(os.path.sep, "__")
qr_image = io.BytesIO()
url = qrcreate(qrcode, error="L")
url.png(qr_image, scale=2, quiet_zone=1)
_file = frappe.get_doc(
{
"doctype": "File",
"file_name": filename,
"attached_to_doctype": doctype,
"attached_to_name": docname,
"attached_to_field": "qrcode_image",
"is_private": 0,
"content": qr_image.getvalue(),
}
)
_file.save()
frappe.db.commit()
self.invoice.qrcode_image = _file.file_url
def update_invoice(self):
self.invoice.flags.ignore_validate_update_after_submit = True
self.invoice.flags.ignore_validate = True
self.invoice.save()
def set_failed_status(self, errors=None):
frappe.db.rollback()
self.invoice.einvoice_status = "Failed"
self.invoice.failure_description = self.get_failure_message(errors) if errors else ""
self.update_invoice()
frappe.db.commit()
def get_failure_message(self, errors):
if isinstance(errors, list):
errors = ", ".join(errors)
return errors
def sanitize_for_json(string):
"""Escape JSON specific characters from a string."""
# json.dumps adds double-quotes to the string. Indexing to remove them.
return json.dumps(string)[1:-1]
@frappe.whitelist()
def get_einvoice(doctype, docname):
invoice = frappe.get_doc(doctype, docname)
return make_einvoice(invoice)
@frappe.whitelist()
def generate_irn(doctype, docname):
gsp_connector = GSPConnector(doctype, docname)
gsp_connector.generate_irn()
@frappe.whitelist()
def cancel_irn(doctype, docname, irn, reason, remark):
gsp_connector = GSPConnector(doctype, docname)
gsp_connector.cancel_irn(irn, reason, remark)
@frappe.whitelist()
def generate_eway_bill(doctype, docname, **kwargs):
gsp_connector = GSPConnector(doctype, docname)
gsp_connector.generate_eway_bill(**kwargs)
@frappe.whitelist()
def cancel_eway_bill(doctype, docname):
# TODO: uncomment when eway_bill api from Adequare is enabled
# gsp_connector = GSPConnector(doctype, docname)
# gsp_connector.cancel_eway_bill(eway_bill, reason, remark)
frappe.db.set_value(doctype, docname, "ewaybill", "")
frappe.db.set_value(doctype, docname, "eway_bill_cancelled", 1)
@frappe.whitelist()
def generate_einvoices(docnames):
docnames = json.loads(docnames) or []
if len(docnames) < 10:
failures = GSPConnector.bulk_generate_irn(docnames)
frappe.local.message_log = []
if failures:
show_bulk_action_failure_message(failures)
success = len(docnames) - len(failures)
frappe.msgprint(
_("{} e-invoices generated successfully").format(success),
title=_("Bulk E-Invoice Generation Complete"),
)
else:
enqueue_bulk_action(schedule_bulk_generate_irn, docnames=docnames)
def schedule_bulk_generate_irn(docnames):
failures = GSPConnector.bulk_generate_irn(docnames)
frappe.local.message_log = []
frappe.publish_realtime(
"bulk_einvoice_generation_complete",
{"user": frappe.session.user, "failures": failures, "invoices": docnames},
)
def show_bulk_action_failure_message(failures):
for doc in failures:
docname = '<a href="sales-invoice/{0}">{0}</a>'.format(doc.get("docname"))
message = doc.get("message").replace("'", '"')
if message[0] == "[":
errors = json.loads(message)
error_list = "".join(["<li>{}</li>".format(err) for err in errors])
message = """{} has following errors:<br>
<ul style="padding-left: 20px; padding-top: 5px">{}</ul>""".format(
docname, error_list
)
else:
message = "{} - {}".format(docname, message)
frappe.msgprint(message, title=_("Bulk E-Invoice Generation Complete"), indicator="red")
@frappe.whitelist()
def cancel_irns(docnames, reason, remark):
docnames = json.loads(docnames) or []
if len(docnames) < 10:
failures = GSPConnector.bulk_cancel_irn(docnames, reason, remark)
frappe.local.message_log = []
if failures:
show_bulk_action_failure_message(failures)
success = len(docnames) - len(failures)
frappe.msgprint(
_("{} e-invoices cancelled successfully").format(success),
title=_("Bulk E-Invoice Cancellation Complete"),
)
else:
enqueue_bulk_action(schedule_bulk_cancel_irn, docnames=docnames, reason=reason, remark=remark)
def schedule_bulk_cancel_irn(docnames, reason, remark):
failures = GSPConnector.bulk_cancel_irn(docnames, reason, remark)
frappe.local.message_log = []
frappe.publish_realtime(
"bulk_einvoice_cancellation_complete",
{"user": frappe.session.user, "failures": failures, "invoices": docnames},
)
def enqueue_bulk_action(job, **kwargs):
check_scheduler_status()
enqueue(
job,
**kwargs,
queue="long",
timeout=10000,
event="processing_bulk_einvoice_action",
now=frappe.conf.developer_mode or frappe.flags.in_test,
)
if job == schedule_bulk_generate_irn:
msg = _("E-Invoices will be generated in a background process.")
else:
msg = _("E-Invoices will be cancelled in a background process.")
frappe.msgprint(msg, alert=1)
def check_scheduler_status():
if is_scheduler_inactive() and not frappe.flags.in_test:
frappe.throw(_("Scheduler is inactive. Cannot enqueue job."), title=_("Scheduler Inactive"))
def job_already_enqueued(job_name):
enqueued_jobs = [d.get("job_name") for d in get_info()]
if job_name in enqueued_jobs:
return True