# -*- coding: utf-8 -*- # Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and contributors # For license information, please see license.txt import base64 import io import json import os import re import sys import traceback import frappe import jwt import requests from frappe import _, bold from frappe.core.page.background_jobs.background_jobs import get_info from frappe.integrations.utils import make_get_request, make_post_request from frappe.utils.background_jobs import enqueue from frappe.utils.data import ( add_to_date, cint, cstr, flt, format_date, get_link_to_form, getdate, now_datetime, time_diff_in_hours, time_diff_in_seconds, ) from frappe.utils.scheduler import is_scheduler_inactive from pyqrcode import create as qrcreate from erpnext.regional.india.utils import get_gst_accounts, get_place_of_supply @frappe.whitelist() def validate_eligibility(doc): if isinstance(doc, str): doc = json.loads(doc) invalid_doctype = doc.get("doctype") != "Sales Invoice" if invalid_doctype: return False einvoicing_enabled = cint(frappe.db.get_single_value("E Invoice Settings", "enable")) if not einvoicing_enabled: return False einvoicing_eligible_from = ( frappe.db.get_single_value("E Invoice Settings", "applicable_from") or "2021-04-01" ) if getdate(doc.get("posting_date")) < getdate(einvoicing_eligible_from): return False invalid_company = not frappe.db.get_value("E Invoice User", {"company": doc.get("company")}) invalid_supply_type = doc.get("gst_category") not in [ "Registered Regular", "SEZ", "Overseas", "Deemed Export", ] company_transaction = doc.get("billing_address_gstin") == doc.get("company_gstin") # if export invoice, then taxes can be empty # invoice can only be ineligible if no taxes applied and is not an export invoice no_taxes_applied = not doc.get("taxes") and not doc.get("gst_category") == "Overseas" has_non_gst_item = any(d for d in doc.get("items", []) if d.get("is_non_gst")) if ( invalid_company or invalid_supply_type or company_transaction or no_taxes_applied or has_non_gst_item ): return False return True def validate_einvoice_fields(doc): invoice_eligible = validate_eligibility(doc) if not invoice_eligible: return if doc.docstatus == 0 and doc._action == "save": if doc.irn: frappe.throw(_("You cannot edit the invoice after generating IRN"), title=_("Edit Not Allowed")) if len(doc.name) > 16: raise_document_name_too_long_error() doc.einvoice_status = "Pending" elif doc.docstatus == 1 and doc._action == "submit" and not doc.irn: frappe.throw(_("You must generate IRN before submitting the document."), title=_("Missing IRN")) elif doc.irn and doc.docstatus == 2 and doc._action == "cancel" and not doc.irn_cancelled: frappe.throw( _("You must cancel IRN before cancelling the document."), title=_("Cancel Not Allowed") ) def raise_document_name_too_long_error(): title = _("Document ID Too Long") msg = _("As you have E-Invoicing enabled, to be able to generate IRN for this invoice") msg += ", " msg += _("document id {} exceed 16 letters.").format(bold(_("should not"))) msg += "

" msg += _("You must {} your {} in order to have document id of {} length 16.").format( bold(_("modify")), bold(_("naming series")), bold(_("maximum")) ) msg += _("Please account for ammended documents too.") frappe.throw(msg, title=title) def read_json(name): file_path = os.path.join(os.path.dirname(__file__), "{name}.json".format(name=name)) with open(file_path, "r") as f: return cstr(f.read()) def get_transaction_details(invoice): supply_type = "" if invoice.gst_category == "Registered Regular": supply_type = "B2B" elif invoice.gst_category == "SEZ": supply_type = "SEZWOP" elif invoice.gst_category == "Overseas": if invoice.export_type == "Without Payment of Tax": supply_type = "EXPWOP" else: supply_type = "EXPWP" elif invoice.gst_category == "Deemed Export": supply_type = "DEXP" if not supply_type: rr, sez, overseas, export = ( bold("Registered Regular"), bold("SEZ"), bold("Overseas"), bold("Deemed Export"), ) frappe.throw( _("GST category should be one of {}, {}, {}, {}").format(rr, sez, overseas, export), title=_("Invalid Supply Type"), ) return frappe._dict( dict(tax_scheme="GST", supply_type=supply_type, reverse_charge=invoice.reverse_charge) ) def get_doc_details(invoice): if getdate(invoice.posting_date) < getdate("2021-01-01"): frappe.throw( _("IRN generation is not allowed for invoices dated before 1st Jan 2021"), title=_("Not Allowed"), ) invoice_type = "CRN" if invoice.is_return else "INV" invoice_name = invoice.name invoice_date = format_date(invoice.posting_date, "dd/mm/yyyy") return frappe._dict( dict(invoice_type=invoice_type, invoice_name=invoice_name, invoice_date=invoice_date) ) def validate_address_fields(address, skip_gstin_validation): if ( (not address.gstin and not skip_gstin_validation) or not address.city or not address.pincode or not address.address_title or not address.address_line1 or not address.gst_state_number ): frappe.throw( msg=_( "Address Lines, City, Pincode, GSTIN are mandatory for address {}. Please set them and try again." ).format(address.name), title=_("Missing Address Fields"), ) if address.address_line2 and len(address.address_line2) < 2: # to prevent "The field Address 2 must be a string with a minimum length of 3 and a maximum length of 100" address.address_line2 = "" def get_party_details(address_name, skip_gstin_validation=False): addr = frappe.get_doc("Address", address_name) validate_address_fields(addr, skip_gstin_validation) if addr.gst_state_number == 97: # according to einvoice standard addr.pincode = 999999 party_address_details = frappe._dict( dict( legal_name=sanitize_for_json(addr.address_title), location=sanitize_for_json(addr.city), pincode=addr.pincode, gstin=addr.gstin, state_code=addr.gst_state_number, address_line1=sanitize_for_json(addr.address_line1), address_line2=sanitize_for_json(addr.address_line2), ) ) return party_address_details def get_overseas_address_details(address_name): address_title, address_line1, address_line2, city = frappe.db.get_value( "Address", address_name, ["address_title", "address_line1", "address_line2", "city"] ) if not address_title or not address_line1 or not city: frappe.throw( msg=_( "Address lines and city is mandatory for address {}. Please set them and try again." ).format(get_link_to_form("Address", address_name)), title=_("Missing Address Fields"), ) return frappe._dict( dict( gstin="URP", legal_name=sanitize_for_json(address_title), location=city, address_line1=sanitize_for_json(address_line1), address_line2=sanitize_for_json(address_line2), pincode=999999, state_code=96, place_of_supply=96, ) ) def get_item_list(invoice): item_list = [] for d in invoice.items: einvoice_item_schema = read_json("einv_item_template") item = frappe._dict({}) item.update(d.as_dict()) item.sr_no = d.idx item.description = sanitize_for_json(d.item_name) item.qty = abs(item.qty) if flt(item.qty) != 0.0: item.unit_rate = abs(item.taxable_value / item.qty) else: item.unit_rate = abs(item.taxable_value) item.gross_amount = abs(item.taxable_value) item.taxable_value = abs(item.taxable_value) item.discount_amount = 0 item.is_service_item = "Y" if item.gst_hsn_code and item.gst_hsn_code[:2] == "99" else "N" item.serial_no = "" item = update_item_taxes(invoice, item) item.total_value = abs( item.taxable_value + item.igst_amount + item.sgst_amount + item.cgst_amount + item.cess_amount + item.cess_nadv_amount + item.other_charges ) einv_item = einvoice_item_schema.format(item=item) item_list.append(einv_item) return ", ".join(item_list) def update_item_taxes(invoice, item): gst_accounts = get_gst_accounts(invoice.company) gst_accounts_list = [d for accounts in gst_accounts.values() for d in accounts if d] for attr in [ "tax_rate", "cess_rate", "cess_nadv_amount", "cgst_amount", "sgst_amount", "igst_amount", "cess_amount", "cess_nadv_amount", "other_charges", ]: item[attr] = 0 for t in invoice.taxes: is_applicable = t.tax_amount and t.account_head in gst_accounts_list if is_applicable: # this contains item wise tax rate & tax amount (incl. discount) item_tax_detail = json.loads(t.item_wise_tax_detail).get(item.item_code or item.item_name) item_tax_rate = item_tax_detail[0] # item tax amount excluding discount amount item_tax_amount = (item_tax_rate / 100) * item.taxable_value if t.account_head in gst_accounts.cess_account: item_tax_amount_after_discount = item_tax_detail[1] if t.charge_type == "On Item Quantity": item.cess_nadv_amount += abs(item_tax_amount_after_discount) else: item.cess_rate += item_tax_rate item.cess_amount += abs(item_tax_amount_after_discount) for tax_type in ["igst", "cgst", "sgst", "utgst"]: if t.account_head in gst_accounts[f"{tax_type}_account"]: item.tax_rate += item_tax_rate if tax_type == "utgst": # utgst taxes are reported same as sgst tax item["sgst_amount"] += abs(item_tax_amount) else: item[f"{tax_type}_amount"] += abs(item_tax_amount) else: # TODO: other charges per item pass return item def get_invoice_value_details(invoice): invoice_value_details = frappe._dict(dict()) invoice_value_details.base_total = abs(sum([i.taxable_value for i in invoice.get("items")])) invoice_value_details.invoice_discount_amt = 0 invoice_value_details.round_off = invoice.base_rounding_adjustment invoice_value_details.base_grand_total = abs(invoice.base_rounded_total) or abs( invoice.base_grand_total ) invoice_value_details.grand_total = abs(invoice.rounded_total) or abs(invoice.grand_total) invoice_value_details = update_invoice_taxes(invoice, invoice_value_details) return invoice_value_details def update_invoice_taxes(invoice, invoice_value_details): gst_accounts = get_gst_accounts(invoice.company) gst_accounts_list = [d for accounts in gst_accounts.values() for d in accounts if d] invoice_value_details.total_cgst_amt = 0 invoice_value_details.total_sgst_amt = 0 invoice_value_details.total_igst_amt = 0 invoice_value_details.total_cess_amt = 0 invoice_value_details.total_other_charges = 0 considered_rows = [] for t in invoice.taxes: tax_amount = t.base_tax_amount_after_discount_amount if t.account_head in gst_accounts_list: if t.account_head in gst_accounts.cess_account: # using after discount amt since item also uses after discount amt for cess calc invoice_value_details.total_cess_amt += abs(t.base_tax_amount_after_discount_amount) for tax_type in ["igst", "cgst", "sgst", "utgst"]: if t.account_head in gst_accounts[f"{tax_type}_account"]: if tax_type == "utgst": invoice_value_details["total_sgst_amt"] += abs(tax_amount) else: invoice_value_details[f"total_{tax_type}_amt"] += abs(tax_amount) update_other_charges(t, invoice_value_details, gst_accounts_list, invoice, considered_rows) else: invoice_value_details.total_other_charges += abs(tax_amount) return invoice_value_details def update_other_charges( tax_row, invoice_value_details, gst_accounts_list, invoice, considered_rows ): prev_row_id = cint(tax_row.row_id) - 1 if tax_row.account_head in gst_accounts_list and prev_row_id not in considered_rows: if tax_row.charge_type == "On Previous Row Amount": amount = invoice.get("taxes")[prev_row_id].tax_amount_after_discount_amount invoice_value_details.total_other_charges -= abs(amount) considered_rows.append(prev_row_id) if tax_row.charge_type == "On Previous Row Total": amount = invoice.get("taxes")[prev_row_id].base_total - invoice.base_net_total invoice_value_details.total_other_charges -= abs(amount) considered_rows.append(prev_row_id) def get_payment_details(invoice): payee_name = invoice.company mode_of_payment = "" paid_amount = invoice.base_paid_amount outstanding_amount = invoice.outstanding_amount return frappe._dict( dict( payee_name=payee_name, mode_of_payment=mode_of_payment, paid_amount=paid_amount, outstanding_amount=outstanding_amount, ) ) def get_return_doc_reference(invoice): invoice_date = frappe.db.get_value("Sales Invoice", invoice.return_against, "posting_date") return frappe._dict( dict(invoice_name=invoice.return_against, invoice_date=format_date(invoice_date, "dd/mm/yyyy")) ) def get_eway_bill_details(invoice): if invoice.is_return: frappe.throw( _( "E-Way Bill cannot be generated for Credit Notes & Debit Notes. Please clear fields in the Transporter Section of the invoice." ), title=_("Invalid Fields"), ) mode_of_transport = {"": "", "Road": "1", "Air": "2", "Rail": "3", "Ship": "4"} vehicle_type = {"Regular": "R", "Over Dimensional Cargo (ODC)": "O"} return frappe._dict( dict( gstin=invoice.gst_transporter_id, name=invoice.transporter_name, mode_of_transport=mode_of_transport[invoice.mode_of_transport], distance=invoice.distance or 0, document_name=invoice.lr_no, document_date=format_date(invoice.lr_date, "dd/mm/yyyy"), vehicle_no=invoice.vehicle_no, vehicle_type=vehicle_type[invoice.gst_vehicle_type], ) ) def validate_mandatory_fields(invoice): if not invoice.company_address: frappe.throw( _( "Company Address is mandatory to fetch company GSTIN details. Please set Company Address and try again." ), title=_("Missing Fields"), ) if not invoice.customer_address: frappe.throw( _( "Customer Address is mandatory to fetch customer GSTIN details. Please set Company Address and try again." ), title=_("Missing Fields"), ) if not frappe.db.get_value("Address", invoice.company_address, "gstin"): frappe.throw( _( "GSTIN is mandatory to fetch company GSTIN details. Please enter GSTIN in selected company address." ), title=_("Missing Fields"), ) if invoice.gst_category != "Overseas" and not frappe.db.get_value( "Address", invoice.customer_address, "gstin" ): frappe.throw( _( "GSTIN is mandatory to fetch customer GSTIN details. Please enter GSTIN in selected customer address." ), title=_("Missing Fields"), ) def validate_totals(einvoice): item_list = einvoice["ItemList"] value_details = einvoice["ValDtls"] total_item_ass_value = 0 total_item_cgst_value = 0 total_item_sgst_value = 0 total_item_igst_value = 0 total_item_value = 0 for item in item_list: total_item_ass_value += flt(item["AssAmt"]) total_item_cgst_value += flt(item["CgstAmt"]) total_item_sgst_value += flt(item["SgstAmt"]) total_item_igst_value += flt(item["IgstAmt"]) total_item_value += flt(item["TotItemVal"]) if ( abs(flt(item["AssAmt"]) * flt(item["GstRt"]) / 100) - (flt(item["CgstAmt"]) + flt(item["SgstAmt"]) + flt(item["IgstAmt"])) > 1 ): frappe.throw( _( "Row #{}: GST rate is invalid. Please remove tax rows with zero tax amount from taxes table." ).format(item.idx) ) if abs(flt(value_details["AssVal"]) - total_item_ass_value) > 1: frappe.throw( _( "Total Taxable Value of the items is not equal to the Invoice Net Total. Please check item taxes / discounts for any correction." ) ) if ( abs( flt(value_details["CgstVal"]) + flt(value_details["SgstVal"]) - total_item_cgst_value - total_item_sgst_value ) > 1 ): frappe.throw( _( "CGST + SGST value of the items is not equal to total CGST + SGST value. Please review taxes for any correction." ) ) if abs(flt(value_details["IgstVal"]) - total_item_igst_value) > 1: frappe.throw( _( "IGST value of all items is not equal to total IGST value. Please review taxes for any correction." ) ) if ( abs( flt(value_details["TotInvVal"]) + flt(value_details["Discount"]) - flt(value_details["OthChrg"]) - flt(value_details["RndOffAmt"]) - total_item_value ) > 1 ): frappe.throw( _( "Total Value of the items is not equal to the Invoice Grand Total. Please check item taxes / discounts for any correction." ) ) calculated_invoice_value = ( flt(value_details["AssVal"]) + flt(value_details["CgstVal"]) + flt(value_details["SgstVal"]) + flt(value_details["IgstVal"]) + flt(value_details["CesVal"]) + flt(value_details["OthChrg"]) + flt(value_details["RndOffAmt"]) - flt(value_details["Discount"]) ) if abs(flt(value_details["TotInvVal"]) - calculated_invoice_value) > 1: frappe.throw( _( "Total Item Value + Taxes - Discount is not equal to the Invoice Grand Total. Please check taxes / discounts for any correction." ) ) def make_einvoice(invoice): validate_mandatory_fields(invoice) schema = read_json("einv_template") transaction_details = get_transaction_details(invoice) item_list = get_item_list(invoice) doc_details = get_doc_details(invoice) invoice_value_details = get_invoice_value_details(invoice) seller_details = get_party_details(invoice.company_address) if invoice.gst_category == "Overseas": buyer_details = get_overseas_address_details(invoice.customer_address) else: buyer_details = get_party_details(invoice.customer_address) place_of_supply = get_place_of_supply(invoice, invoice.doctype) if place_of_supply: place_of_supply = place_of_supply.split("-")[0] else: place_of_supply = sanitize_for_json(invoice.billing_address_gstin)[:2] buyer_details.update(dict(place_of_supply=place_of_supply)) seller_details.update(dict(legal_name=invoice.company)) buyer_details.update(dict(legal_name=invoice.customer_name or invoice.customer)) shipping_details = payment_details = prev_doc_details = eway_bill_details = frappe._dict({}) if invoice.shipping_address_name and invoice.customer_address != invoice.shipping_address_name: if invoice.gst_category == "Overseas": shipping_details = get_overseas_address_details(invoice.shipping_address_name) else: shipping_details = get_party_details(invoice.shipping_address_name, skip_gstin_validation=True) dispatch_details = frappe._dict({}) if invoice.dispatch_address_name: dispatch_details = get_party_details(invoice.dispatch_address_name, skip_gstin_validation=True) if invoice.is_pos and invoice.base_paid_amount: payment_details = get_payment_details(invoice) if invoice.is_return and invoice.return_against: prev_doc_details = get_return_doc_reference(invoice) if invoice.transporter and not invoice.is_return: eway_bill_details = get_eway_bill_details(invoice) # not yet implemented period_details = export_details = frappe._dict({}) einvoice = schema.format( transaction_details=transaction_details, doc_details=doc_details, dispatch_details=dispatch_details, seller_details=seller_details, buyer_details=buyer_details, shipping_details=shipping_details, item_list=item_list, invoice_value_details=invoice_value_details, payment_details=payment_details, period_details=period_details, prev_doc_details=prev_doc_details, export_details=export_details, eway_bill_details=eway_bill_details, ) try: einvoice = safe_json_load(einvoice) einvoice = santize_einvoice_fields(einvoice) except Exception: show_link_to_error_log(invoice, einvoice) try: validate_totals(einvoice) except Exception: log_error(einvoice) raise return einvoice def show_link_to_error_log(invoice, einvoice): err_log = log_error(einvoice) link_to_error_log = get_link_to_form("Error Log", err_log.name, "Error Log") frappe.throw( _( "An error occurred while creating e-invoice for {}. Please check {} for more information." ).format(invoice.name, link_to_error_log), title=_("E Invoice Creation Failed"), ) def log_error(data=None): if isinstance(data, str): data = json.loads(data) seperator = "--" * 50 err_tb = traceback.format_exc() err_msg = str(sys.exc_info()[1]) data = json.dumps(data, indent=4) message = "\n".join(["Error", err_msg, seperator, "Data:", data, seperator, "Exception:", err_tb]) return frappe.log_error(title=_("E Invoice Request Failed"), message=message) def santize_einvoice_fields(einvoice): int_fields = ["Pin", "Distance", "CrDay"] float_fields = [ "Qty", "FreeQty", "UnitPrice", "TotAmt", "Discount", "PreTaxVal", "AssAmt", "GstRt", "IgstAmt", "CgstAmt", "SgstAmt", "CesRt", "CesAmt", "CesNonAdvlAmt", "StateCesRt", "StateCesAmt", "StateCesNonAdvlAmt", "OthChrg", "TotItemVal", "AssVal", "CgstVal", "SgstVal", "IgstVal", "CesVal", "StCesVal", "Discount", "OthChrg", "RndOffAmt", "TotInvVal", "TotInvValFc", "PaidAmt", "PaymtDue", "ExpDuty", ] copy = einvoice.copy() for key, value in copy.items(): if isinstance(value, list): for idx, d in enumerate(value): santized_dict = santize_einvoice_fields(d) if santized_dict: einvoice[key][idx] = santized_dict else: einvoice[key].pop(idx) if not einvoice[key]: einvoice.pop(key, None) elif isinstance(value, dict): santized_dict = santize_einvoice_fields(value) if santized_dict: einvoice[key] = santized_dict else: einvoice.pop(key, None) elif not value or value == "None": einvoice.pop(key, None) elif key in float_fields: einvoice[key] = flt(value, 2) elif key in int_fields: einvoice[key] = cint(value) return einvoice def safe_json_load(json_string): try: return json.loads(json_string) except json.JSONDecodeError as e: # print a snippet of 40 characters around the location where error occured pos = e.pos start, end = max(0, pos - 20), min(len(json_string) - 1, pos + 20) snippet = json_string[start:end] frappe.throw( _( "Error in input data. Please check for any special characters near following input:
{}" ).format(snippet) ) class RequestFailed(Exception): pass class CancellationNotAllowed(Exception): pass class GSPConnector: def __init__(self, doctype=None, docname=None): self.doctype = doctype self.docname = docname self.set_invoice() self.set_credentials() # authenticate url is same for sandbox & live self.authenticate_url = "https://gsp.adaequare.com/gsp/authenticate?grant_type=token" self.base_url = ( "https://gsp.adaequare.com" if not self.e_invoice_settings.sandbox_mode else "https://gsp.adaequare.com/test" ) self.cancel_irn_url = self.base_url + "/enriched/ei/api/invoice/cancel" self.irn_details_url = self.base_url + "/enriched/ei/api/invoice/irn" self.generate_irn_url = self.base_url + "/enriched/ei/api/invoice" self.gstin_details_url = self.base_url + "/enriched/ei/api/master/gstin" self.cancel_ewaybill_url = self.base_url + "/enriched/ewb/ewayapi?action=CANEWB" self.generate_ewaybill_url = self.base_url + "/enriched/ei/api/ewaybill" def set_invoice(self): self.invoice = None if self.doctype and self.docname: self.invoice = frappe.get_cached_doc(self.doctype, self.docname) def set_credentials(self): self.e_invoice_settings = frappe.get_cached_doc("E Invoice Settings") if not self.e_invoice_settings.enable: frappe.throw( _("E-Invoicing is disabled. Please enable it from {} to generate e-invoices.").format( get_link_to_form("E Invoice Settings", "E Invoice Settings") ) ) if self.invoice: gstin = self.get_seller_gstin() credentials_for_gstin = [d for d in self.e_invoice_settings.credentials if d.gstin == gstin] if credentials_for_gstin: self.credentials = credentials_for_gstin[0] else: frappe.throw( _( "Cannot find e-invoicing credentials for selected Company GSTIN. Please check E-Invoice Settings" ) ) else: self.credentials = ( self.e_invoice_settings.credentials[0] if self.e_invoice_settings.credentials else None ) def get_seller_gstin(self): gstin = frappe.db.get_value("Address", self.invoice.company_address, "gstin") if not gstin: frappe.throw( _("Cannot retrieve Company GSTIN. Please select company address with valid GSTIN.") ) return gstin def get_auth_token(self): if time_diff_in_seconds(self.e_invoice_settings.token_expiry, now_datetime()) < 150.0: self.fetch_auth_token() return self.e_invoice_settings.auth_token def make_request(self, request_type, url, headers=None, data=None): try: if request_type == "post": res = make_post_request(url, headers=headers, data=data) else: res = make_get_request(url, headers=headers, data=data) except requests.exceptions.HTTPError as e: if e.response.status_code in [401, 403] and not hasattr(self, "token_auto_refreshed"): self.auto_refresh_token() headers = self.get_headers() return self.make_request(request_type, url, headers, data) self.log_request(url, headers, data, res) return res def auto_refresh_token(self): self.fetch_auth_token() self.token_auto_refreshed = True def log_request(self, url, headers, data, res): headers.update({"password": self.credentials.password}) request_log = frappe.get_doc( { "doctype": "E Invoice Request Log", "user": frappe.session.user, "reference_invoice": self.invoice.name if self.invoice else None, "url": url, "headers": json.dumps(headers, indent=4) if headers else None, "data": json.dumps(data, indent=4) if isinstance(data, dict) else data, "response": json.dumps(res, indent=4) if res else None, } ) request_log.save(ignore_permissions=True) frappe.db.commit() def get_client_credentials(self): if self.e_invoice_settings.client_id and self.e_invoice_settings.client_secret: return self.e_invoice_settings.client_id, self.e_invoice_settings.get_password("client_secret") return frappe.conf.einvoice_client_id, frappe.conf.einvoice_client_secret def fetch_auth_token(self): client_id, client_secret = self.get_client_credentials() headers = {"gspappid": client_id, "gspappsecret": client_secret} res = {} try: res = self.make_request("post", self.authenticate_url, headers) self.e_invoice_settings.auth_token = "{} {}".format( res.get("token_type"), res.get("access_token") ) self.e_invoice_settings.token_expiry = add_to_date(None, seconds=res.get("expires_in")) self.e_invoice_settings.save(ignore_permissions=True) self.e_invoice_settings.reload() except Exception: log_error(res) self.raise_error(True) def get_headers(self): return { "content-type": "application/json", "user_name": self.credentials.username, "password": self.credentials.get_password(), "gstin": self.credentials.gstin, "authorization": self.get_auth_token(), "requestid": str(base64.b64encode(os.urandom(18))), } def fetch_gstin_details(self, gstin): headers = self.get_headers() try: params = "?gstin={gstin}".format(gstin=gstin) res = self.make_request("get", self.gstin_details_url + params, headers) if res.get("success"): return res.get("result") else: log_error(res) raise RequestFailed except RequestFailed: self.raise_error() except Exception: log_error() self.raise_error(True) @staticmethod def get_gstin_details(gstin): """fetch and cache GSTIN details""" if not hasattr(frappe.local, "gstin_cache"): frappe.local.gstin_cache = {} key = gstin gsp_connector = GSPConnector() details = gsp_connector.fetch_gstin_details(gstin) frappe.local.gstin_cache[key] = details frappe.cache().hset("gstin_cache", key, details) return details def generate_irn(self): data = {} try: headers = self.get_headers() einvoice = make_einvoice(self.invoice) data = json.dumps(einvoice, indent=4) res = self.make_request("post", self.generate_irn_url, headers, data) if res.get("success"): self.set_einvoice_data(res.get("result")) elif "2150" in res.get("message"): # IRN already generated but not updated in invoice # Extract the IRN from the response description and fetch irn details irn = res.get("result")[0].get("Desc").get("Irn") irn_details = self.get_irn_details(irn) if irn_details: self.set_einvoice_data(irn_details) else: raise RequestFailed( "IRN has already been generated for the invoice but cannot fetch details for the it. \ Contact ERPNext support to resolve the issue." ) else: raise RequestFailed except RequestFailed: errors = self.sanitize_error_message(res.get("message")) self.set_failed_status(errors=errors) self.raise_error(errors=errors) except Exception as e: self.set_failed_status(errors=str(e)) log_error(data) self.raise_error(True) @staticmethod def bulk_generate_irn(invoices): gsp_connector = GSPConnector() gsp_connector.doctype = "Sales Invoice" failed = [] for invoice in invoices: try: gsp_connector.docname = invoice gsp_connector.set_invoice() gsp_connector.set_credentials() gsp_connector.generate_irn() except Exception as e: failed.append({"docname": invoice, "message": str(e)}) return failed def get_irn_details(self, irn): headers = self.get_headers() try: params = "?irn={irn}".format(irn=irn) res = self.make_request("get", self.irn_details_url + params, headers) if res.get("success"): return res.get("result") else: raise RequestFailed except RequestFailed: errors = self.sanitize_error_message(res.get("message")) self.raise_error(errors=errors) except Exception: log_error() self.raise_error(True) def cancel_irn(self, irn, reason, remark): data, res = {}, {} try: # validate cancellation if time_diff_in_hours(now_datetime(), self.invoice.ack_date) > 24: frappe.throw( _("E-Invoice cannot be cancelled after 24 hours of IRN generation."), title=_("Not Allowed"), exc=CancellationNotAllowed, ) if not irn: frappe.throw( _("IRN not found. You must generate IRN before cancelling."), title=_("Not Allowed"), exc=CancellationNotAllowed, ) headers = self.get_headers() data = json.dumps({"Irn": irn, "Cnlrsn": reason, "Cnlrem": remark}, indent=4) res = self.make_request("post", self.cancel_irn_url, headers, data) if res.get("success") or "9999" in res.get("message"): self.invoice.irn_cancelled = 1 self.invoice.irn_cancel_date = res.get("result")["CancelDate"] if res.get("result") else "" self.invoice.einvoice_status = "Cancelled" self.invoice.flags.updater_reference = { "doctype": self.invoice.doctype, "docname": self.invoice.name, "label": _("IRN Cancelled - {}").format(remark), } self.update_invoice() else: raise RequestFailed except RequestFailed: errors = self.sanitize_error_message(res.get("message")) self.set_failed_status(errors=errors) self.raise_error(errors=errors) except CancellationNotAllowed as e: self.set_failed_status(errors=str(e)) self.raise_error(errors=str(e)) except Exception as e: self.set_failed_status(errors=str(e)) log_error(data) self.raise_error(True) @staticmethod def bulk_cancel_irn(invoices, reason, remark): gsp_connector = GSPConnector() gsp_connector.doctype = "Sales Invoice" failed = [] for invoice in invoices: try: gsp_connector.docname = invoice gsp_connector.set_invoice() gsp_connector.set_credentials() irn = gsp_connector.invoice.irn gsp_connector.cancel_irn(irn, reason, remark) except Exception as e: failed.append({"docname": invoice, "message": str(e)}) return failed def generate_eway_bill(self, **kwargs): args = frappe._dict(kwargs) headers = self.get_headers() eway_bill_details = get_eway_bill_details(args) data = json.dumps( { "Irn": args.irn, "Distance": cint(eway_bill_details.distance), "TransMode": eway_bill_details.mode_of_transport, "TransId": eway_bill_details.gstin, "TransName": eway_bill_details.name, "TrnDocDt": eway_bill_details.document_date, "TrnDocNo": eway_bill_details.document_name, "VehNo": eway_bill_details.vehicle_no, "VehType": eway_bill_details.vehicle_type, }, indent=4, ) try: res = self.make_request("post", self.generate_ewaybill_url, headers, data) if res.get("success"): self.invoice.ewaybill = res.get("result").get("EwbNo") self.invoice.eway_bill_validity = res.get("result").get("EwbValidTill") self.invoice.eway_bill_cancelled = 0 self.invoice.update(args) self.invoice.flags.updater_reference = { "doctype": self.invoice.doctype, "docname": self.invoice.name, "label": _("E-Way Bill Generated"), } self.update_invoice() else: raise RequestFailed except RequestFailed: errors = self.sanitize_error_message(res.get("message")) self.raise_error(errors=errors) except Exception: log_error(data) self.raise_error(True) def cancel_eway_bill(self, eway_bill, reason, remark): headers = self.get_headers() data = json.dumps({"ewbNo": eway_bill, "cancelRsnCode": reason, "cancelRmrk": remark}, indent=4) headers["username"] = headers["user_name"] del headers["user_name"] try: res = self.make_request("post", self.cancel_ewaybill_url, headers, data) if res.get("success"): self.invoice.ewaybill = "" self.invoice.eway_bill_cancelled = 1 self.invoice.flags.updater_reference = { "doctype": self.invoice.doctype, "docname": self.invoice.name, "label": _("E-Way Bill Cancelled - {}").format(remark), } self.update_invoice() else: raise RequestFailed except RequestFailed: errors = self.sanitize_error_message(res.get("message")) self.raise_error(errors=errors) except Exception: log_error(data) self.raise_error(True) def sanitize_error_message(self, message): """ On validation errors, response message looks something like this: message = '2174 : For inter-state transaction, CGST and SGST amounts are not applicable; only IGST amount is applicable, 3095 : Supplier GSTIN is inactive' we search for string between ':' to extract the error messages errors = [ ': For inter-state transaction, CGST and SGST amounts are not applicable; only IGST amount is applicable, 3095 ', ': Test' ] then we trim down the message by looping over errors """ if not message: return [] errors = re.findall(": [^:]+", message) for idx, e in enumerate(errors): # remove colons errors[idx] = errors[idx].replace(":", "").strip() # if not last if idx != len(errors) - 1: # remove last 7 chars eg: ', 3095 ' errors[idx] = errors[idx][:-6] return errors def raise_error(self, raise_exception=False, errors=None): if errors is None: errors = [] title = _("E Invoice Request Failed") if errors: frappe.throw(errors, title=title, as_list=1) else: link_to_error_list = 'Error Log' frappe.msgprint( _( "An error occurred while making e-invoicing request. Please check {} for more information." ).format(link_to_error_list), title=title, raise_exception=raise_exception, indicator="red", ) def set_einvoice_data(self, res): enc_signed_invoice = res.get("SignedInvoice") dec_signed_invoice = jwt.decode(enc_signed_invoice, options={"verify_signature": False})["data"] self.invoice.irn = res.get("Irn") self.invoice.ewaybill = res.get("EwbNo") self.invoice.eway_bill_validity = res.get("EwbValidTill") self.invoice.ack_no = res.get("AckNo") self.invoice.ack_date = res.get("AckDt") self.invoice.signed_einvoice = dec_signed_invoice self.invoice.ack_no = res.get("AckNo") self.invoice.ack_date = res.get("AckDt") self.invoice.signed_qr_code = res.get("SignedQRCode") self.invoice.einvoice_status = "Generated" self.attach_qrcode_image() self.invoice.flags.updater_reference = { "doctype": self.invoice.doctype, "docname": self.invoice.name, "label": _("IRN Generated"), } self.update_invoice() def attach_qrcode_image(self): qrcode = self.invoice.signed_qr_code doctype = self.invoice.doctype docname = self.invoice.name filename = "QRCode_{}.png".format(docname).replace(os.path.sep, "__") qr_image = io.BytesIO() url = qrcreate(qrcode, error="L") url.png(qr_image, scale=2, quiet_zone=1) _file = frappe.get_doc( { "doctype": "File", "file_name": filename, "attached_to_doctype": doctype, "attached_to_name": docname, "attached_to_field": "qrcode_image", "is_private": 0, "content": qr_image.getvalue(), } ) _file.save() frappe.db.commit() self.invoice.qrcode_image = _file.file_url def update_invoice(self): self.invoice.flags.ignore_validate_update_after_submit = True self.invoice.flags.ignore_validate = True self.invoice.save() def set_failed_status(self, errors=None): frappe.db.rollback() self.invoice.einvoice_status = "Failed" self.invoice.failure_description = self.get_failure_message(errors) if errors else "" self.update_invoice() frappe.db.commit() def get_failure_message(self, errors): if isinstance(errors, list): errors = ", ".join(errors) return errors def sanitize_for_json(string): """Escape JSON specific characters from a string.""" # json.dumps adds double-quotes to the string. Indexing to remove them. return json.dumps(string)[1:-1] @frappe.whitelist() def get_einvoice(doctype, docname): invoice = frappe.get_doc(doctype, docname) return make_einvoice(invoice) @frappe.whitelist() def generate_irn(doctype, docname): gsp_connector = GSPConnector(doctype, docname) gsp_connector.generate_irn() @frappe.whitelist() def cancel_irn(doctype, docname, irn, reason, remark): gsp_connector = GSPConnector(doctype, docname) gsp_connector.cancel_irn(irn, reason, remark) @frappe.whitelist() def generate_eway_bill(doctype, docname, **kwargs): gsp_connector = GSPConnector(doctype, docname) gsp_connector.generate_eway_bill(**kwargs) @frappe.whitelist() def cancel_eway_bill(doctype, docname): # TODO: uncomment when eway_bill api from Adequare is enabled # gsp_connector = GSPConnector(doctype, docname) # gsp_connector.cancel_eway_bill(eway_bill, reason, remark) frappe.db.set_value(doctype, docname, "ewaybill", "") frappe.db.set_value(doctype, docname, "eway_bill_cancelled", 1) @frappe.whitelist() def generate_einvoices(docnames): docnames = json.loads(docnames) or [] if len(docnames) < 10: failures = GSPConnector.bulk_generate_irn(docnames) frappe.local.message_log = [] if failures: show_bulk_action_failure_message(failures) success = len(docnames) - len(failures) frappe.msgprint( _("{} e-invoices generated successfully").format(success), title=_("Bulk E-Invoice Generation Complete"), ) else: enqueue_bulk_action(schedule_bulk_generate_irn, docnames=docnames) def schedule_bulk_generate_irn(docnames): failures = GSPConnector.bulk_generate_irn(docnames) frappe.local.message_log = [] frappe.publish_realtime( "bulk_einvoice_generation_complete", {"user": frappe.session.user, "failures": failures, "invoices": docnames}, ) def show_bulk_action_failure_message(failures): for doc in failures: docname = '{0}'.format(doc.get("docname")) message = doc.get("message").replace("'", '"') if message[0] == "[": errors = json.loads(message) error_list = "".join(["
  • {}
  • ".format(err) for err in errors]) message = """{} has following errors:
    """.format( docname, error_list ) else: message = "{} - {}".format(docname, message) frappe.msgprint(message, title=_("Bulk E-Invoice Generation Complete"), indicator="red") @frappe.whitelist() def cancel_irns(docnames, reason, remark): docnames = json.loads(docnames) or [] if len(docnames) < 10: failures = GSPConnector.bulk_cancel_irn(docnames, reason, remark) frappe.local.message_log = [] if failures: show_bulk_action_failure_message(failures) success = len(docnames) - len(failures) frappe.msgprint( _("{} e-invoices cancelled successfully").format(success), title=_("Bulk E-Invoice Cancellation Complete"), ) else: enqueue_bulk_action(schedule_bulk_cancel_irn, docnames=docnames, reason=reason, remark=remark) def schedule_bulk_cancel_irn(docnames, reason, remark): failures = GSPConnector.bulk_cancel_irn(docnames, reason, remark) frappe.local.message_log = [] frappe.publish_realtime( "bulk_einvoice_cancellation_complete", {"user": frappe.session.user, "failures": failures, "invoices": docnames}, ) def enqueue_bulk_action(job, **kwargs): check_scheduler_status() enqueue( job, **kwargs, queue="long", timeout=10000, event="processing_bulk_einvoice_action", now=frappe.conf.developer_mode or frappe.flags.in_test, ) if job == schedule_bulk_generate_irn: msg = _("E-Invoices will be generated in a background process.") else: msg = _("E-Invoices will be cancelled in a background process.") frappe.msgprint(msg, alert=1) def check_scheduler_status(): if is_scheduler_inactive() and not frappe.flags.in_test: frappe.throw(_("Scheduler is inactive. Cannot enqueue job."), title=_("Scheduler Inactive")) def job_already_enqueued(job_name): enqueued_jobs = [d.get("job_name") for d in get_info()] if job_name in enqueued_jobs: return True