feat: Track progress in Log Batch/Job wise
- This was done due to stale reads while the background jobs tried updating status of the log - Added a table where all bom jobs within log will be tracked with what level they are processing - Cron job will check if table jobs are all processed every 5 mins - If yes, it will prepare parents and call `process_boms_cost_level_wise` to start next level - If pending jobs, do nothing - Current BOM Level is being tracked that helps adding rows to the table - Individual bom cost jobs (that are queued) will process and update boms > will update BOM Update Batch table row with list of updated BOMs
This commit is contained in:
parent
a62bc9b6c9
commit
62857e3e08
@ -0,0 +1,45 @@
|
|||||||
|
{
|
||||||
|
"actions": [],
|
||||||
|
"autoname": "autoincrement",
|
||||||
|
"creation": "2022-05-31 17:34:39.825537",
|
||||||
|
"doctype": "DocType",
|
||||||
|
"engine": "InnoDB",
|
||||||
|
"field_order": [
|
||||||
|
"level",
|
||||||
|
"batch_no",
|
||||||
|
"boms_updated"
|
||||||
|
],
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"fieldname": "level",
|
||||||
|
"fieldtype": "Int",
|
||||||
|
"in_list_view": 1,
|
||||||
|
"label": "Level"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldname": "batch_no",
|
||||||
|
"fieldtype": "Int",
|
||||||
|
"in_list_view": 1,
|
||||||
|
"label": "Batch No."
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldname": "boms_updated",
|
||||||
|
"fieldtype": "Long Text",
|
||||||
|
"in_list_view": 1,
|
||||||
|
"label": "BOMs Updated"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"index_web_pages_for_search": 1,
|
||||||
|
"istable": 1,
|
||||||
|
"links": [],
|
||||||
|
"modified": "2022-05-31 23:36:13.628391",
|
||||||
|
"modified_by": "Administrator",
|
||||||
|
"module": "Manufacturing",
|
||||||
|
"name": "BOM Update Batch",
|
||||||
|
"naming_rule": "Autoincrement",
|
||||||
|
"owner": "Administrator",
|
||||||
|
"permissions": [],
|
||||||
|
"sort_field": "modified",
|
||||||
|
"sort_order": "DESC",
|
||||||
|
"states": []
|
||||||
|
}
|
@ -0,0 +1,9 @@
|
|||||||
|
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
|
||||||
|
# For license information, please see license.txt
|
||||||
|
|
||||||
|
# import frappe
|
||||||
|
from frappe.model.document import Document
|
||||||
|
|
||||||
|
|
||||||
|
class BOMUpdateBatch(Document):
|
||||||
|
pass
|
@ -14,9 +14,10 @@
|
|||||||
"status",
|
"status",
|
||||||
"error_log",
|
"error_log",
|
||||||
"progress_section",
|
"progress_section",
|
||||||
"current_boms",
|
"current_level",
|
||||||
"parent_boms",
|
"parent_boms",
|
||||||
"processed_boms",
|
"processed_boms",
|
||||||
|
"bom_batches",
|
||||||
"amended_from"
|
"amended_from"
|
||||||
],
|
],
|
||||||
"fields": [
|
"fields": [
|
||||||
@ -70,15 +71,11 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"collapsible": 1,
|
"collapsible": 1,
|
||||||
|
"depends_on": "eval: doc.update_type == \"Update Cost\"",
|
||||||
"fieldname": "progress_section",
|
"fieldname": "progress_section",
|
||||||
"fieldtype": "Section Break",
|
"fieldtype": "Section Break",
|
||||||
"label": "Progress"
|
"label": "Progress"
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"fieldname": "current_boms",
|
|
||||||
"fieldtype": "Long Text",
|
|
||||||
"label": "Current BOMs"
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"description": "Immediate parent BOMs",
|
"description": "Immediate parent BOMs",
|
||||||
"fieldname": "parent_boms",
|
"fieldname": "parent_boms",
|
||||||
@ -89,13 +86,23 @@
|
|||||||
"fieldname": "processed_boms",
|
"fieldname": "processed_boms",
|
||||||
"fieldtype": "Long Text",
|
"fieldtype": "Long Text",
|
||||||
"label": "Processed BOMs"
|
"label": "Processed BOMs"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldname": "bom_batches",
|
||||||
|
"fieldtype": "Table",
|
||||||
|
"options": "BOM Update Batch"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldname": "current_level",
|
||||||
|
"fieldtype": "Int",
|
||||||
|
"label": "Current Level"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"in_create": 1,
|
"in_create": 1,
|
||||||
"index_web_pages_for_search": 1,
|
"index_web_pages_for_search": 1,
|
||||||
"is_submittable": 1,
|
"is_submittable": 1,
|
||||||
"links": [],
|
"links": [],
|
||||||
"modified": "2022-05-27 17:03:34.712010",
|
"modified": "2022-05-31 20:20:06.370786",
|
||||||
"modified_by": "Administrator",
|
"modified_by": "Administrator",
|
||||||
"module": "Manufacturing",
|
"module": "Manufacturing",
|
||||||
"name": "BOM Update Log",
|
"name": "BOM Update Log",
|
||||||
|
@ -1,15 +1,16 @@
|
|||||||
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
|
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
|
||||||
# For license information, please see license.txt
|
# For license information, please see license.txt
|
||||||
import json
|
import json
|
||||||
from typing import Dict, Optional
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
import frappe
|
import frappe
|
||||||
from frappe import _
|
from frappe import _
|
||||||
from frappe.model.document import Document
|
from frappe.model.document import Document
|
||||||
from frappe.utils import cstr
|
from frappe.utils import cint, cstr
|
||||||
|
|
||||||
from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import (
|
from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import (
|
||||||
get_leaf_boms,
|
get_leaf_boms,
|
||||||
|
get_next_higher_level_boms,
|
||||||
handle_exception,
|
handle_exception,
|
||||||
replace_bom,
|
replace_bom,
|
||||||
set_values_in_log,
|
set_values_in_log,
|
||||||
@ -111,55 +112,110 @@ def process_boms_cost_level_wise(update_doc: "BOMUpdateLog") -> None:
|
|||||||
|
|
||||||
if update_doc.status == "Queued":
|
if update_doc.status == "Queued":
|
||||||
# First level yet to process. On Submit.
|
# First level yet to process. On Submit.
|
||||||
current_boms = {bom: False for bom in get_leaf_boms()}
|
current_level = 0
|
||||||
|
current_boms = get_leaf_boms()
|
||||||
values = {
|
values = {
|
||||||
"current_boms": json.dumps(current_boms),
|
|
||||||
"parent_boms": "[]",
|
"parent_boms": "[]",
|
||||||
"processed_boms": json.dumps({}),
|
"processed_boms": json.dumps({}),
|
||||||
"status": "In Progress",
|
"status": "In Progress",
|
||||||
|
"current_level": current_level,
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
# status is Paused, resume. via Cron Job.
|
# Resume next level. via Cron Job.
|
||||||
current_boms, parent_boms = json.loads(update_doc.current_boms), json.loads(
|
current_level = cint(update_doc.current_level) + 1
|
||||||
update_doc.parent_boms
|
parent_boms = json.loads(update_doc.parent_boms)
|
||||||
)
|
|
||||||
if not current_boms:
|
# Process the next level BOMs. Stage parents as current BOMs.
|
||||||
# Process the next level BOMs. Stage parents as current BOMs.
|
current_boms = parent_boms.copy()
|
||||||
current_boms = {bom: False for bom in parent_boms}
|
values = {"parent_boms": "[]", "current_level": current_level}
|
||||||
values = {
|
|
||||||
"current_boms": json.dumps(current_boms),
|
|
||||||
"parent_boms": "[]",
|
|
||||||
"status": "In Progress",
|
|
||||||
}
|
|
||||||
|
|
||||||
set_values_in_log(update_doc.name, values, commit=True)
|
set_values_in_log(update_doc.name, values, commit=True)
|
||||||
queue_bom_cost_jobs(current_boms, update_doc)
|
queue_bom_cost_jobs(current_boms, update_doc, current_level)
|
||||||
|
|
||||||
|
|
||||||
def queue_bom_cost_jobs(current_boms: Dict, update_doc: "BOMUpdateLog") -> None:
|
def queue_bom_cost_jobs(
|
||||||
|
current_boms_list: List, update_doc: "BOMUpdateLog", current_level: int
|
||||||
|
) -> None:
|
||||||
"Queue batches of 20k BOMs of the same level to process parallelly"
|
"Queue batches of 20k BOMs of the same level to process parallelly"
|
||||||
current_boms_list = [bom for bom in current_boms]
|
batch_no = 0
|
||||||
|
|
||||||
while current_boms_list:
|
while current_boms_list:
|
||||||
|
batch_no += 1
|
||||||
batch_size = 20_000
|
batch_size = 20_000
|
||||||
boms_to_process = current_boms_list[:batch_size] # slice out batch of 20k BOMs
|
boms_to_process = current_boms_list[:batch_size] # slice out batch of 20k BOMs
|
||||||
|
|
||||||
# update list to exclude 20K (queued) BOMs
|
# update list to exclude 20K (queued) BOMs
|
||||||
current_boms_list = current_boms_list[batch_size:] if len(current_boms_list) > batch_size else []
|
current_boms_list = current_boms_list[batch_size:] if len(current_boms_list) > batch_size else []
|
||||||
|
|
||||||
|
batch_row = update_doc.append("bom_batches", {"level": current_level, "batch_no": batch_no})
|
||||||
|
batch_row.db_insert()
|
||||||
|
|
||||||
frappe.enqueue(
|
frappe.enqueue(
|
||||||
method="erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils.update_cost_in_level",
|
method="erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils.update_cost_in_level",
|
||||||
doc=update_doc,
|
doc=update_doc,
|
||||||
bom_list=boms_to_process,
|
bom_list=boms_to_process,
|
||||||
|
batch_name=batch_row.name,
|
||||||
timeout=40000,
|
timeout=40000,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def resume_bom_cost_update_jobs():
|
def resume_bom_cost_update_jobs():
|
||||||
"Called every 10 minutes via Cron job."
|
"""
|
||||||
paused_jobs = frappe.db.get_all("BOM Update Log", {"status": "Paused"})
|
1. Checks for In Progress BOM Update Log.
|
||||||
if not paused_jobs:
|
2. Checks if this job has completed the _current level_.
|
||||||
|
3. If current level is complete, get parent BOMs and start next level.
|
||||||
|
4. If no parents, mark as Complete.
|
||||||
|
5. If current level is WIP, skip the Log.
|
||||||
|
|
||||||
|
Called every 5 minutes via Cron job.
|
||||||
|
"""
|
||||||
|
|
||||||
|
in_progress_logs = frappe.db.get_all(
|
||||||
|
"BOM Update Log",
|
||||||
|
{"update_type": "Update Cost", "status": "In Progress"},
|
||||||
|
["name", "processed_boms", "current_level"],
|
||||||
|
)
|
||||||
|
if not in_progress_logs:
|
||||||
return
|
return
|
||||||
|
|
||||||
for job in paused_jobs:
|
for log in in_progress_logs:
|
||||||
# resume from next level
|
# check if all log batches of current level are processed
|
||||||
process_boms_cost_level_wise(update_doc=frappe.get_doc("BOM Update Log", job.name))
|
bom_batches = frappe.db.get_all(
|
||||||
|
"BOM Update Batch", {"parent": log.name, "level": log.current_level}, ["name", "boms_updated"]
|
||||||
|
)
|
||||||
|
incomplete_level = any(not row.get("boms_updated") for row in bom_batches)
|
||||||
|
if not bom_batches or incomplete_level:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Prep parent BOMs & updated processed BOMs for next level
|
||||||
|
current_boms, processed_boms = get_processed_current_boms(log, bom_batches)
|
||||||
|
parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms)
|
||||||
|
|
||||||
|
set_values_in_log(
|
||||||
|
log.name,
|
||||||
|
values={
|
||||||
|
"processed_boms": json.dumps(processed_boms),
|
||||||
|
"parent_boms": json.dumps(parent_boms),
|
||||||
|
"status": "Completed" if not parent_boms else "In Progress",
|
||||||
|
},
|
||||||
|
commit=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
if parent_boms: # there is a next level to process
|
||||||
|
process_boms_cost_level_wise(update_doc=frappe.get_doc("BOM Update Log", log.name))
|
||||||
|
|
||||||
|
|
||||||
|
def get_processed_current_boms(
|
||||||
|
log: Dict[str, Any], bom_batches: Dict[str, Any]
|
||||||
|
) -> Tuple[List[str], Dict[str, Any]]:
|
||||||
|
"Aggregate all BOMs from BOM Update Batch rows into 'processed_boms' field and into current boms list."
|
||||||
|
processed_boms = json.loads(log.processed_boms) if log.processed_boms else {}
|
||||||
|
current_boms = []
|
||||||
|
|
||||||
|
for row in bom_batches:
|
||||||
|
boms_updated = json.loads(row.boms_updated)
|
||||||
|
current_boms.extend(boms_updated)
|
||||||
|
boms_updated_dict = {bom: True for bom in boms_updated}
|
||||||
|
processed_boms.update(boms_updated_dict)
|
||||||
|
|
||||||
|
return current_boms, processed_boms
|
||||||
|
@ -38,7 +38,7 @@ def replace_bom(boms: Dict) -> None:
|
|||||||
bom_obj.save_version()
|
bom_obj.save_version()
|
||||||
|
|
||||||
|
|
||||||
def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str]) -> None:
|
def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str], batch_name: int) -> None:
|
||||||
"Updates Cost for BOMs within a given level. Runs via background jobs."
|
"Updates Cost for BOMs within a given level. Runs via background jobs."
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -47,19 +47,9 @@ def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str]) -> None:
|
|||||||
return
|
return
|
||||||
|
|
||||||
frappe.db.auto_commit_on_many_writes = 1
|
frappe.db.auto_commit_on_many_writes = 1
|
||||||
# main updation logic
|
|
||||||
job_data = update_cost_in_boms(bom_list=bom_list, docname=doc.name)
|
|
||||||
|
|
||||||
set_values_in_log(
|
update_cost_in_boms(bom_list=bom_list) # main updation logic
|
||||||
doc.name,
|
frappe.db.set_value("BOM Update Batch", batch_name, "boms_updated", json.dumps(bom_list))
|
||||||
values={
|
|
||||||
"current_boms": json.dumps(job_data.get("current_boms")),
|
|
||||||
"processed_boms": json.dumps(job_data.get("processed_boms")),
|
|
||||||
},
|
|
||||||
commit=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
process_if_level_is_complete(doc.name, job_data["current_boms"], job_data["processed_boms"])
|
|
||||||
except Exception:
|
except Exception:
|
||||||
handle_exception(doc)
|
handle_exception(doc)
|
||||||
finally:
|
finally:
|
||||||
@ -112,48 +102,13 @@ def get_bom_unit_cost(bom_name: str) -> float:
|
|||||||
return frappe.utils.flt(new_bom_unitcost[0][0])
|
return frappe.utils.flt(new_bom_unitcost[0][0])
|
||||||
|
|
||||||
|
|
||||||
def update_cost_in_boms(bom_list: List[str], docname: str) -> Dict[str, Dict]:
|
def update_cost_in_boms(bom_list: List[str]) -> None:
|
||||||
"Updates cost in given BOMs. Returns current and total updated BOMs."
|
"Updates cost in given BOMs. Returns current and total updated BOMs."
|
||||||
|
|
||||||
updated_boms = {} # current boms that have been updated
|
|
||||||
|
|
||||||
for bom in bom_list:
|
for bom in bom_list:
|
||||||
bom_doc = frappe.get_cached_doc("BOM", bom)
|
bom_doc = frappe.get_cached_doc("BOM", bom)
|
||||||
bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
|
bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
|
||||||
bom_doc.db_update()
|
bom_doc.db_update()
|
||||||
updated_boms[bom] = True
|
|
||||||
|
|
||||||
# Update processed BOMs in Log
|
|
||||||
log_data = frappe.db.get_values(
|
|
||||||
"BOM Update Log", docname, ["current_boms", "processed_boms"], as_dict=True
|
|
||||||
)[0]
|
|
||||||
|
|
||||||
for field in ("current_boms", "processed_boms"):
|
|
||||||
log_data[field] = json.loads(log_data.get(field))
|
|
||||||
log_data[field].update(updated_boms)
|
|
||||||
|
|
||||||
return log_data
|
|
||||||
|
|
||||||
|
|
||||||
def process_if_level_is_complete(
|
|
||||||
docname: str, current_boms: Dict[str, bool], processed_boms: Dict[str, bool]
|
|
||||||
) -> None:
|
|
||||||
"Prepare and set higher level BOMs/dependants in Log if current level is complete."
|
|
||||||
|
|
||||||
processing_complete = all(current_boms.get(bom) for bom in current_boms)
|
|
||||||
if not processing_complete:
|
|
||||||
return
|
|
||||||
|
|
||||||
parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms)
|
|
||||||
set_values_in_log(
|
|
||||||
docname,
|
|
||||||
values={
|
|
||||||
"current_boms": json.dumps({}),
|
|
||||||
"parent_boms": json.dumps(parent_boms),
|
|
||||||
"status": "Completed" if not parent_boms else "Paused",
|
|
||||||
},
|
|
||||||
commit=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def get_next_higher_level_boms(
|
def get_next_higher_level_boms(
|
||||||
@ -244,7 +199,7 @@ def set_values_in_log(log_name: str, values: Dict[str, Any], commit: bool = Fals
|
|||||||
query.run()
|
query.run()
|
||||||
|
|
||||||
if commit:
|
if commit:
|
||||||
frappe.db.commit()
|
frappe.db.commit() # nosemgrep
|
||||||
|
|
||||||
|
|
||||||
def handle_exception(doc: "BOMUpdateLog") -> None:
|
def handle_exception(doc: "BOMUpdateLog") -> None:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user