brotherton-erpnext/erpnext/utilities/bulk_transaction.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

201 lines
5.8 KiB
Python
Raw Normal View History

import json
from datetime import date, datetime
import frappe
from frappe import _
@frappe.whitelist()
def transaction_processing(data, from_doctype, to_doctype):
    """Create ``to_doctype`` documents for the given ``from_doctype`` rows.

    Args:
        data: Either a JSON string or an already-decoded collection of rows.
        from_doctype: Doctype of the source documents.
        to_doctype: Doctype of the documents to create.

    Selections of up to 10 rows are processed immediately by ``job``; larger
    ones are announced to the user and handed to ``frappe.enqueue``.
    """
    rows = json.loads(data) if isinstance(data, str) else data
    row_count = len(rows)

    # Small selections run inline, in the current request.
    if row_count <= 10:
        job(rows, from_doctype, to_doctype)
        return

    frappe.msgprint(
        _("Started a background job to create {1} {0}").format(to_doctype, row_count)
    )
    frappe.enqueue(
        job,
        deserialized_data=rows,
        from_doctype=from_doctype,
        to_doctype=to_doctype,
    )
def job(deserialized_data, from_doctype, to_doctype):
    """Create one ``to_doctype`` document per row and log every outcome.

    Each row is processed behind its own database savepoint, so a failure
    rolls back only that row's work. Successes and failures (with the
    traceback) are recorded through ``update_logger``, and a summary is shown
    through ``show_job_status`` once all rows have been attempted.

    Args:
        deserialized_data: Iterable of rows; each row's ``"name"`` is read
            with ``.get`` and used as the source document name.
        from_doctype: Doctype of the source documents.
        to_doctype: Doctype of the documents to create.
    """
    fail_count = 0
    for d in deserialized_data:
        # Reset per row: if reading the name fails, the handler below must
        # neither hit an unbound local nor log the previous row's name.
        doc_name = None
        try:
            # Take the savepoint first so the rollback in the handler always
            # has a savepoint belonging to this row to return to.
            frappe.db.savepoint("before_creation_state")
            doc_name = d.get("name")
            task(doc_name, from_doctype, to_doctype)
        except Exception:
            # Deliberately broad: any failure for one row is logged and the
            # remaining rows are still processed.
            frappe.db.rollback(save_point="before_creation_state")
            fail_count += 1
            update_logger(
                doc_name,
                str(frappe.get_traceback()),
                from_doctype,
                to_doctype,
                status="Failed",
                log_date=str(date.today()),
            )
        else:
            update_logger(
                doc_name, None, from_doctype, to_doctype, status="Success", log_date=str(date.today())
            )

    show_job_status(fail_count, len(deserialized_data), to_doctype)
def task(doc_name, from_doctype, to_doctype):
    """Build and insert a single ``to_doctype`` document from ``doc_name``.

    The factory function is looked up by ``from_doctype`` and ``to_doctype``;
    an unsupported combination raises ``KeyError``. The resulting document is
    inserted with validation skipped and mandatory checks ignored.
    """
    # Imported here rather than at module level, as in the original code.
    from erpnext.accounts.doctype.payment_entry import payment_entry
    from erpnext.accounts.doctype.purchase_invoice import purchase_invoice
    from erpnext.accounts.doctype.sales_invoice import sales_invoice
    from erpnext.buying.doctype.purchase_order import purchase_order
    from erpnext.buying.doctype.supplier_quotation import supplier_quotation
    from erpnext.selling.doctype.quotation import quotation
    from erpnext.selling.doctype.sales_order import sales_order
    from erpnext.stock.doctype.delivery_note import delivery_note
    from erpnext.stock.doctype.purchase_receipt import purchase_receipt

    # source doctype -> target doctype -> factory function
    mapper = {
        "Sales Order": {
            "Sales Invoice": sales_order.make_sales_invoice,
            "Delivery Note": sales_order.make_delivery_note,
            "Payment Entry": payment_entry.get_payment_entry,
        },
        "Sales Invoice": {
            "Delivery Note": sales_invoice.make_delivery_note,
            "Payment Entry": payment_entry.get_payment_entry,
        },
        "Delivery Note": {
            "Sales Invoice": delivery_note.make_sales_invoice,
            "Packing Slip": delivery_note.make_packing_slip,
        },
        "Quotation": {
            "Sales Order": quotation.make_sales_order,
            "Sales Invoice": quotation.make_sales_invoice,
        },
        "Supplier Quotation": {
            "Purchase Order": supplier_quotation.make_purchase_order,
            "Purchase Invoice": supplier_quotation.make_purchase_invoice,
        },
        "Purchase Order": {
            "Purchase Invoice": purchase_order.make_purchase_invoice,
            "Purchase Receipt": purchase_order.make_purchase_receipt,
            "Payment Entry": payment_entry.get_payment_entry,
        },
        "Purchase Invoice": {
            "Purchase Receipt": purchase_invoice.make_purchase_receipt,
            "Payment Entry": payment_entry.get_payment_entry,
        },
        "Purchase Receipt": {"Purchase Invoice": purchase_receipt.make_purchase_invoice},
    }

    # The payment entry factory is called with the source doctype as well as
    # the document name; every other factory receives only the name.
    if to_doctype == "Payment Entry":
        factory_args = (from_doctype, doc_name)
    else:
        factory_args = (doc_name,)

    new_doc = mapper[from_doctype][to_doctype](*factory_args)
    new_doc.flags.ignore_validate = True
    new_doc.set_title_field()
    new_doc.insert(ignore_mandatory=True)
def check_logger_doc_exists(log_date):
    """Return the result of looking up a Bulk Transaction Log named ``log_date``."""
    existing = frappe.db.exists("Bulk Transaction Log", log_date)
    return existing
def get_logger_doc(log_date):
    """Load and return the Bulk Transaction Log document named ``log_date``."""
    log_doc = frappe.get_doc("Bulk Transaction Log", log_date)
    return log_doc
def create_logger_doc():
    """Build a new Bulk Transaction Log for today; the caller inserts it.

    Returns:
        A new, not yet inserted log document whose name is today's date as a
        string and whose ``log_date`` is today's date.
    """
    # Read the clock once so the document name and log_date cannot disagree
    # if midnight passes between two separate date.today() calls.
    today = date.today()

    log_doc = frappe.new_doc("Bulk Transaction Log")
    log_doc.set_new_name(set_name=str(today))
    log_doc.log_date = today
    return log_doc
def append_data_to_logger(log_doc, doc_name, error, from_doctype, to_doctype, status, restarted):
    """Append one result row to the ``logger_data`` table of ``log_doc``.

    Args:
        log_doc: Log document; its ``append`` method must return the new row.
        doc_name: Name of the source transaction.
        error: Error details for a failure, or ``None`` when there is none.
        from_doctype: Doctype of the source document.
        to_doctype: Doctype of the document that was to be created.
        status: Value stored as the row's ``transaction_status``.
        restarted: Value stored as the row's ``retried`` flag.

    The row is only appended in memory; saving ``log_doc`` is left to the
    caller.
    """
    # Single clock read so the row's date and time describe the same instant.
    now = datetime.now()

    row = log_doc.append("logger_data", {})
    row.transaction_name = doc_name
    row.date = now.date()
    row.time = now.strftime("%H:%M:%S")
    row.transaction_status = status
    # str(None) would store the literal text "None" as the error description
    # of a row that has no error; keep the field empty in that case.
    row.error_description = None if error is None else str(error)
    row.from_doctype = from_doctype
    row.to_doctype = to_doctype
    row.retried = restarted
def update_logger(doc_name, e, from_doctype, to_doctype, status, log_date=None, restarted=0):
    """Record the outcome of one transaction in the Bulk Transaction Log.

    Args:
        doc_name: Name of the source transaction.
        e: Error details for a failure, or ``None``.
        from_doctype: Doctype of the source document.
        to_doctype: Doctype of the document that was to be created.
        status: Outcome label, e.g. ``"Failed"`` or ``"Success"``.
        log_date: Name of the log document to write to. Defaults to today's
            date string when omitted.
        restarted: Value stored as the new row's ``retried`` flag.

    If no log exists for ``log_date`` a new one is created and inserted with
    the row. Otherwise the row is appended to the existing log only when
    ``record_exists`` allows it.
    """
    if not log_date:
        # New logs are named after today's date by create_logger_doc(); use
        # the same key for the lookup instead of querying with None.
        log_date = str(date.today())

    if not check_logger_doc_exists(log_date):
        log_doc = create_logger_doc()
        append_data_to_logger(log_doc, doc_name, e, from_doctype, to_doctype, status, restarted)
        log_doc.insert()
    else:
        log_doc = get_logger_doc(log_date)
        # record_exists() also flags earlier failed rows for this transaction
        # as retried before deciding whether a new row should be added.
        if record_exists(log_doc, doc_name, status):
            append_data_to_logger(log_doc, doc_name, e, from_doctype, to_doctype, status, restarted)
            log_doc.save()
def show_job_status(fail_count, deserialized_data_count, to_doctype):
    """Show a summary message for a finished bulk job.

    Green when nothing failed, orange when only some rows failed, red
    otherwise. The non-green messages link to the Bulk Transaction Log.
    """
    if not fail_count:
        list_route = to_doctype.lower().replace(" ", "-")
        message = _("Creation of <b><a href='/app/{0}'>{1}(s)</a></b> successful").format(
            list_route, to_doctype
        )
        title, indicator = "Successful", "green"
    elif fail_count < deserialized_data_count:
        # The literal's second line is kept flush-left so the message text
        # passed to _() is unchanged.
        message = _(
            """Creation of {0} partially successful.
Check <b><a href="/app/bulk-transaction-log">Bulk Transaction Log</a></b>"""
        ).format(to_doctype)
        title, indicator = "Partially successful", "orange"
    else:
        message = _(
            """Creation of {0} failed.
Check <b><a href="/app/bulk-transaction-log">Bulk Transaction Log</a></b>"""
        ).format(to_doctype)
        title, indicator = "Failed", "red"

    frappe.msgprint(message, title=title, indicator=indicator)
def record_exists(log_doc, doc_name, status):
    """Decide whether a new log row should be appended for ``doc_name``.

    Calls ``mark_retrired_transaction`` first, which flags earlier failed
    rows for this transaction as retried. Returns ``False`` only when such
    rows were found and the new ``status`` is ``"Failed"``; ``True`` in every
    other case.
    """
    earlier_failures = mark_retrired_transaction(log_doc, doc_name)
    return not (earlier_failures and status == "Failed")
def mark_retrired_transaction(log_doc, doc_name):
    """Flag earlier failed rows for ``doc_name`` as retried and save the log.

    Returns:
        The number of ``logger_data`` rows that matched ``doc_name`` with a
        ``"Failed"`` status. The log is saved even when nothing matched.
    """
    failed_rows = [
        row
        for row in log_doc.get("logger_data")
        if row.transaction_name == doc_name and row.transaction_status == "Failed"
    ]
    for row in failed_rows:
        row.retried = 1

    log_doc.save()
    return len(failed_rows)