brotherton-erpnext/erpnext/regional/india/e_invoice/utils.py

1144 lines
39 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
from __future__ import unicode_literals
import os
import re
import jwt
import sys
import json
import base64
import frappe
import six
import traceback
import io
from frappe import _, bold
from pyqrcode import create as qrcreate
from frappe.utils.background_jobs import enqueue
from frappe.utils.scheduler import is_scheduler_inactive
from frappe.core.page.background_jobs.background_jobs import get_info
from frappe.integrations.utils import make_post_request, make_get_request
from erpnext.regional.india.utils import get_gst_accounts, get_place_of_supply
from frappe.utils.data import cstr, cint, format_date, flt, time_diff_in_seconds, now_datetime, add_to_date, get_link_to_form, getdate, time_diff_in_hours
@frappe.whitelist()
def validate_eligibility(doc):
    """Return True when `doc` qualifies for e-invoicing, otherwise False.

    `doc` may be a JSON string (decoded first) or any object exposing `.get()`.
    The document qualifies only if it is a Sales Invoice, e-invoicing is
    enabled in E Invoice Settings, its posting date is not before the
    configured applicable-from date (default '2021-04-01') and none of the
    disqualifying conditions listed below holds.
    """
    if isinstance(doc, six.string_types):
        doc = json.loads(doc)

    if doc.get('doctype') != 'Sales Invoice':
        return False

    if not cint(frappe.db.get_single_value('E Invoice Settings', 'enable')):
        return False

    applicable_from = frappe.db.get_single_value('E Invoice Settings', 'applicable_from') or '2021-04-01'
    if getdate(doc.get('posting_date')) < getdate(applicable_from):
        return False

    # Any single truthy entry makes the invoice ineligible.
    disqualifiers = (
        # no E Invoice User configured for the company
        not frappe.db.get_value('E Invoice User', {'company': doc.get('company')}),
        # unsupported GST category
        doc.get('gst_category') not in ['Registered Regular', 'SEZ', 'Overseas', 'Deemed Export'],
        # billing GSTIN equals the company's own GSTIN
        doc.get('billing_address_gstin') == doc.get('company_gstin'),
        # no tax rows at all
        not doc.get('taxes'),
        # at least one non-GST item
        any(row.get('is_non_gst') for row in doc.get('items', [])),
    )
    return not any(disqualifiers)
def validate_einvoice_fields(doc):
    """Enforce the IRN rules on save, submit and cancel of an eligible invoice.

    Does nothing when `validate_eligibility(doc)` is falsy. Otherwise:
    * saving a draft: refuse edits once an IRN exists, refuse names longer
      than 16 characters, and mark the e-invoice status as 'Pending';
    * submitting: refuse when no IRN has been generated;
    * cancelling: refuse while the IRN itself has not been cancelled.
    """
    if not validate_eligibility(doc):
        return

    if doc.docstatus == 0:
        if doc._action != 'save':
            return
        if doc.irn:
            frappe.throw(_('You cannot edit the invoice after generating IRN'), title=_('Edit Not Allowed'))
        if len(doc.name) > 16:
            raise_document_name_too_long_error()
        doc.einvoice_status = 'Pending'

    elif doc.docstatus == 1:
        if doc._action == 'submit' and not doc.irn:
            frappe.throw(_('You must generate IRN before submitting the document.'), title=_('Missing IRN'))

    elif doc.docstatus == 2:
        if doc.irn and doc._action == 'cancel' and not doc.irn_cancelled:
            frappe.throw(_('You must cancel IRN before cancelling the document.'), title=_('Cancel Not Allowed'))
def raise_document_name_too_long_error():
    """Throw the 'Document ID Too Long' error shown for names above 16 characters."""
    heading = _('Document ID Too Long')
    # The fragments are concatenated without extra separators, exactly as listed.
    fragments = [
        _('As you have E-Invoicing enabled, to be able to generate IRN for this invoice'),
        ', ',
        _('document id {} exceed 16 letters.').format(bold(_('should not'))),
        '<br><br>',
        _('You must {} your {} in order to have document id of {} length 16.').format(
            bold(_('modify')), bold(_('naming series')), bold(_('maximum'))
        ),
        _('Please account for ammended documents too.'),
    ]
    frappe.throw(''.join(fragments), title=heading)
def read_json(name):
    """Return the text of `<name>.json` located next to this module.

    The content is returned as a string (via `cstr`), not parsed.
    """
    module_dir = os.path.dirname(__file__)
    template_path = os.path.join(module_dir, '{name}.json'.format(name=name))
    with open(template_path, 'r') as template_file:
        contents = template_file.read()
    return cstr(contents)
def get_transaction_details(invoice):
    """Build the transaction section (tax scheme, supply type, reverse charge).

    Throws when the invoice's GST category has no supply type mapping.
    """
    supply_type_by_category = {
        'Registered Regular': 'B2B',
        'SEZ': 'SEZWOP',
        'Overseas': 'EXPWOP',
        'Deemed Export': 'DEXP',
    }
    supply_type = supply_type_by_category.get(invoice.gst_category, '')

    if not supply_type:
        rr, sez, overseas, export = bold('Registered Regular'), bold('SEZ'), bold('Overseas'), bold('Deemed Export')
        frappe.throw(
            _('GST category should be one of {}, {}, {}, {}').format(rr, sez, overseas, export),
            title=_('Invalid Supply Type')
        )

    details = dict(
        tax_scheme='GST',
        supply_type=supply_type,
        reverse_charge=invoice.reverse_charge
    )
    return frappe._dict(details)
def get_doc_details(invoice):
    """Build the document section: type ('CRN' for returns, else 'INV'), name and date.

    Throws for invoices posted before 2021-01-01.
    """
    cutoff = getdate('2021-01-01')
    if getdate(invoice.posting_date) < cutoff:
        frappe.throw(_('IRN generation is not allowed for invoices dated before 1st Jan 2021'), title=_('Not Allowed'))

    if invoice.is_return:
        invoice_type = 'CRN'
    else:
        invoice_type = 'INV'

    return frappe._dict(dict(
        invoice_type=invoice_type,
        invoice_name=invoice.name,
        invoice_date=format_date(invoice.posting_date, 'dd/mm/yyyy')
    ))
def validate_address_fields(address, is_shipping_address):
    """Throw when `address` lacks a field required for the e-invoice.

    City, pincode, address title, address line 1 and GST state number are
    always required; GSTIN is required unless this is a shipping address.
    """
    gstin_missing = not address.gstin and not is_shipping_address
    always_required = (
        address.city,
        address.pincode,
        address.address_title,
        address.address_line1,
        address.gst_state_number,
    )
    if gstin_missing or not all(always_required):
        frappe.throw(
            msg=_('Address Lines, City, Pincode, GSTIN are mandatory for address {}. Please set them and try again.').format(address.name),
            title=_('Missing Address Fields')
        )
def get_party_details(address_name, is_shipping_address=False):
    """Load an Address and return its details as used in the e-invoice.

    Args:
        address_name: name of the Address document.
        is_shipping_address: when True, a missing GSTIN is tolerated.

    Returns:
        frappe._dict with legal_name, location, pincode, gstin, state_code,
        address_line1 and address_line2.

    Raises:
        Whatever `validate_address_fields` throws for incomplete addresses.
    """
    addr = frappe.get_doc('Address', address_name)
    validate_address_fields(addr, is_shipping_address)

    # Compare numerically so that the state number matches whether it is
    # stored as the integer 97 or as the string '97'.
    if cint(addr.gst_state_number) == 97:
        # according to einvoice standard
        addr.pincode = 999999

    party_address_details = frappe._dict(dict(
        legal_name=sanitize_for_json(addr.address_title),
        location=sanitize_for_json(addr.city),
        pincode=addr.pincode, gstin=addr.gstin,
        state_code=addr.gst_state_number,
        address_line1=sanitize_for_json(addr.address_line1),
        address_line2=sanitize_for_json(addr.address_line2)
    ))

    return party_address_details
def get_overseas_address_details(address_name):
    """Return e-invoice party details for an overseas (unregistered) address.

    GSTIN is fixed to 'URP', pincode to 999999 and state code / place of
    supply to 96. Title, first address line and city are mandatory.

    Raises:
        frappe.throw when a mandatory field is missing (including the case
        where the address record cannot be read at all).
    """
    # get_value yields nothing for an unknown address; fall back to empty
    # fields so the user gets the "missing fields" message instead of a
    # TypeError from tuple unpacking.
    address_title, address_line1, address_line2, city = frappe.db.get_value(
        'Address', address_name, ['address_title', 'address_line1', 'address_line2', 'city']
    ) or (None, None, None, None)

    if not address_title or not address_line1 or not city:
        frappe.throw(
            msg=_('Address lines and city is mandatory for address {}. Please set them and try again.').format(
                get_link_to_form('Address', address_name)
            ),
            title=_('Missing Address Fields')
        )

    return frappe._dict(dict(
        gstin='URP',
        legal_name=sanitize_for_json(address_title),
        # escaped like every other free-text field, consistent with get_party_details
        location=sanitize_for_json(city),
        address_line1=sanitize_for_json(address_line1),
        address_line2=sanitize_for_json(address_line2),
        pincode=999999, state_code=96, place_of_supply=96
    ))
def get_item_list(invoice):
    """Render every invoice item through the item template.

    Returns:
        A single string: the rendered items joined with ', ' (meant to be
        embedded in the invoice template, not parsed on its own).
    """
    # The template does not depend on the item, so read it once.
    einvoice_item_schema = read_json('einv_item_template')

    item_list = []
    for d in invoice.items:
        item = frappe._dict({})
        item.update(d.as_dict())

        item.sr_no = d.idx
        item.description = sanitize_for_json(d.item_name)

        item.qty = abs(item.qty)
        if invoice.apply_discount_on == 'Net Total' and invoice.discount_amount:
            item.discount_amount = abs(item.base_amount - item.base_net_amount)
        else:
            item.discount_amount = 0

        net_value = abs(item.taxable_value) - item.discount_amount
        # A zero quantity would otherwise raise ZeroDivisionError; in that
        # case the whole net value is reported as the unit rate.
        item.unit_rate = abs(net_value / item.qty) if item.qty else abs(net_value)
        item.gross_amount = abs(item.taxable_value) + item.discount_amount
        item.taxable_value = abs(item.taxable_value)

        item.batch_expiry_date = frappe.db.get_value('Batch', d.batch_no, 'expiry_date') if d.batch_no else None
        item.batch_expiry_date = format_date(item.batch_expiry_date, 'dd/mm/yyyy') if item.batch_expiry_date else None
        item.is_service_item = 'N' if frappe.db.get_value('Item', d.item_code, 'is_stock_item') else 'Y'
        item.serial_no = ""

        item = update_item_taxes(invoice, item)

        item.total_value = abs(
            item.taxable_value + item.igst_amount + item.sgst_amount +
            item.cgst_amount + item.cess_amount + item.cess_nadv_amount + item.other_charges
        )
        item_list.append(einvoice_item_schema.format(item=item))

    return ', '.join(item_list)
def update_item_taxes(invoice, item):
    """Fill the per-item tax rates and amounts from the invoice's tax rows.

    Resets tax_rate, cess_rate, the cgst/sgst/igst/cess amounts,
    cess_nadv_amount and other_charges to 0, then accumulates values from
    every tax row that has a tax amount and posts to a GST account.

    Returns:
        The same `item`, mutated in place.
    """
    gst_accounts = get_gst_accounts(invoice.company)
    gst_accounts_list = [d for accounts in gst_accounts.values() for d in accounts if d]

    for attr in [
        'tax_rate', 'cess_rate', 'cess_nadv_amount',
        'cgst_amount', 'sgst_amount', 'igst_amount',
        'cess_amount', 'other_charges'
    ]:
        item[attr] = 0

    for t in invoice.taxes:
        is_applicable = t.tax_amount and t.account_head in gst_accounts_list
        if not is_applicable:
            # TODO: other charges per item
            continue

        # this contains item wise tax rate & tax amount (incl. discount)
        item_tax_detail = json.loads(t.item_wise_tax_detail).get(item.item_code or item.item_name)
        if not item_tax_detail:
            # this tax row carries no entry for the item; nothing to add
            continue

        item_tax_rate = item_tax_detail[0]
        # item tax amount excluding discount amount
        item_tax_amount = (item_tax_rate / 100) * item.taxable_value

        if t.account_head in gst_accounts.cess_account:
            item_tax_amount_after_discount = item_tax_detail[1]
            if t.charge_type == 'On Item Quantity':
                item.cess_nadv_amount += abs(item_tax_amount_after_discount)
            else:
                item.cess_rate += item_tax_rate
                item.cess_amount += abs(item_tax_amount_after_discount)

        for tax_type in ['igst', 'cgst', 'sgst']:
            if t.account_head in gst_accounts[f'{tax_type}_account']:
                item.tax_rate += item_tax_rate
                item[f'{tax_type}_amount'] += abs(item_tax_amount)

    return item
def get_invoice_value_details(invoice):
    """Build the invoice value section (totals, discount, round-off, taxes).

    The base total is always the absolute sum of the items' taxable values.
    An invoice-level discount is reported only when the discount is applied
    on 'Grand Total'; in every other case it is 0.
    """
    details = frappe._dict(dict())

    details.base_total = abs(sum([row.taxable_value for row in invoice.get('items')]))

    discount_on_grand_total = invoice.apply_discount_on == 'Grand Total' and invoice.discount_amount
    if discount_on_grand_total:
        details.invoice_discount_amt = invoice.base_discount_amount
    else:
        # discount on net total is already part of the item values, and
        # without a discount there is nothing to report
        details.invoice_discount_amt = 0

    details.round_off = invoice.base_rounding_adjustment
    # prefer the rounded totals, falling back to the unrounded ones when they are 0
    details.base_grand_total = abs(invoice.base_rounded_total) or abs(invoice.base_grand_total)
    details.grand_total = abs(invoice.rounded_total) or abs(invoice.grand_total)

    return update_invoice_taxes(invoice, details)
# Accumulate invoice-level tax totals into `invoice_value_details` and return it.
# Sets total_cgst_amt, total_sgst_amt, total_igst_amt, total_cess_amt and
# total_other_charges, starting each at 0 and adding amounts from `invoice.taxes`.
# NOTE(review): the indentation of this block is missing in this copy of the file,
# so the exact nesting of the update_other_charges() call (inside the tax_type
# loop, inside its `if`, or after the loop) cannot be confirmed from here.
def update_invoice_taxes(invoice, invoice_value_details):
gst_accounts = get_gst_accounts(invoice.company)
# flat list of every non-empty GST account head of the company
gst_accounts_list = [d for accounts in gst_accounts.values() for d in accounts if d]
invoice_value_details.total_cgst_amt = 0
invoice_value_details.total_sgst_amt = 0
invoice_value_details.total_igst_amt = 0
invoice_value_details.total_cess_amt = 0
invoice_value_details.total_other_charges = 0
# row ids already deducted from other charges; shared with update_other_charges
considered_rows = []
for t in invoice.taxes:
# with a discount on Grand Total the pre-discount tax amount is used,
# otherwise the amount after discount
tax_amount = t.base_tax_amount if (invoice.apply_discount_on == 'Grand Total' and invoice.discount_amount) \
else t.base_tax_amount_after_discount_amount
if t.account_head in gst_accounts_list:
if t.account_head in gst_accounts.cess_account:
# using after discount amt since item also uses after discount amt for cess calc
invoice_value_details.total_cess_amt += abs(t.base_tax_amount_after_discount_amount)
for tax_type in ['igst', 'cgst', 'sgst']:
if t.account_head in gst_accounts[f'{tax_type}_account']:
invoice_value_details[f'total_{tax_type}_amt'] += abs(tax_amount)
update_other_charges(t, invoice_value_details, gst_accounts_list, invoice, considered_rows)
else:
# rows that do not post to a GST account count as other charges
invoice_value_details.total_other_charges += abs(tax_amount)
return invoice_value_details
def update_other_charges(tax_row, invoice_value_details, gst_accounts_list, invoice, considered_rows):
    """Deduct the row a GST tax row is based on from `total_other_charges`.

    Applies only to GST rows charged 'On Previous Row Amount' or
    'On Previous Row Total' whose referenced row (row_id - 1, used as an
    index into the invoice's taxes) has not been considered yet. The
    referenced index is appended to `considered_rows` after the deduction.
    """
    prev_row_id = cint(tax_row.row_id) - 1

    if tax_row.account_head not in gst_accounts_list or prev_row_id in considered_rows:
        return

    charge_type = tax_row.charge_type
    if charge_type == 'On Previous Row Amount':
        deduction = invoice.get('taxes')[prev_row_id].tax_amount_after_discount_amount
    elif charge_type == 'On Previous Row Total':
        deduction = invoice.get('taxes')[prev_row_id].base_total - invoice.base_net_total
    else:
        return

    invoice_value_details.total_other_charges -= abs(deduction)
    considered_rows.append(prev_row_id)
def get_payment_details(invoice):
    """Build the payment section: payee, payment modes, paid and outstanding amounts."""
    payment_modes = [payment.mode_of_payment for payment in invoice.payments]

    details = frappe._dict()
    details.update(
        payee_name=invoice.company,
        mode_of_payment=', '.join(payment_modes),
        paid_amount=invoice.base_paid_amount,
        outstanding_amount=invoice.outstanding_amount,
    )
    return details
def get_return_doc_reference(invoice):
    """Return name and posting date of the invoice a credit note refers to.

    Throws when `return_against` is not set on the invoice.
    """
    original_invoice = invoice.return_against
    if not original_invoice:
        frappe.throw(_('For generating IRN, reference to the original invoice is mandatory for a credit note. Please set {} field to generate e-invoice.')
            .format(frappe.bold('Return Against')), title=_('Missing Field'))

    original_date = frappe.db.get_value('Sales Invoice', invoice.return_against, 'posting_date')
    reference = dict(
        invoice_name=invoice.return_against,
        invoice_date=format_date(original_date, 'dd/mm/yyyy')
    )
    return frappe._dict(reference)
def get_eway_bill_details(invoice):
    """Build the e-way bill section from the transporter fields of `invoice`.

    Transport mode and vehicle type are translated to the codes listed in
    the two lookup tables below. Unset values map to an empty string instead
    of raising KeyError; unknown non-empty values still raise KeyError.

    Raises:
        frappe.throw when called for a return invoice.
    """
    if invoice.is_return:
        frappe.throw(_('E-Way Bill cannot be generated for Credit Notes & Debit Notes. Please clear fields in the Transporter Section of the invoice.'),
            title=_('Invalid Fields'))

    mode_of_transport = { '': '', 'Road': '1', 'Air': '2', 'Rail': '3', 'Ship': '4' }
    vehicle_type = { 'Regular': 'R', 'Over Dimensional Cargo (ODC)': 'O' }

    return frappe._dict(dict(
        gstin=invoice.gst_transporter_id,
        name=invoice.transporter_name,
        # None is treated like the empty selection
        mode_of_transport=mode_of_transport[invoice.mode_of_transport or ''],
        distance=invoice.distance or 0,
        document_name=invoice.lr_no,
        document_date=format_date(invoice.lr_date, 'dd/mm/yyyy'),
        vehicle_no=invoice.vehicle_no,
        vehicle_type=vehicle_type[invoice.gst_vehicle_type] if invoice.gst_vehicle_type else ''
    ))
def validate_mandatory_fields(invoice):
    """Throw unless the invoice has the addresses and GSTINs an e-invoice needs.

    Checks, in order: company address set, customer address set, GSTIN on
    the company address, and (except for 'Overseas' invoices) GSTIN on the
    customer address.
    """
    if not invoice.company_address:
        frappe.throw(
            _('Company Address is mandatory to fetch company GSTIN details. Please set Company Address and try again.'),
            title=_('Missing Fields')
        )
    if not invoice.customer_address:
        # the remedy names the customer address, which is the field that is missing here
        frappe.throw(
            _('Customer Address is mandatory to fetch customer GSTIN details. Please set Customer Address and try again.'),
            title=_('Missing Fields')
        )
    if not frappe.db.get_value('Address', invoice.company_address, 'gstin'):
        frappe.throw(
            _('GSTIN is mandatory to fetch company GSTIN details. Please enter GSTIN in selected company address.'),
            title=_('Missing Fields')
        )
    if invoice.gst_category != 'Overseas' and not frappe.db.get_value('Address', invoice.customer_address, 'gstin'):
        frappe.throw(
            _('GSTIN is mandatory to fetch customer GSTIN details. Please enter GSTIN in selected customer address.'),
            title=_('Missing Fields')
        )
def validate_totals(einvoice):
    """Cross-check item values against the invoice totals of a parsed e-invoice.

    Args:
        einvoice: dict with an 'ItemList' (list of item dicts) and 'ValDtls'
            (invoice value dict).

    Raises:
        frappe.throw when, with a tolerance of 1:
        * an item's GST amounts differ from AssAmt * GstRt / 100,
        * the items' taxable values do not add up to AssVal,
        * the items' total values do not match TotInvVal + Discount - OthChrg,
        * AssVal + taxes + OthChrg - Discount does not match TotInvVal.
    """
    item_list = einvoice['ItemList']
    value_details = einvoice['ValDtls']

    total_item_ass_value = 0
    total_item_cgst_value = 0
    total_item_sgst_value = 0
    total_item_igst_value = 0
    total_item_value = 0
    # Items are plain dicts, so the row number is taken from the position in
    # the list rather than from an attribute.
    for row_no, item in enumerate(item_list, start=1):
        total_item_ass_value += flt(item['AssAmt'])
        total_item_cgst_value += flt(item['CgstAmt'])
        total_item_sgst_value += flt(item['SgstAmt'])
        total_item_igst_value += flt(item['IgstAmt'])
        total_item_value += flt(item['TotItemVal'])

        expected_gst = flt(item['AssAmt']) * flt(item['GstRt']) / 100
        actual_gst = flt(item['CgstAmt']) + flt(item['SgstAmt']) + flt(item['IgstAmt'])
        # compare the absolute difference so a deviation in either direction is caught
        if abs(expected_gst - actual_gst) > 1:
            frappe.throw(_('Row #{}: GST rate is invalid. Please remove tax rows with zero tax amount from taxes table.').format(row_no))

    if abs(flt(value_details['AssVal']) - total_item_ass_value) > 1:
        frappe.throw(_('Total Taxable Value of the items is not equal to the Invoice Net Total. Please check item taxes / discounts for any correction.'))

    if abs(flt(value_details['TotInvVal']) + flt(value_details['Discount']) - flt(value_details['OthChrg']) - total_item_value) > 1:
        frappe.throw(_('Total Value of the items is not equal to the Invoice Grand Total. Please check item taxes / discounts for any correction.'))

    calculated_invoice_value = (
        flt(value_details['AssVal']) + flt(value_details['CgstVal'])
        + flt(value_details['SgstVal']) + flt(value_details['IgstVal'])
        + flt(value_details['OthChrg']) - flt(value_details['Discount'])
    )

    if abs(flt(value_details['TotInvVal']) - calculated_invoice_value) > 1:
        frappe.throw(_('Total Item Value + Taxes - Discount is not equal to the Invoice Grand Total. Please check taxes / discounts for any correction.'))
# Build the e-invoice payload for `invoice`: validate mandatory fields, collect
# every section, render them into the 'einv_template' text, parse the result as
# JSON, sanitize it, validate the totals and return the resulting dict.
# NOTE(review): the indentation of this block is missing in this copy of the file.
# In particular it cannot be confirmed from here whether the place-of-supply
# lookup below belongs to the non-overseas `else` branch only or runs for every
# invoice (the overseas details already carry place_of_supply=96).
def make_einvoice(invoice):
validate_mandatory_fields(invoice)
# raw template text; placeholders are filled by schema.format() further down
schema = read_json('einv_template')
transaction_details = get_transaction_details(invoice)
item_list = get_item_list(invoice)
doc_details = get_doc_details(invoice)
invoice_value_details = get_invoice_value_details(invoice)
seller_details = get_party_details(invoice.company_address)
if invoice.gst_category == 'Overseas':
buyer_details = get_overseas_address_details(invoice.customer_address)
else:
buyer_details = get_party_details(invoice.customer_address)
place_of_supply = get_place_of_supply(invoice, invoice.doctype)
if place_of_supply:
# keep only the part before the first '-'
place_of_supply = place_of_supply.split('-')[0]
else:
# fall back to the first two characters of the billing GSTIN
place_of_supply = sanitize_for_json(invoice.billing_address_gstin)[:2]
buyer_details.update(dict(place_of_supply=place_of_supply))
# legal names come from the invoice, overriding the address titles
seller_details.update(dict(legal_name=invoice.company))
buyer_details.update(dict(legal_name=invoice.customer_name or invoice.customer))
# optional sections start out empty and are replaced only when applicable
shipping_details = payment_details = prev_doc_details = eway_bill_details = frappe._dict({})
if invoice.shipping_address_name and invoice.customer_address != invoice.shipping_address_name:
if invoice.gst_category == 'Overseas':
shipping_details = get_overseas_address_details(invoice.shipping_address_name)
else:
shipping_details = get_party_details(invoice.shipping_address_name, is_shipping_address=True)
if invoice.is_pos and invoice.base_paid_amount:
payment_details = get_payment_details(invoice)
if invoice.is_return:
prev_doc_details = get_return_doc_reference(invoice)
if invoice.transporter and not invoice.is_return:
eway_bill_details = get_eway_bill_details(invoice)
# not yet implemented
dispatch_details = period_details = export_details = frappe._dict({})
einvoice = schema.format(
transaction_details=transaction_details, doc_details=doc_details, dispatch_details=dispatch_details,
seller_details=seller_details, buyer_details=buyer_details, shipping_details=shipping_details,
item_list=item_list, invoice_value_details=invoice_value_details, payment_details=payment_details,
period_details=period_details, prev_doc_details=prev_doc_details,
export_details=export_details, eway_bill_details=eway_bill_details
)
try:
einvoice = safe_json_load(einvoice)
einvoice = santize_einvoice_fields(einvoice)
except Exception:
# any failure while parsing or sanitizing is logged and reported with a link to the log
show_link_to_error_log(invoice, einvoice)
validate_totals(einvoice)
return einvoice
def show_link_to_error_log(invoice, einvoice):
    """Log `einvoice` via `log_error` and throw a message linking to that log entry."""
    logged = log_error(einvoice)
    log_link = get_link_to_form('Error Log', logged.name, 'Error Log')
    message = _('An error occurred while creating e-invoice for {}. Please check {} for more information.').format(
        invoice.name, log_link)
    frappe.throw(message, title=_('E Invoice Creation Failed'))
def log_error(data=None):
    """Write the current exception plus `data` to the error log.

    Args:
        data: request payload to record; a JSON string is decoded first so it
            can be pretty-printed. Text that is not valid JSON is logged as-is.

    Returns:
        Whatever `frappe.log_error` returns, so that callers such as
        `show_link_to_error_log` can link to the created log entry.
    """
    # Capture the active exception first, before any parsing below can
    # interfere with it.
    err_tb = traceback.format_exc()
    err_msg = str(sys.exc_info()[1])

    if isinstance(data, six.string_types):
        try:
            data = json.loads(data)
        except ValueError:
            # The payload may be exactly the malformed JSON that caused the
            # error being logged; keep the raw text rather than failing here.
            pass

    seperator = "--" * 50
    data = json.dumps(data, indent=4)

    message = "\n".join([
        "Error", err_msg, seperator,
        "Data:", data, seperator,
        "Exception:", err_tb
    ])
    return frappe.log_error(title=_('E Invoice Request Failed'), message=message)
def santize_einvoice_fields(einvoice):
    """Strip empty values from an e-invoice dict and coerce numeric fields.

    Works recursively and in place on `einvoice`:
    * falsy values and the literal string "None" are removed;
    * nested dicts and lists of dicts are sanitized, and removed when nothing
      is left of them;
    * keys listed in `float_fields` are rounded with `flt(value, 2)`, keys in
      `int_fields` are converted with `cint`.

    Returns:
        The same (mutated) `einvoice` dict.
    """
    int_fields = ["Pin","Distance","CrDay"]
    float_fields = ["Qty","FreeQty","UnitPrice","TotAmt","Discount","PreTaxVal","AssAmt","GstRt","IgstAmt","CgstAmt","SgstAmt","CesRt","CesAmt","CesNonAdvlAmt","StateCesRt","StateCesAmt","StateCesNonAdvlAmt","OthChrg","TotItemVal","AssVal","CgstVal","SgstVal","IgstVal","CesVal","StCesVal","Discount","OthChrg","RndOffAmt","TotInvVal","TotInvValFc","PaidAmt","PaymtDue","ExpDuty",]

    # iterate over a snapshot because keys are removed from `einvoice` on the way
    for key, value in einvoice.copy().items():
        if isinstance(value, list):
            # Build a new list instead of popping while enumerating, which
            # would skip the element following every removed entry.
            kept = []
            for entry in value:
                santized_entry = santize_einvoice_fields(entry)
                if santized_entry:
                    kept.append(santized_entry)

            if kept:
                einvoice[key] = kept
            else:
                einvoice.pop(key, None)

        elif isinstance(value, dict):
            santized_dict = santize_einvoice_fields(value)
            if santized_dict:
                einvoice[key] = santized_dict
            else:
                einvoice.pop(key, None)

        elif not value or value == "None":
            einvoice.pop(key, None)

        elif key in float_fields:
            einvoice[key] = flt(value, 2)

        elif key in int_fields:
            einvoice[key] = cint(value)

    return einvoice
def safe_json_load(json_string):
    """Parse `json_string`, throwing a user-facing error on malformed input.

    On a decode error the message includes the text around the failing
    position (up to 20 characters on either side) to help locate the
    offending character.
    """
    # json.JSONDecodeError does not exist on Python 2, where json raises ValueError
    JSONDecodeError = getattr(json, 'JSONDecodeError', ValueError)

    try:
        return json.loads(json_string)
    except JSONDecodeError as e:
        # print a snippet of 40 characters around the location where error occured
        pos = getattr(e, 'pos', 0)  # a plain ValueError carries no position
        # slice ends are exclusive, so the upper bound is the full length
        start, end = max(0, pos - 20), min(len(json_string), pos + 20)
        snippet = json_string[start:end]
        frappe.throw(_("Error in input data. Please check for any special characters near following input: <br> {}").format(snippet))
class RequestFailed(Exception):
    """Raised when a GSP request returns an unsuccessful response."""
class CancellationNotAllowed(Exception):
    """Raised when an IRN cancellation is refused before any request is made."""
class GSPConnector():
def __init__(self, doctype=None, docname=None):
self.doctype = doctype
self.docname = docname
self.set_invoice()
self.set_credentials()
# authenticate url is same for sandbox & live
self.authenticate_url = 'https://gsp.adaequare.com/gsp/authenticate?grant_type=token'
self.base_url = 'https://gsp.adaequare.com' if not self.e_invoice_settings.sandbox_mode else 'https://gsp.adaequare.com/test'
self.cancel_irn_url = self.base_url + '/enriched/ei/api/invoice/cancel'
self.irn_details_url = self.base_url + '/enriched/ei/api/invoice/irn'
self.generate_irn_url = self.base_url + '/enriched/ei/api/invoice'
self.gstin_details_url = self.base_url + '/enriched/ei/api/master/gstin'
self.cancel_ewaybill_url = self.base_url + '/enriched/ewb/ewayapi?action=CANEWB'
self.generate_ewaybill_url = self.base_url + '/enriched/ei/api/ewaybill'
def set_invoice(self):
self.invoice = None
if self.doctype and self.docname:
self.invoice = frappe.get_cached_doc(self.doctype, self.docname)
def set_credentials(self):
self.e_invoice_settings = frappe.get_cached_doc('E Invoice Settings')
if not self.e_invoice_settings.enable:
frappe.throw(_("E-Invoicing is disabled. Please enable it from {} to generate e-invoices.").format(get_link_to_form("E Invoice Settings", "E Invoice Settings")))
if self.invoice:
gstin = self.get_seller_gstin()
credentials_for_gstin = [d for d in self.e_invoice_settings.credentials if d.gstin == gstin]
if credentials_for_gstin:
self.credentials = credentials_for_gstin[0]
else:
frappe.throw(_('Cannot find e-invoicing credentials for selected Company GSTIN. Please check E-Invoice Settings'))
else:
self.credentials = self.e_invoice_settings.credentials[0] if self.e_invoice_settings.credentials else None
def get_seller_gstin(self):
gstin = frappe.db.get_value('Address', self.invoice.company_address, 'gstin')
if not gstin:
frappe.throw(_('Cannot retrieve Company GSTIN. Please select company address with valid GSTIN.'))
return gstin
def get_auth_token(self):
if time_diff_in_seconds(self.e_invoice_settings.token_expiry, now_datetime()) < 150.0:
self.fetch_auth_token()
return self.e_invoice_settings.auth_token
def make_request(self, request_type, url, headers=None, data=None):
if request_type == 'post':
res = make_post_request(url, headers=headers, data=data)
else:
res = make_get_request(url, headers=headers, data=data)
self.log_request(url, headers, data, res)
return res
def log_request(self, url, headers, data, res):
headers.update({ 'password': self.credentials.password })
request_log = frappe.get_doc({
"doctype": "E Invoice Request Log",
"user": frappe.session.user,
"reference_invoice": self.invoice.name if self.invoice else None,
"url": url,
"headers": json.dumps(headers, indent=4) if headers else None,
"data": json.dumps(data, indent=4) if isinstance(data, dict) else data,
"response": json.dumps(res, indent=4) if res else None
})
request_log.save(ignore_permissions=True)
frappe.db.commit()
def fetch_auth_token(self):
headers = {
'gspappid': frappe.conf.einvoice_client_id,
'gspappsecret': frappe.conf.einvoice_client_secret
}
res = {}
try:
res = self.make_request('post', self.authenticate_url, headers)
self.e_invoice_settings.auth_token = "{} {}".format(res.get('token_type'), res.get('access_token'))
self.e_invoice_settings.token_expiry = add_to_date(None, seconds=res.get('expires_in'))
self.e_invoice_settings.save(ignore_permissions=True)
self.e_invoice_settings.reload()
except Exception:
log_error(res)
self.raise_error(True)
def get_headers(self):
return {
'content-type': 'application/json',
'user_name': self.credentials.username,
'password': self.credentials.get_password(),
'gstin': self.credentials.gstin,
'authorization': self.get_auth_token(),
'requestid': str(base64.b64encode(os.urandom(18))),
}
def fetch_gstin_details(self, gstin):
headers = self.get_headers()
try:
params = '?gstin={gstin}'.format(gstin=gstin)
res = self.make_request('get', self.gstin_details_url + params, headers)
if res.get('success'):
return res.get('result')
else:
log_error(res)
raise RequestFailed
except RequestFailed:
self.raise_error()
except Exception:
log_error()
self.raise_error(True)
@staticmethod
def get_gstin_details(gstin):
'''fetch and cache GSTIN details'''
if not hasattr(frappe.local, 'gstin_cache'):
frappe.local.gstin_cache = {}
key = gstin
gsp_connector = GSPConnector()
details = gsp_connector.fetch_gstin_details(gstin)
frappe.local.gstin_cache[key] = details
frappe.cache().hset('gstin_cache', key, details)
return details
def generate_irn(self):
data = {}
try:
headers = self.get_headers()
einvoice = make_einvoice(self.invoice)
data = json.dumps(einvoice, indent=4)
res = self.make_request('post', self.generate_irn_url, headers, data)
if res.get('success'):
self.set_einvoice_data(res.get('result'))
elif '2150' in res.get('message'):
# IRN already generated but not updated in invoice
# Extract the IRN from the response description and fetch irn details
irn = res.get('result')[0].get('Desc').get('Irn')
irn_details = self.get_irn_details(irn)
if irn_details:
self.set_einvoice_data(irn_details)
else:
raise RequestFailed('IRN has already been generated for the invoice but cannot fetch details for the it. \
Contact ERPNext support to resolve the issue.')
else:
raise RequestFailed
except RequestFailed:
errors = self.sanitize_error_message(res.get('message'))
self.set_failed_status(errors=errors)
self.raise_error(errors=errors)
except Exception as e:
self.set_failed_status(errors=str(e))
log_error(data)
self.raise_error(True)
@staticmethod
def bulk_generate_irn(invoices):
gsp_connector = GSPConnector()
gsp_connector.doctype = 'Sales Invoice'
failed = []
for invoice in invoices:
try:
gsp_connector.docname = invoice
gsp_connector.set_invoice()
gsp_connector.set_credentials()
gsp_connector.generate_irn()
except Exception as e:
failed.append({
'docname': invoice,
'message': str(e)
})
return failed
def get_irn_details(self, irn):
headers = self.get_headers()
try:
params = '?irn={irn}'.format(irn=irn)
res = self.make_request('get', self.irn_details_url + params, headers)
if res.get('success'):
return res.get('result')
else:
raise RequestFailed
except RequestFailed:
errors = self.sanitize_error_message(res.get('message'))
self.raise_error(errors=errors)
except Exception:
log_error()
self.raise_error(True)
def cancel_irn(self, irn, reason, remark):
data, res = {}, {}
try:
# validate cancellation
if time_diff_in_hours(now_datetime(), self.invoice.ack_date) > 24:
frappe.throw(_('E-Invoice cannot be cancelled after 24 hours of IRN generation.'), title=_('Not Allowed'), exc=CancellationNotAllowed)
if not irn:
frappe.throw(_('IRN not found. You must generate IRN before cancelling.'), title=_('Not Allowed'), exc=CancellationNotAllowed)
headers = self.get_headers()
data = json.dumps({
'Irn': irn,
'Cnlrsn': reason,
'Cnlrem': remark
}, indent=4)
res = self.make_request('post', self.cancel_irn_url, headers, data)
if res.get('success') or '9999' in res.get('message'):
self.invoice.irn_cancelled = 1
self.invoice.irn_cancel_date = res.get('result')['CancelDate'] if res.get('result') else ""
self.invoice.einvoice_status = 'Cancelled'
self.invoice.flags.updater_reference = {
'doctype': self.invoice.doctype,
'docname': self.invoice.name,
'label': _('IRN Cancelled - {}').format(remark)
}
self.update_invoice()
else:
raise RequestFailed
except RequestFailed:
errors = self.sanitize_error_message(res.get('message'))
self.set_failed_status(errors=errors)
self.raise_error(errors=errors)
except CancellationNotAllowed as e:
self.set_failed_status(errors=str(e))
self.raise_error(errors=str(e))
except Exception as e:
self.set_failed_status(errors=str(e))
log_error(data)
self.raise_error(True)
@staticmethod
def bulk_cancel_irn(invoices, reason, remark):
gsp_connector = GSPConnector()
gsp_connector.doctype = 'Sales Invoice'
failed = []
for invoice in invoices:
try:
gsp_connector.docname = invoice
gsp_connector.set_invoice()
gsp_connector.set_credentials()
irn = gsp_connector.invoice.irn
gsp_connector.cancel_irn(irn, reason, remark)
except Exception as e:
failed.append({
'docname': invoice,
'message': str(e)
})
return failed
def generate_eway_bill(self, **kwargs):
args = frappe._dict(kwargs)
headers = self.get_headers()
eway_bill_details = get_eway_bill_details(args)
data = json.dumps({
'Irn': args.irn,
'Distance': cint(eway_bill_details.distance),
'TransMode': eway_bill_details.mode_of_transport,
'TransId': eway_bill_details.gstin,
'TransName': eway_bill_details.transporter,
'TrnDocDt': eway_bill_details.document_date,
'TrnDocNo': eway_bill_details.document_name,
'VehNo': eway_bill_details.vehicle_no,
'VehType': eway_bill_details.vehicle_type
}, indent=4)
try:
res = self.make_request('post', self.generate_ewaybill_url, headers, data)
if res.get('success'):
self.invoice.ewaybill = res.get('result').get('EwbNo')
self.invoice.eway_bill_validity = res.get('result').get('EwbValidTill')
self.invoice.eway_bill_cancelled = 0
self.invoice.update(args)
self.invoice.flags.updater_reference = {
'doctype': self.invoice.doctype,
'docname': self.invoice.name,
'label': _('E-Way Bill Generated')
}
self.update_invoice()
else:
raise RequestFailed
except RequestFailed:
errors = self.sanitize_error_message(res.get('message'))
self.raise_error(errors=errors)
except Exception:
log_error(data)
self.raise_error(True)
def cancel_eway_bill(self, eway_bill, reason, remark):
headers = self.get_headers()
data = json.dumps({
'ewbNo': eway_bill,
'cancelRsnCode': reason,
'cancelRmrk': remark
}, indent=4)
headers["username"] = headers["user_name"]
del headers["user_name"]
try:
res = self.make_request('post', self.cancel_ewaybill_url, headers, data)
if res.get('success'):
self.invoice.ewaybill = ''
self.invoice.eway_bill_cancelled = 1
self.invoice.flags.updater_reference = {
'doctype': self.invoice.doctype,
'docname': self.invoice.name,
'label': _('E-Way Bill Cancelled - {}').format(remark)
}
self.update_invoice()
else:
raise RequestFailed
except RequestFailed:
errors = self.sanitize_error_message(res.get('message'))
self.raise_error(errors=errors)
except Exception:
log_error(data)
self.raise_error(True)
def sanitize_error_message(self, message):
'''
On validation errors, response message looks something like this:
message = '2174 : For inter-state transaction, CGST and SGST amounts are not applicable; only IGST amount is applicable,
3095 : Supplier GSTIN is inactive'
we search for string between ':' to extract the error messages
errors = [
': For inter-state transaction, CGST and SGST amounts are not applicable; only IGST amount is applicable, 3095 ',
': Test'
]
then we trim down the message by looping over errors
'''
if not message:
return []
errors = re.findall(': [^:]+', message)
for idx, e in enumerate(errors):
# remove colons
errors[idx] = errors[idx].replace(':', '').strip()
# if not last
if idx != len(errors) - 1:
# remove last 7 chars eg: ', 3095 '
errors[idx] = errors[idx][:-6]
return errors
def raise_error(self, raise_exception=False, errors=[]):
title = _('E Invoice Request Failed')
if errors:
frappe.throw(errors, title=title, as_list=1)
else:
link_to_error_list = '<a href="desk#List/Error Log/List?method=E Invoice Request Failed">Error Log</a>'
frappe.msgprint(
_('An error occurred while making e-invoicing request. Please check {} for more information.').format(link_to_error_list),
title=title,
raise_exception=raise_exception,
indicator='red'
)
def set_einvoice_data(self, res):
    """
    Copy the fields of an IRN generation response onto the invoice and save it.

    :param res: response mapping; the keys read are 'SignedInvoice', 'Irn',
        'EwbNo', 'EwbValidTill', 'AckNo', 'AckDt' and 'SignedQRCode'
    """
    enc_signed_invoice = res.get('SignedInvoice')
    # The token is decoded without verification and only its 'data' claim is kept.
    # NOTE(review): `verify=False` is the keyword accepted by older PyJWT
    # releases - confirm it matches the PyJWT version installed.
    dec_signed_invoice = jwt.decode(enc_signed_invoice, verify=False)['data']

    self.invoice.irn = res.get('Irn')
    self.invoice.ewaybill = res.get('EwbNo')
    self.invoice.eway_bill_validity = res.get('EwbValidTill')
    # ack_no / ack_date were previously assigned twice with the same values;
    # a single assignment is sufficient
    self.invoice.ack_no = res.get('AckNo')
    self.invoice.ack_date = res.get('AckDt')
    self.invoice.signed_einvoice = dec_signed_invoice
    self.invoice.signed_qr_code = res.get('SignedQRCode')
    self.invoice.einvoice_status = 'Generated'

    # must run after signed_qr_code is set: the QR image is built from it
    self.attach_qrcode_image()

    self.invoice.flags.updater_reference = {
        'doctype': self.invoice.doctype,
        'docname': self.invoice.name,
        'label': _('IRN Generated')
    }
    self.update_invoice()
def attach_qrcode_image(self):
    """
    Render the invoice's signed QR code as a PNG, store it as a private File
    attached to the invoice, and set `qrcode_image` to the file's URL.
    """
    invoice = self.invoice

    # path separators in the document name are replaced so they do not end up in the file name
    png_name = 'QRCode_{}.png'.format(invoice.name).replace(os.path.sep, "__")

    # render the QR code into an in-memory buffer
    png_buffer = io.BytesIO()
    qrcreate(invoice.signed_qr_code, error='L').png(png_buffer, scale=2, quiet_zone=1)

    file_doc = frappe.get_doc({
        "doctype": "File",
        "file_name": png_name,
        "attached_to_doctype": invoice.doctype,
        "attached_to_name": invoice.name,
        "attached_to_field": "qrcode_image",
        "is_private": 1,
        "content": png_buffer.getvalue()
    })
    file_doc.save()
    frappe.db.commit()

    invoice.qrcode_image = file_doc.file_url
def update_invoice(self):
    """
    Save the invoice after setting the `ignore_validate_update_after_submit`
    and `ignore_validate` flags on it.
    """
    invoice = self.invoice
    for flag_name in ('ignore_validate_update_after_submit', 'ignore_validate'):
        setattr(invoice.flags, flag_name, True)
    invoice.save()
def set_failed_status(self, errors=None):
    """
    Roll back the current transaction, then mark the invoice as 'Failed',
    store the failure description, save the invoice and commit.

    :param errors: error string or list of error strings; when falsy the
        failure description is set to an empty string
    """
    frappe.db.rollback()

    description = ""
    if errors:
        description = self.get_failure_message(errors)

    self.invoice.einvoice_status = 'Failed'
    self.invoice.failure_description = description
    self.update_invoice()

    frappe.db.commit()
def get_failure_message(self, errors):
    """
    Return `errors` as a single value: lists are joined with ', ',
    anything else is returned unchanged.
    """
    return ', '.join(errors) if isinstance(errors, list) else errors
def sanitize_for_json(string):
    """Return `string` with JSON special characters escaped."""
    encoded = json.dumps(string)
    # json.dumps wraps a string in double quotes; drop the first and last character
    return encoded[1:-1]
@frappe.whitelist()
def get_einvoice(doctype, docname):
    """Load the given document and return the result of `make_einvoice` for it."""
    return make_einvoice(frappe.get_doc(doctype, docname))
@frappe.whitelist()
def generate_irn(doctype, docname):
    """Whitelisted entry point: generate an IRN for the given document via GSPConnector."""
    GSPConnector(doctype, docname).generate_irn()
@frappe.whitelist()
def cancel_irn(doctype, docname, irn, reason, remark):
    """Whitelisted entry point: cancel the given IRN for the document via GSPConnector."""
    GSPConnector(doctype, docname).cancel_irn(irn, reason, remark)
@frappe.whitelist()
def generate_eway_bill(doctype, docname, **kwargs):
    """
    Whitelisted entry point: generate an e-way bill for the document via
    GSPConnector, forwarding all extra keyword arguments.
    """
    GSPConnector(doctype, docname).generate_eway_bill(**kwargs)
@frappe.whitelist()
def cancel_eway_bill(doctype, docname):
    """
    Clear the e-way bill number on the document and flag it as cancelled.

    Only the stored values are updated here; the GSP cancellation call is
    currently disabled (see the TODO below).
    """
    # TODO: uncomment when eway_bill api from Adequare is enabled
    # gsp_connector = GSPConnector(doctype, docname)
    # gsp_connector.cancel_eway_bill(eway_bill, reason, remark)
    field_updates = (
        ('ewaybill', ''),
        ('eway_bill_cancelled', 1),
    )
    for fieldname, value in field_updates:
        frappe.db.set_value(doctype, docname, fieldname, value)
@frappe.whitelist()
def generate_einvoices(docnames):
    """
    Generate IRNs for several invoices.

    :param docnames: JSON-encoded list of document names

    Fewer than 10 documents are processed immediately and the outcome is shown
    to the user; 10 or more are handed over to a background job.
    """
    docnames = json.loads(docnames) or []

    if len(docnames) >= 10:
        enqueue_bulk_action(schedule_bulk_generate_irn, docnames=docnames)
        return

    failures = GSPConnector.bulk_generate_irn(docnames)
    # discard messages collected while processing the individual invoices
    frappe.local.message_log = []

    if failures:
        show_bulk_action_failure_message(failures)

    success_count = len(docnames) - len(failures)
    frappe.msgprint(
        _('{} e-invoices generated successfully').format(success_count),
        title=_('Bulk E-Invoice Generation Complete')
    )
def schedule_bulk_generate_irn(docnames):
    """
    Job body for bulk IRN generation: process all given invoices, clear the
    message log and publish the result as a realtime event.
    """
    failures = GSPConnector.bulk_generate_irn(docnames)
    frappe.local.message_log = []

    payload = {
        "user": frappe.session.user,
        "failures": failures,
        "invoices": docnames
    }
    frappe.publish_realtime("bulk_einvoice_generation_complete", payload)
def show_bulk_action_failure_message(failures):
    """
    Show one red message per failed invoice of a bulk e-invoice action.

    :param failures: iterable of mappings with the keys 'docname' and 'message'

    A message that looks like a list (starts with '[') is rendered as a
    bulleted list of errors; any other message is shown as
    '<docname link> - <message>'.
    """
    for doc in failures:
        docname = '<a href="sales-invoice/{0}">{0}</a>'.format(doc.get('docname'))
        # Single quotes are swapped for double quotes before parsing -
        # presumably so that a stringified list can be read as JSON.
        # A missing message is treated as empty instead of raising.
        message = (doc.get('message') or '').replace("'", '"')

        errors = None
        # startswith() is safe on an empty message, unlike message[0]
        if message.startswith('['):
            try:
                errors = json.loads(message)
            except ValueError:
                # not valid JSON after the quote replacement (e.g. a quote
                # inside an error text): fall back to the plain message below
                # instead of aborting the whole failure report
                errors = None

        if errors is not None:
            error_list = ''.join(['<li>{}</li>'.format(err) for err in errors])
            message = (
                '{} has following errors:<br>\n'
                '<ul style="padding-left: 20px; padding-top: 5px">{}</ul>'
            ).format(docname, error_list)
        else:
            message = '{} - {}'.format(docname, message)

        frappe.msgprint(
            message,
            title=_('Bulk E-Invoice Generation Complete'),
            indicator='red'
        )
@frappe.whitelist()
def cancel_irns(docnames, reason, remark):
    """
    Cancel the IRNs of several invoices.

    :param docnames: JSON-encoded list of document names
    :param reason: cancellation reason, forwarded to the bulk cancellation
    :param remark: cancellation remark, forwarded to the bulk cancellation

    Fewer than 10 documents are processed immediately and the outcome is shown
    to the user; 10 or more are handed over to a background job.
    """
    docnames = json.loads(docnames) or []

    if len(docnames) >= 10:
        enqueue_bulk_action(schedule_bulk_cancel_irn, docnames=docnames, reason=reason, remark=remark)
        return

    failures = GSPConnector.bulk_cancel_irn(docnames, reason, remark)
    # discard messages collected while processing the individual invoices
    frappe.local.message_log = []

    if failures:
        show_bulk_action_failure_message(failures)

    success_count = len(docnames) - len(failures)
    frappe.msgprint(
        _('{} e-invoices cancelled successfully').format(success_count),
        title=_('Bulk E-Invoice Cancellation Complete')
    )
def schedule_bulk_cancel_irn(docnames, reason, remark):
    """
    Job body for bulk IRN cancellation: process all given invoices, clear the
    message log and publish the result as a realtime event.
    """
    failures = GSPConnector.bulk_cancel_irn(docnames, reason, remark)
    frappe.local.message_log = []

    payload = {
        "user": frappe.session.user,
        "failures": failures,
        "invoices": docnames
    }
    frappe.publish_realtime("bulk_einvoice_cancellation_complete", payload)
def enqueue_bulk_action(job, **kwargs):
    """
    Enqueue a bulk e-invoice job on the "long" queue and notify the user.

    :param job: function to run; `schedule_bulk_generate_irn` selects the
        "generated" notification, anything else the "cancelled" one
    :param kwargs: keyword arguments forwarded to `enqueue` along with the job
    """
    check_scheduler_status()

    # executed immediately in developer mode or while running tests
    run_now = frappe.conf.developer_mode or frappe.flags.in_test
    enqueue(
        job,
        **kwargs,
        queue="long",
        timeout=10000,
        event="processing_bulk_einvoice_action",
        now=run_now,
    )

    if job != schedule_bulk_generate_irn:
        notice = _('E-Invoices will be cancelled in a background process.')
    else:
        notice = _('E-Invoices will be generated in a background process.')

    frappe.msgprint(notice, alert=1)
def check_scheduler_status():
    """Throw if the scheduler is inactive, unless running in test mode."""
    if not is_scheduler_inactive() or frappe.flags.in_test:
        return

    frappe.throw(_("Scheduler is inactive. Cannot enqueue job."), title=_("Scheduler Inactive"))
def job_already_enqueued(job_name):
    """
    Return True when a job with the given name is listed by `get_info()`.

    As far as this block shows, nothing is returned explicitly when there is
    no match.
    """
    known_names = []
    for job in get_info():
        known_names.append(job.get("job_name"))

    if job_name in known_names:
        return True