# -*- coding: utf-8 -*- # Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and contributors # For license information, please see license.txt from __future__ import unicode_literals import base64 import io import json import os import re import sys import traceback import frappe import jwt import six from frappe import _, bold from frappe.core.page.background_jobs.background_jobs import get_info from frappe.integrations.utils import make_get_request, make_post_request from frappe.utils.background_jobs import enqueue from frappe.utils.data import ( add_to_date, cint, cstr, flt, format_date, get_link_to_form, getdate, now_datetime, time_diff_in_hours, time_diff_in_seconds, ) from frappe.utils.scheduler import is_scheduler_inactive from pyqrcode import create as qrcreate from erpnext.regional.india.utils import get_gst_accounts, get_place_of_supply @frappe.whitelist() def validate_eligibility(doc): if isinstance(doc, six.string_types): doc = json.loads(doc) invalid_doctype = doc.get('doctype') != 'Sales Invoice' if invalid_doctype: return False einvoicing_enabled = cint(frappe.db.get_single_value('E Invoice Settings', 'enable')) if not einvoicing_enabled: return False einvoicing_eligible_from = frappe.db.get_single_value('E Invoice Settings', 'applicable_from') or '2021-04-01' if getdate(doc.get('posting_date')) < getdate(einvoicing_eligible_from): return False invalid_company = not frappe.db.get_value('E Invoice User', { 'company': doc.get('company') }) invalid_supply_type = doc.get('gst_category') not in ['Registered Regular', 'SEZ', 'Overseas', 'Deemed Export'] company_transaction = doc.get('billing_address_gstin') == doc.get('company_gstin') # if export invoice, then taxes can be empty # invoice can only be ineligible if no taxes applied and is not an export invoice no_taxes_applied = not doc.get('taxes') and not doc.get('gst_category') == 'Overseas' has_non_gst_item = any(d for d in doc.get('items', []) if d.get('is_non_gst')) if invalid_company or invalid_supply_type or company_transaction or no_taxes_applied or has_non_gst_item: return False return True def validate_einvoice_fields(doc): invoice_eligible = validate_eligibility(doc) if not invoice_eligible: return if doc.docstatus == 0 and doc._action == 'save': if doc.irn: frappe.throw(_('You cannot edit the invoice after generating IRN'), title=_('Edit Not Allowed')) if len(doc.name) > 16: raise_document_name_too_long_error() doc.einvoice_status = 'Pending' elif doc.docstatus == 1 and doc._action == 'submit' and not doc.irn: frappe.throw(_('You must generate IRN before submitting the document.'), title=_('Missing IRN')) elif doc.irn and doc.docstatus == 2 and doc._action == 'cancel' and not doc.irn_cancelled: frappe.throw(_('You must cancel IRN before cancelling the document.'), title=_('Cancel Not Allowed')) def raise_document_name_too_long_error(): title = _('Document ID Too Long') msg = _('As you have E-Invoicing enabled, to be able to generate IRN for this invoice') msg += ', ' msg += _('document id {} exceed 16 letters.').format(bold(_('should not'))) msg += '

' msg += _('You must {} your {} in order to have document id of {} length 16.').format( bold(_('modify')), bold(_('naming series')), bold(_('maximum')) ) msg += _('Please account for ammended documents too.') frappe.throw(msg, title=title) def read_json(name): file_path = os.path.join(os.path.dirname(__file__), '{name}.json'.format(name=name)) with open(file_path, 'r') as f: return cstr(f.read()) def get_transaction_details(invoice): supply_type = '' if invoice.gst_category == 'Registered Regular': supply_type = 'B2B' elif invoice.gst_category == 'SEZ': supply_type = 'SEZWOP' elif invoice.gst_category == 'Overseas': supply_type = 'EXPWOP' elif invoice.gst_category == 'Deemed Export': supply_type = 'DEXP' if not supply_type: rr, sez, overseas, export = bold('Registered Regular'), bold('SEZ'), bold('Overseas'), bold('Deemed Export') frappe.throw(_('GST category should be one of {}, {}, {}, {}').format(rr, sez, overseas, export), title=_('Invalid Supply Type')) return frappe._dict(dict( tax_scheme='GST', supply_type=supply_type, reverse_charge=invoice.reverse_charge )) def get_doc_details(invoice): if getdate(invoice.posting_date) < getdate('2021-01-01'): frappe.throw(_('IRN generation is not allowed for invoices dated before 1st Jan 2021'), title=_('Not Allowed')) invoice_type = 'CRN' if invoice.is_return else 'INV' invoice_name = invoice.name invoice_date = format_date(invoice.posting_date, 'dd/mm/yyyy') return frappe._dict(dict( invoice_type=invoice_type, invoice_name=invoice_name, invoice_date=invoice_date )) def validate_address_fields(address, skip_gstin_validation): if ((not address.gstin and not skip_gstin_validation) or not address.city or not address.pincode or not address.address_title or not address.address_line1 or not address.gst_state_number): frappe.throw( msg=_('Address Lines, City, Pincode, GSTIN are mandatory for address {}. Please set them and try again.').format(address.name), title=_('Missing Address Fields') ) if address.address_line2 and len(address.address_line2) < 2: # to prevent "The field Address 2 must be a string with a minimum length of 3 and a maximum length of 100" address.address_line2 = "" def get_party_details(address_name, skip_gstin_validation=False): addr = frappe.get_doc('Address', address_name) validate_address_fields(addr, skip_gstin_validation) if addr.gst_state_number == 97: # according to einvoice standard addr.pincode = 999999 party_address_details = frappe._dict(dict( legal_name=sanitize_for_json(addr.address_title), location=sanitize_for_json(addr.city), pincode=addr.pincode, gstin=addr.gstin, state_code=addr.gst_state_number, address_line1=sanitize_for_json(addr.address_line1), address_line2=sanitize_for_json(addr.address_line2) )) return party_address_details def get_overseas_address_details(address_name): address_title, address_line1, address_line2, city = frappe.db.get_value( 'Address', address_name, ['address_title', 'address_line1', 'address_line2', 'city'] ) if not address_title or not address_line1 or not city: frappe.throw( msg=_('Address lines and city is mandatory for address {}. Please set them and try again.').format( get_link_to_form('Address', address_name) ), title=_('Missing Address Fields') ) return frappe._dict(dict( gstin='URP', legal_name=sanitize_for_json(address_title), location=city, address_line1=sanitize_for_json(address_line1), address_line2=sanitize_for_json(address_line2), pincode=999999, state_code=96, place_of_supply=96 )) def get_item_list(invoice): item_list = [] for d in invoice.items: einvoice_item_schema = read_json('einv_item_template') item = frappe._dict({}) item.update(d.as_dict()) item.sr_no = d.idx item.description = sanitize_for_json(d.item_name) item.qty = abs(item.qty) if flt(item.qty) != 0.0: item.unit_rate = abs(item.taxable_value / item.qty) else: item.unit_rate = abs(item.taxable_value) item.gross_amount = abs(item.taxable_value) item.taxable_value = abs(item.taxable_value) item.discount_amount = 0 item.batch_expiry_date = frappe.db.get_value('Batch', d.batch_no, 'expiry_date') if d.batch_no else None item.batch_expiry_date = format_date(item.batch_expiry_date, 'dd/mm/yyyy') if item.batch_expiry_date else None item.is_service_item = 'Y' if item.gst_hsn_code and item.gst_hsn_code[:2] == "99" else 'N' item.serial_no = "" item = update_item_taxes(invoice, item) item.total_value = abs( item.taxable_value + item.igst_amount + item.sgst_amount + item.cgst_amount + item.cess_amount + item.cess_nadv_amount + item.other_charges ) einv_item = einvoice_item_schema.format(item=item) item_list.append(einv_item) return ', '.join(item_list) def update_item_taxes(invoice, item): gst_accounts = get_gst_accounts(invoice.company) gst_accounts_list = [d for accounts in gst_accounts.values() for d in accounts if d] for attr in [ 'tax_rate', 'cess_rate', 'cess_nadv_amount', 'cgst_amount', 'sgst_amount', 'igst_amount', 'cess_amount', 'cess_nadv_amount', 'other_charges' ]: item[attr] = 0 for t in invoice.taxes: is_applicable = t.tax_amount and t.account_head in gst_accounts_list if is_applicable: # this contains item wise tax rate & tax amount (incl. discount) item_tax_detail = json.loads(t.item_wise_tax_detail).get(item.item_code or item.item_name) item_tax_rate = item_tax_detail[0] # item tax amount excluding discount amount item_tax_amount = (item_tax_rate / 100) * item.taxable_value if t.account_head in gst_accounts.cess_account: item_tax_amount_after_discount = item_tax_detail[1] if t.charge_type == 'On Item Quantity': item.cess_nadv_amount += abs(item_tax_amount_after_discount) else: item.cess_rate += item_tax_rate item.cess_amount += abs(item_tax_amount_after_discount) for tax_type in ['igst', 'cgst', 'sgst']: if t.account_head in gst_accounts[f'{tax_type}_account']: item.tax_rate += item_tax_rate item[f'{tax_type}_amount'] += abs(item_tax_amount) else: # TODO: other charges per item pass return item def get_invoice_value_details(invoice): invoice_value_details = frappe._dict(dict()) invoice_value_details.base_total = abs(sum([i.taxable_value for i in invoice.get('items')])) invoice_value_details.invoice_discount_amt = 0 invoice_value_details.round_off = invoice.base_rounding_adjustment invoice_value_details.base_grand_total = abs(invoice.base_rounded_total) or abs(invoice.base_grand_total) invoice_value_details.grand_total = abs(invoice.rounded_total) or abs(invoice.grand_total) invoice_value_details = update_invoice_taxes(invoice, invoice_value_details) return invoice_value_details def update_invoice_taxes(invoice, invoice_value_details): gst_accounts = get_gst_accounts(invoice.company) gst_accounts_list = [d for accounts in gst_accounts.values() for d in accounts if d] invoice_value_details.total_cgst_amt = 0 invoice_value_details.total_sgst_amt = 0 invoice_value_details.total_igst_amt = 0 invoice_value_details.total_cess_amt = 0 invoice_value_details.total_other_charges = 0 considered_rows = [] for t in invoice.taxes: tax_amount = t.base_tax_amount_after_discount_amount if t.account_head in gst_accounts_list: if t.account_head in gst_accounts.cess_account: # using after discount amt since item also uses after discount amt for cess calc invoice_value_details.total_cess_amt += abs(t.base_tax_amount_after_discount_amount) for tax_type in ['igst', 'cgst', 'sgst']: if t.account_head in gst_accounts[f'{tax_type}_account']: invoice_value_details[f'total_{tax_type}_amt'] += abs(tax_amount) update_other_charges(t, invoice_value_details, gst_accounts_list, invoice, considered_rows) else: invoice_value_details.total_other_charges += abs(tax_amount) return invoice_value_details def update_other_charges(tax_row, invoice_value_details, gst_accounts_list, invoice, considered_rows): prev_row_id = cint(tax_row.row_id) - 1 if tax_row.account_head in gst_accounts_list and prev_row_id not in considered_rows: if tax_row.charge_type == 'On Previous Row Amount': amount = invoice.get('taxes')[prev_row_id].tax_amount_after_discount_amount invoice_value_details.total_other_charges -= abs(amount) considered_rows.append(prev_row_id) if tax_row.charge_type == 'On Previous Row Total': amount = invoice.get('taxes')[prev_row_id].base_total - invoice.base_net_total invoice_value_details.total_other_charges -= abs(amount) considered_rows.append(prev_row_id) def get_payment_details(invoice): payee_name = invoice.company mode_of_payment = ', '.join([d.mode_of_payment for d in invoice.payments]) paid_amount = invoice.base_paid_amount outstanding_amount = invoice.outstanding_amount return frappe._dict(dict( payee_name=payee_name, mode_of_payment=mode_of_payment, paid_amount=paid_amount, outstanding_amount=outstanding_amount )) def get_return_doc_reference(invoice): invoice_date = frappe.db.get_value('Sales Invoice', invoice.return_against, 'posting_date') return frappe._dict(dict( invoice_name=invoice.return_against, invoice_date=format_date(invoice_date, 'dd/mm/yyyy') )) def get_eway_bill_details(invoice): if invoice.is_return: frappe.throw(_('E-Way Bill cannot be generated for Credit Notes & Debit Notes. Please clear fields in the Transporter Section of the invoice.'), title=_('Invalid Fields')) mode_of_transport = { '': '', 'Road': '1', 'Air': '2', 'Rail': '3', 'Ship': '4' } vehicle_type = { 'Regular': 'R', 'Over Dimensional Cargo (ODC)': 'O' } return frappe._dict(dict( gstin=invoice.gst_transporter_id, name=invoice.transporter_name, mode_of_transport=mode_of_transport[invoice.mode_of_transport], distance=invoice.distance or 0, document_name=invoice.lr_no, document_date=format_date(invoice.lr_date, 'dd/mm/yyyy'), vehicle_no=invoice.vehicle_no, vehicle_type=vehicle_type[invoice.gst_vehicle_type] )) def validate_mandatory_fields(invoice): if not invoice.company_address: frappe.throw( _('Company Address is mandatory to fetch company GSTIN details. Please set Company Address and try again.'), title=_('Missing Fields') ) if not invoice.customer_address: frappe.throw( _('Customer Address is mandatory to fetch customer GSTIN details. Please set Company Address and try again.'), title=_('Missing Fields') ) if not frappe.db.get_value('Address', invoice.company_address, 'gstin'): frappe.throw( _('GSTIN is mandatory to fetch company GSTIN details. Please enter GSTIN in selected company address.'), title=_('Missing Fields') ) if invoice.gst_category != 'Overseas' and not frappe.db.get_value('Address', invoice.customer_address, 'gstin'): frappe.throw( _('GSTIN is mandatory to fetch customer GSTIN details. Please enter GSTIN in selected customer address.'), title=_('Missing Fields') ) def validate_totals(einvoice): item_list = einvoice['ItemList'] value_details = einvoice['ValDtls'] total_item_ass_value = 0 total_item_cgst_value = 0 total_item_sgst_value = 0 total_item_igst_value = 0 total_item_value = 0 for item in item_list: total_item_ass_value += flt(item['AssAmt']) total_item_cgst_value += flt(item['CgstAmt']) total_item_sgst_value += flt(item['SgstAmt']) total_item_igst_value += flt(item['IgstAmt']) total_item_value += flt(item['TotItemVal']) if abs(flt(item['AssAmt']) * flt(item['GstRt']) / 100) - (flt(item['CgstAmt']) + flt(item['SgstAmt']) + flt(item['IgstAmt'])) > 1: frappe.throw(_('Row #{}: GST rate is invalid. Please remove tax rows with zero tax amount from taxes table.').format(item.idx)) if abs(flt(value_details['AssVal']) - total_item_ass_value) > 1: frappe.throw(_('Total Taxable Value of the items is not equal to the Invoice Net Total. Please check item taxes / discounts for any correction.')) if abs(flt(value_details['CgstVal']) + flt(value_details['SgstVal']) - total_item_cgst_value - total_item_sgst_value) > 1: frappe.throw(_('CGST + SGST value of the items is not equal to total CGST + SGST value. Please review taxes for any correction.')) if abs(flt(value_details['IgstVal']) - total_item_igst_value) > 1: frappe.throw(_('IGST value of all items is not equal to total IGST value. Please review taxes for any correction.')) if abs( flt(value_details['TotInvVal']) + flt(value_details['Discount']) - flt(value_details['OthChrg']) - flt(value_details['RndOffAmt']) - total_item_value ) > 1: frappe.throw(_('Total Value of the items is not equal to the Invoice Grand Total. Please check item taxes / discounts for any correction.')) calculated_invoice_value = \ flt(value_details['AssVal']) + flt(value_details['CgstVal']) \ + flt(value_details['SgstVal']) + flt(value_details['IgstVal']) \ + flt(value_details['OthChrg']) + flt(value_details['RndOffAmt']) - flt(value_details['Discount']) if abs(flt(value_details['TotInvVal']) - calculated_invoice_value) > 1: frappe.throw(_('Total Item Value + Taxes - Discount is not equal to the Invoice Grand Total. Please check taxes / discounts for any correction.')) def make_einvoice(invoice): validate_mandatory_fields(invoice) schema = read_json('einv_template') transaction_details = get_transaction_details(invoice) item_list = get_item_list(invoice) doc_details = get_doc_details(invoice) invoice_value_details = get_invoice_value_details(invoice) seller_details = get_party_details(invoice.company_address) if invoice.gst_category == 'Overseas': buyer_details = get_overseas_address_details(invoice.customer_address) else: buyer_details = get_party_details(invoice.customer_address) place_of_supply = get_place_of_supply(invoice, invoice.doctype) if place_of_supply: place_of_supply = place_of_supply.split('-')[0] else: place_of_supply = sanitize_for_json(invoice.billing_address_gstin)[:2] buyer_details.update(dict(place_of_supply=place_of_supply)) seller_details.update(dict(legal_name=invoice.company)) buyer_details.update(dict(legal_name=invoice.customer_name or invoice.customer)) shipping_details = payment_details = prev_doc_details = eway_bill_details = frappe._dict({}) if invoice.shipping_address_name and invoice.customer_address != invoice.shipping_address_name: if invoice.gst_category == 'Overseas': shipping_details = get_overseas_address_details(invoice.shipping_address_name) else: shipping_details = get_party_details(invoice.shipping_address_name, skip_gstin_validation=True) dispatch_details = frappe._dict({}) if invoice.dispatch_address_name: dispatch_details = get_party_details(invoice.dispatch_address_name, skip_gstin_validation=True) if invoice.is_pos and invoice.base_paid_amount: payment_details = get_payment_details(invoice) if invoice.is_return and invoice.return_against: prev_doc_details = get_return_doc_reference(invoice) if invoice.transporter and not invoice.is_return: eway_bill_details = get_eway_bill_details(invoice) # not yet implemented period_details = export_details = frappe._dict({}) einvoice = schema.format( transaction_details=transaction_details, doc_details=doc_details, dispatch_details=dispatch_details, seller_details=seller_details, buyer_details=buyer_details, shipping_details=shipping_details, item_list=item_list, invoice_value_details=invoice_value_details, payment_details=payment_details, period_details=period_details, prev_doc_details=prev_doc_details, export_details=export_details, eway_bill_details=eway_bill_details ) try: einvoice = safe_json_load(einvoice) einvoice = santize_einvoice_fields(einvoice) except Exception: show_link_to_error_log(invoice, einvoice) try: validate_totals(einvoice) except Exception: log_error(einvoice) raise return einvoice def show_link_to_error_log(invoice, einvoice): err_log = log_error(einvoice) link_to_error_log = get_link_to_form('Error Log', err_log.name, 'Error Log') frappe.throw( _('An error occurred while creating e-invoice for {}. Please check {} for more information.').format( invoice.name, link_to_error_log), title=_('E Invoice Creation Failed') ) def log_error(data=None): if isinstance(data, six.string_types): data = json.loads(data) seperator = "--" * 50 err_tb = traceback.format_exc() err_msg = str(sys.exc_info()[1]) data = json.dumps(data, indent=4) message = "\n".join([ "Error", err_msg, seperator, "Data:", data, seperator, "Exception:", err_tb ]) return frappe.log_error(title=_('E Invoice Request Failed'), message=message) def santize_einvoice_fields(einvoice): int_fields = ["Pin","Distance","CrDay"] float_fields = ["Qty","FreeQty","UnitPrice","TotAmt","Discount","PreTaxVal","AssAmt","GstRt","IgstAmt","CgstAmt","SgstAmt","CesRt","CesAmt","CesNonAdvlAmt","StateCesRt","StateCesAmt","StateCesNonAdvlAmt","OthChrg","TotItemVal","AssVal","CgstVal","SgstVal","IgstVal","CesVal","StCesVal","Discount","OthChrg","RndOffAmt","TotInvVal","TotInvValFc","PaidAmt","PaymtDue","ExpDuty",] copy = einvoice.copy() for key, value in copy.items(): if isinstance(value, list): for idx, d in enumerate(value): santized_dict = santize_einvoice_fields(d) if santized_dict: einvoice[key][idx] = santized_dict else: einvoice[key].pop(idx) if not einvoice[key]: einvoice.pop(key, None) elif isinstance(value, dict): santized_dict = santize_einvoice_fields(value) if santized_dict: einvoice[key] = santized_dict else: einvoice.pop(key, None) elif not value or value == "None": einvoice.pop(key, None) elif key in float_fields: einvoice[key] = flt(value, 2) elif key in int_fields: einvoice[key] = cint(value) return einvoice def safe_json_load(json_string): try: return json.loads(json_string) except json.JSONDecodeError as e: # print a snippet of 40 characters around the location where error occured pos = e.pos start, end = max(0, pos-20), min(len(json_string)-1, pos+20) snippet = json_string[start:end] frappe.throw(_("Error in input data. Please check for any special characters near following input:
{}").format(snippet)) class RequestFailed(Exception): pass class CancellationNotAllowed(Exception): pass class GSPConnector(): def __init__(self, doctype=None, docname=None): self.doctype = doctype self.docname = docname self.set_invoice() self.set_credentials() # authenticate url is same for sandbox & live self.authenticate_url = 'https://gsp.adaequare.com/gsp/authenticate?grant_type=token' self.base_url = 'https://gsp.adaequare.com' if not self.e_invoice_settings.sandbox_mode else 'https://gsp.adaequare.com/test' self.cancel_irn_url = self.base_url + '/enriched/ei/api/invoice/cancel' self.irn_details_url = self.base_url + '/enriched/ei/api/invoice/irn' self.generate_irn_url = self.base_url + '/enriched/ei/api/invoice' self.gstin_details_url = self.base_url + '/enriched/ei/api/master/gstin' self.cancel_ewaybill_url = self.base_url + '/enriched/ewb/ewayapi?action=CANEWB' self.generate_ewaybill_url = self.base_url + '/enriched/ei/api/ewaybill' def set_invoice(self): self.invoice = None if self.doctype and self.docname: self.invoice = frappe.get_cached_doc(self.doctype, self.docname) def set_credentials(self): self.e_invoice_settings = frappe.get_cached_doc('E Invoice Settings') if not self.e_invoice_settings.enable: frappe.throw(_("E-Invoicing is disabled. Please enable it from {} to generate e-invoices.").format(get_link_to_form("E Invoice Settings", "E Invoice Settings"))) if self.invoice: gstin = self.get_seller_gstin() credentials_for_gstin = [d for d in self.e_invoice_settings.credentials if d.gstin == gstin] if credentials_for_gstin: self.credentials = credentials_for_gstin[0] else: frappe.throw(_('Cannot find e-invoicing credentials for selected Company GSTIN. Please check E-Invoice Settings')) else: self.credentials = self.e_invoice_settings.credentials[0] if self.e_invoice_settings.credentials else None def get_seller_gstin(self): gstin = frappe.db.get_value('Address', self.invoice.company_address, 'gstin') if not gstin: frappe.throw(_('Cannot retrieve Company GSTIN. Please select company address with valid GSTIN.')) return gstin def get_auth_token(self): if time_diff_in_seconds(self.e_invoice_settings.token_expiry, now_datetime()) < 150.0: self.fetch_auth_token() return self.e_invoice_settings.auth_token def make_request(self, request_type, url, headers=None, data=None): if request_type == 'post': res = make_post_request(url, headers=headers, data=data) else: res = make_get_request(url, headers=headers, data=data) self.log_request(url, headers, data, res) return res def log_request(self, url, headers, data, res): headers.update({ 'password': self.credentials.password }) request_log = frappe.get_doc({ "doctype": "E Invoice Request Log", "user": frappe.session.user, "reference_invoice": self.invoice.name if self.invoice else None, "url": url, "headers": json.dumps(headers, indent=4) if headers else None, "data": json.dumps(data, indent=4) if isinstance(data, dict) else data, "response": json.dumps(res, indent=4) if res else None }) request_log.save(ignore_permissions=True) frappe.db.commit() def fetch_auth_token(self): client_id = self.e_invoice_settings.client_id or frappe.conf.einvoice_client_id client_secret = self.e_invoice_settings.get_password('client_secret') or frappe.conf.einvoice_client_secret headers = { 'gspappid': client_id, 'gspappsecret': client_secret } res = {} try: res = self.make_request('post', self.authenticate_url, headers) self.e_invoice_settings.auth_token = "{} {}".format(res.get('token_type'), res.get('access_token')) self.e_invoice_settings.token_expiry = add_to_date(None, seconds=res.get('expires_in')) self.e_invoice_settings.save(ignore_permissions=True) self.e_invoice_settings.reload() except Exception: log_error(res) self.raise_error(True) def get_headers(self): return { 'content-type': 'application/json', 'user_name': self.credentials.username, 'password': self.credentials.get_password(), 'gstin': self.credentials.gstin, 'authorization': self.get_auth_token(), 'requestid': str(base64.b64encode(os.urandom(18))), } def fetch_gstin_details(self, gstin): headers = self.get_headers() try: params = '?gstin={gstin}'.format(gstin=gstin) res = self.make_request('get', self.gstin_details_url + params, headers) if res.get('success'): return res.get('result') else: log_error(res) raise RequestFailed except RequestFailed: self.raise_error() except Exception: log_error() self.raise_error(True) @staticmethod def get_gstin_details(gstin): '''fetch and cache GSTIN details''' if not hasattr(frappe.local, 'gstin_cache'): frappe.local.gstin_cache = {} key = gstin gsp_connector = GSPConnector() details = gsp_connector.fetch_gstin_details(gstin) frappe.local.gstin_cache[key] = details frappe.cache().hset('gstin_cache', key, details) return details def generate_irn(self): data = {} try: headers = self.get_headers() einvoice = make_einvoice(self.invoice) data = json.dumps(einvoice, indent=4) res = self.make_request('post', self.generate_irn_url, headers, data) if res.get('success'): self.set_einvoice_data(res.get('result')) elif '2150' in res.get('message'): # IRN already generated but not updated in invoice # Extract the IRN from the response description and fetch irn details irn = res.get('result')[0].get('Desc').get('Irn') irn_details = self.get_irn_details(irn) if irn_details: self.set_einvoice_data(irn_details) else: raise RequestFailed('IRN has already been generated for the invoice but cannot fetch details for the it. \ Contact ERPNext support to resolve the issue.') else: raise RequestFailed except RequestFailed: errors = self.sanitize_error_message(res.get('message')) self.set_failed_status(errors=errors) self.raise_error(errors=errors) except Exception as e: self.set_failed_status(errors=str(e)) log_error(data) self.raise_error(True) @staticmethod def bulk_generate_irn(invoices): gsp_connector = GSPConnector() gsp_connector.doctype = 'Sales Invoice' failed = [] for invoice in invoices: try: gsp_connector.docname = invoice gsp_connector.set_invoice() gsp_connector.set_credentials() gsp_connector.generate_irn() except Exception as e: failed.append({ 'docname': invoice, 'message': str(e) }) return failed def get_irn_details(self, irn): headers = self.get_headers() try: params = '?irn={irn}'.format(irn=irn) res = self.make_request('get', self.irn_details_url + params, headers) if res.get('success'): return res.get('result') else: raise RequestFailed except RequestFailed: errors = self.sanitize_error_message(res.get('message')) self.raise_error(errors=errors) except Exception: log_error() self.raise_error(True) def cancel_irn(self, irn, reason, remark): data, res = {}, {} try: # validate cancellation if time_diff_in_hours(now_datetime(), self.invoice.ack_date) > 24: frappe.throw(_('E-Invoice cannot be cancelled after 24 hours of IRN generation.'), title=_('Not Allowed'), exc=CancellationNotAllowed) if not irn: frappe.throw(_('IRN not found. You must generate IRN before cancelling.'), title=_('Not Allowed'), exc=CancellationNotAllowed) headers = self.get_headers() data = json.dumps({ 'Irn': irn, 'Cnlrsn': reason, 'Cnlrem': remark }, indent=4) res = self.make_request('post', self.cancel_irn_url, headers, data) if res.get('success') or '9999' in res.get('message'): self.invoice.irn_cancelled = 1 self.invoice.irn_cancel_date = res.get('result')['CancelDate'] if res.get('result') else "" self.invoice.einvoice_status = 'Cancelled' self.invoice.flags.updater_reference = { 'doctype': self.invoice.doctype, 'docname': self.invoice.name, 'label': _('IRN Cancelled - {}').format(remark) } self.update_invoice() else: raise RequestFailed except RequestFailed: errors = self.sanitize_error_message(res.get('message')) self.set_failed_status(errors=errors) self.raise_error(errors=errors) except CancellationNotAllowed as e: self.set_failed_status(errors=str(e)) self.raise_error(errors=str(e)) except Exception as e: self.set_failed_status(errors=str(e)) log_error(data) self.raise_error(True) @staticmethod def bulk_cancel_irn(invoices, reason, remark): gsp_connector = GSPConnector() gsp_connector.doctype = 'Sales Invoice' failed = [] for invoice in invoices: try: gsp_connector.docname = invoice gsp_connector.set_invoice() gsp_connector.set_credentials() irn = gsp_connector.invoice.irn gsp_connector.cancel_irn(irn, reason, remark) except Exception as e: failed.append({ 'docname': invoice, 'message': str(e) }) return failed def generate_eway_bill(self, **kwargs): args = frappe._dict(kwargs) headers = self.get_headers() eway_bill_details = get_eway_bill_details(args) data = json.dumps({ 'Irn': args.irn, 'Distance': cint(eway_bill_details.distance), 'TransMode': eway_bill_details.mode_of_transport, 'TransId': eway_bill_details.gstin, 'TransName': eway_bill_details.transporter, 'TrnDocDt': eway_bill_details.document_date, 'TrnDocNo': eway_bill_details.document_name, 'VehNo': eway_bill_details.vehicle_no, 'VehType': eway_bill_details.vehicle_type }, indent=4) try: res = self.make_request('post', self.generate_ewaybill_url, headers, data) if res.get('success'): self.invoice.ewaybill = res.get('result').get('EwbNo') self.invoice.eway_bill_validity = res.get('result').get('EwbValidTill') self.invoice.eway_bill_cancelled = 0 self.invoice.update(args) self.invoice.flags.updater_reference = { 'doctype': self.invoice.doctype, 'docname': self.invoice.name, 'label': _('E-Way Bill Generated') } self.update_invoice() else: raise RequestFailed except RequestFailed: errors = self.sanitize_error_message(res.get('message')) self.raise_error(errors=errors) except Exception: log_error(data) self.raise_error(True) def cancel_eway_bill(self, eway_bill, reason, remark): headers = self.get_headers() data = json.dumps({ 'ewbNo': eway_bill, 'cancelRsnCode': reason, 'cancelRmrk': remark }, indent=4) headers["username"] = headers["user_name"] del headers["user_name"] try: res = self.make_request('post', self.cancel_ewaybill_url, headers, data) if res.get('success'): self.invoice.ewaybill = '' self.invoice.eway_bill_cancelled = 1 self.invoice.flags.updater_reference = { 'doctype': self.invoice.doctype, 'docname': self.invoice.name, 'label': _('E-Way Bill Cancelled - {}').format(remark) } self.update_invoice() else: raise RequestFailed except RequestFailed: errors = self.sanitize_error_message(res.get('message')) self.raise_error(errors=errors) except Exception: log_error(data) self.raise_error(True) def sanitize_error_message(self, message): ''' On validation errors, response message looks something like this: message = '2174 : For inter-state transaction, CGST and SGST amounts are not applicable; only IGST amount is applicable, 3095 : Supplier GSTIN is inactive' we search for string between ':' to extract the error messages errors = [ ': For inter-state transaction, CGST and SGST amounts are not applicable; only IGST amount is applicable, 3095 ', ': Test' ] then we trim down the message by looping over errors ''' if not message: return [] errors = re.findall(': [^:]+', message) for idx, e in enumerate(errors): # remove colons errors[idx] = errors[idx].replace(':', '').strip() # if not last if idx != len(errors) - 1: # remove last 7 chars eg: ', 3095 ' errors[idx] = errors[idx][:-6] return errors def raise_error(self, raise_exception=False, errors=None): if errors is None: errors = [] title = _('E Invoice Request Failed') if errors: frappe.throw(errors, title=title, as_list=1) else: link_to_error_list = 'Error Log' frappe.msgprint( _('An error occurred while making e-invoicing request. Please check {} for more information.').format(link_to_error_list), title=title, raise_exception=raise_exception, indicator='red' ) def set_einvoice_data(self, res): enc_signed_invoice = res.get('SignedInvoice') dec_signed_invoice = jwt.decode(enc_signed_invoice, options={"verify_signature": False})['data'] self.invoice.irn = res.get('Irn') self.invoice.ewaybill = res.get('EwbNo') self.invoice.eway_bill_validity = res.get('EwbValidTill') self.invoice.ack_no = res.get('AckNo') self.invoice.ack_date = res.get('AckDt') self.invoice.signed_einvoice = dec_signed_invoice self.invoice.ack_no = res.get('AckNo') self.invoice.ack_date = res.get('AckDt') self.invoice.signed_qr_code = res.get('SignedQRCode') self.invoice.einvoice_status = 'Generated' self.attach_qrcode_image() self.invoice.flags.updater_reference = { 'doctype': self.invoice.doctype, 'docname': self.invoice.name, 'label': _('IRN Generated') } self.update_invoice() def attach_qrcode_image(self): qrcode = self.invoice.signed_qr_code doctype = self.invoice.doctype docname = self.invoice.name filename = 'QRCode_{}.png'.format(docname).replace(os.path.sep, "__") qr_image = io.BytesIO() url = qrcreate(qrcode, error='L') url.png(qr_image, scale=2, quiet_zone=1) _file = frappe.get_doc({ "doctype": "File", "file_name": filename, "attached_to_doctype": doctype, "attached_to_name": docname, "attached_to_field": "qrcode_image", "is_private": 0, "content": qr_image.getvalue()}) _file.save() frappe.db.commit() self.invoice.qrcode_image = _file.file_url def update_invoice(self): self.invoice.flags.ignore_validate_update_after_submit = True self.invoice.flags.ignore_validate = True self.invoice.save() def set_failed_status(self, errors=None): frappe.db.rollback() self.invoice.einvoice_status = 'Failed' self.invoice.failure_description = self.get_failure_message(errors) if errors else "" self.update_invoice() frappe.db.commit() def get_failure_message(self, errors): if isinstance(errors, list): errors = ', '.join(errors) return errors def sanitize_for_json(string): """Escape JSON specific characters from a string.""" # json.dumps adds double-quotes to the string. Indexing to remove them. return json.dumps(string)[1:-1] @frappe.whitelist() def get_einvoice(doctype, docname): invoice = frappe.get_doc(doctype, docname) return make_einvoice(invoice) @frappe.whitelist() def generate_irn(doctype, docname): gsp_connector = GSPConnector(doctype, docname) gsp_connector.generate_irn() @frappe.whitelist() def cancel_irn(doctype, docname, irn, reason, remark): gsp_connector = GSPConnector(doctype, docname) gsp_connector.cancel_irn(irn, reason, remark) @frappe.whitelist() def generate_eway_bill(doctype, docname, **kwargs): gsp_connector = GSPConnector(doctype, docname) gsp_connector.generate_eway_bill(**kwargs) @frappe.whitelist() def cancel_eway_bill(doctype, docname): # TODO: uncomment when eway_bill api from Adequare is enabled # gsp_connector = GSPConnector(doctype, docname) # gsp_connector.cancel_eway_bill(eway_bill, reason, remark) frappe.db.set_value(doctype, docname, 'ewaybill', '') frappe.db.set_value(doctype, docname, 'eway_bill_cancelled', 1) @frappe.whitelist() def generate_einvoices(docnames): docnames = json.loads(docnames) or [] if len(docnames) < 10: failures = GSPConnector.bulk_generate_irn(docnames) frappe.local.message_log = [] if failures: show_bulk_action_failure_message(failures) success = len(docnames) - len(failures) frappe.msgprint( _('{} e-invoices generated successfully').format(success), title=_('Bulk E-Invoice Generation Complete') ) else: enqueue_bulk_action(schedule_bulk_generate_irn, docnames=docnames) def schedule_bulk_generate_irn(docnames): failures = GSPConnector.bulk_generate_irn(docnames) frappe.local.message_log = [] frappe.publish_realtime("bulk_einvoice_generation_complete", { "user": frappe.session.user, "failures": failures, "invoices": docnames }) def show_bulk_action_failure_message(failures): for doc in failures: docname = '{0}'.format(doc.get('docname')) message = doc.get('message').replace("'", '"') if message[0] == '[': errors = json.loads(message) error_list = ''.join(['
  • {}
  • '.format(err) for err in errors]) message = '''{} has following errors:
    '''.format(docname, error_list) else: message = '{} - {}'.format(docname, message) frappe.msgprint( message, title=_('Bulk E-Invoice Generation Complete'), indicator='red' ) @frappe.whitelist() def cancel_irns(docnames, reason, remark): docnames = json.loads(docnames) or [] if len(docnames) < 10: failures = GSPConnector.bulk_cancel_irn(docnames, reason, remark) frappe.local.message_log = [] if failures: show_bulk_action_failure_message(failures) success = len(docnames) - len(failures) frappe.msgprint( _('{} e-invoices cancelled successfully').format(success), title=_('Bulk E-Invoice Cancellation Complete') ) else: enqueue_bulk_action(schedule_bulk_cancel_irn, docnames=docnames, reason=reason, remark=remark) def schedule_bulk_cancel_irn(docnames, reason, remark): failures = GSPConnector.bulk_cancel_irn(docnames, reason, remark) frappe.local.message_log = [] frappe.publish_realtime("bulk_einvoice_cancellation_complete", { "user": frappe.session.user, "failures": failures, "invoices": docnames }) def enqueue_bulk_action(job, **kwargs): check_scheduler_status() enqueue( job, **kwargs, queue="long", timeout=10000, event="processing_bulk_einvoice_action", now=frappe.conf.developer_mode or frappe.flags.in_test, ) if job == schedule_bulk_generate_irn: msg = _('E-Invoices will be generated in a background process.') else: msg = _('E-Invoices will be cancelled in a background process.') frappe.msgprint(msg, alert=1) def check_scheduler_status(): if is_scheduler_inactive() and not frappe.flags.in_test: frappe.throw(_("Scheduler is inactive. Cannot enqueue job."), title=_("Scheduler Inactive")) def job_already_enqueued(job_name): enqueued_jobs = [d.get("job_name") for d in get_info()] if job_name in enqueued_jobs: return True