feat: Level-wise BOM cost updation

- Process BOMs level wise and Pause after level is complete
- Cron job will resume Paused jobs, which will again process the new level and pause at the end
- This will go on until all BOMs are updated
- Added Progress section with fields to track updated BOMs in Log
- Cleanup: Add BOM Updation utils file to contain helper functions/sub-functions
- Cleanup: BOM Update Log file will only contain functions that are in direct context of the Log

Co-authored-by: Gavin D'souza <gavin18d@gmail.com>
This commit is contained in:
marination 2022-05-23 13:00:00 +05:30
parent 90d4dc0cd6
commit ab2d95a74d
6 changed files with 335 additions and 199 deletions

View File

@ -392,9 +392,12 @@ after_migrate = ["erpnext.setup.install.update_select_perm_after_install"]
scheduler_events = {
"cron": {
"0/5 * * * *": [
"erpnext.manufacturing.doctype.bom_update_log.bom_update_log.resume_bom_cost_update_jobs",
],
"0/30 * * * *": [
"erpnext.utilities.doctype.video.video.update_youtube_data",
]
],
},
"all": [
"erpnext.projects.doctype.project.project.project_status_update_reminder",

View File

@ -13,6 +13,10 @@
"update_type",
"status",
"error_log",
"progress_section",
"current_boms",
"parent_boms",
"processed_boms",
"amended_from"
],
"fields": [
@ -47,7 +51,7 @@
"fieldname": "status",
"fieldtype": "Select",
"label": "Status",
"options": "Queued\nIn Progress\nCompleted\nFailed"
"options": "Queued\nIn Progress\nPaused\nCompleted\nFailed"
},
{
"fieldname": "amended_from",
@ -63,13 +67,34 @@
"fieldtype": "Link",
"label": "Error Log",
"options": "Error Log"
},
{
"fieldname": "progress_section",
"fieldtype": "Section Break",
"label": "Progress"
},
{
"fieldname": "current_boms",
"fieldtype": "Text",
"label": "Current BOMs"
},
{
"description": "Immediate parent BOMs",
"fieldname": "parent_boms",
"fieldtype": "Text",
"label": "Parent BOMs"
},
{
"fieldname": "processed_boms",
"fieldtype": "Text",
"label": "Processed BOMs"
}
],
"in_create": 1,
"index_web_pages_for_search": 1,
"is_submittable": 1,
"links": [],
"modified": "2022-03-31 12:51:44.885102",
"modified": "2022-05-23 14:42:14.725914",
"modified_by": "Administrator",
"module": "Manufacturing",
"name": "BOM Update Log",

View File

@ -1,13 +1,19 @@
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
from typing import Dict, List, Literal, Optional
import json
from typing import Dict, Optional
import frappe
from frappe import _
from frappe.model.document import Document
from frappe.utils import cstr, flt
from frappe.utils import cstr
from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost
from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import (
get_leaf_boms,
handle_exception,
replace_bom,
set_values_in_log,
)
class BOMMissingError(frappe.ValidationError):
@ -49,116 +55,93 @@ class BOMUpdateLog(Document):
if self.update_type == "Replace BOM":
boms = {"current_bom": self.current_bom, "new_bom": self.new_bom}
frappe.enqueue(
method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job",
method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_replace_bom_job",
doc=self,
boms=boms,
timeout=40000,
)
else:
frappe.enqueue(
method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job",
doc=self,
update_type="Update Cost",
timeout=40000,
)
process_boms_cost_level_wise(self)
def replace_bom(boms: Dict) -> None:
"""Replace current BOM with new BOM in parent BOMs."""
current_bom = boms.get("current_bom")
new_bom = boms.get("new_bom")
unit_cost = get_new_bom_unit_cost(new_bom)
update_new_bom_in_bom_items(unit_cost, current_bom, new_bom)
frappe.cache().delete_key("bom_children")
parent_boms = get_parent_boms(new_bom)
for bom in parent_boms:
bom_obj = frappe.get_doc("BOM", bom)
# this is only used for versioning and we do not want
# to make separate db calls by using load_doc_before_save
# which proves to be expensive while doing bulk replace
bom_obj._doc_before_save = bom_obj
bom_obj.update_exploded_items()
bom_obj.calculate_cost()
bom_obj.update_parent_cost()
bom_obj.db_update()
if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version:
bom_obj.save_version()
def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None:
bom_item = frappe.qb.DocType("BOM Item")
(
frappe.qb.update(bom_item)
.set(bom_item.bom_no, new_bom)
.set(bom_item.rate, unit_cost)
.set(bom_item.amount, (bom_item.stock_qty * unit_cost))
.where(
(bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")
)
).run()
def get_parent_boms(new_bom: str, bom_list: Optional[List] = None) -> List:
bom_list = bom_list or []
bom_item = frappe.qb.DocType("BOM Item")
parents = (
frappe.qb.from_(bom_item)
.select(bom_item.parent)
.where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM"))
.run(as_dict=True)
)
for d in parents:
if new_bom == d.parent:
frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent))
bom_list.append(d.parent)
get_parent_boms(d.parent, bom_list)
return list(set(bom_list))
def get_new_bom_unit_cost(new_bom: str) -> float:
bom = frappe.qb.DocType("BOM")
new_bom_unitcost = (
frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == new_bom).run()
)
return flt(new_bom_unitcost[0][0])
def run_bom_job(
def run_replace_bom_job(
doc: "BOMUpdateLog",
boms: Optional[Dict[str, str]] = None,
update_type: Literal["Replace BOM", "Update Cost"] = "Replace BOM",
) -> None:
try:
doc.db_set("status", "In Progress")
if not frappe.flags.in_test:
frappe.db.commit()
frappe.db.auto_commit_on_many_writes = 1
boms = frappe._dict(boms or {})
if update_type == "Replace BOM":
replace_bom(boms)
else:
update_cost()
replace_bom(boms)
doc.db_set("status", "Completed")
except Exception:
frappe.db.rollback()
error_log = doc.log_error("BOM Update Tool Error")
doc.db_set("status", "Failed")
doc.db_set("error_log", error_log.name)
handle_exception(doc)
finally:
frappe.db.auto_commit_on_many_writes = 0
frappe.db.commit() # nosemgrep
def process_boms_cost_level_wise(update_doc: "BOMUpdateLog") -> None:
"Queue jobs at the start of new BOM Level in 'Update Cost' Jobs."
current_boms, parent_boms = {}, []
values = {}
if update_doc.status == "Queued":
# First level yet to process. On Submit.
current_boms = {bom: False for bom in get_leaf_boms()}
values = {
"current_boms": json.dumps(current_boms),
"parent_boms": "[]",
"processed_boms": json.dumps({}),
"status": "In Progress",
}
else:
# status is Paused, resume. via Cron Job.
current_boms, parent_boms = json.loads(update_doc.current_boms), json.loads(
update_doc.parent_boms
)
if not current_boms:
# Process the next level BOMs. Stage parents as current BOMs.
current_boms = {bom: False for bom in parent_boms}
values = {
"current_boms": json.dumps(current_boms),
"parent_boms": "[]",
"status": "In Progress",
}
set_values_in_log(update_doc.name, values, commit=True)
queue_bom_cost_jobs(current_boms, update_doc)
def queue_bom_cost_jobs(current_boms: Dict, update_doc: "BOMUpdateLog") -> None:
"Queue batches of 20k BOMs of the same level to process parallelly"
current_boms_list = [bom for bom in current_boms]
while current_boms_list:
boms_to_process = current_boms_list[:20000] # slice out batch of 20k BOMs
# update list to exclude 20K (queued) BOMs
current_boms_list = current_boms_list[20000:] if len(current_boms_list) > 20000 else []
frappe.enqueue(
method="erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils.update_cost_in_level",
doc=update_doc,
bom_list=boms_to_process,
timeout=40000,
)
def resume_bom_cost_update_jobs():
"Called every 10 minutes via Cron job."
paused_jobs = frappe.db.get_all("BOM Update Log", {"status": "Paused"})
if not paused_jobs:
return
for job in paused_jobs:
# resume from next level
process_boms_cost_level_wise(update_doc=frappe.get_doc("BOM Update Log", job.name))

View File

@ -0,0 +1,223 @@
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
import json
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, List, Optional
if TYPE_CHECKING:
from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog
import frappe
from frappe import _
def replace_bom(boms: Dict) -> None:
"""Replace current BOM with new BOM in parent BOMs."""
current_bom = boms.get("current_bom")
new_bom = boms.get("new_bom")
unit_cost = get_bom_unit_cost(new_bom)
update_new_bom_in_bom_items(unit_cost, current_bom, new_bom)
frappe.cache().delete_key("bom_children")
parent_boms = get_ancestor_boms(new_bom)
for bom in parent_boms:
bom_obj = frappe.get_doc("BOM", bom)
# this is only used for versioning and we do not want
# to make separate db calls by using load_doc_before_save
# which proves to be expensive while doing bulk replace
bom_obj._doc_before_save = bom_obj
bom_obj.update_exploded_items()
bom_obj.calculate_cost()
bom_obj.update_parent_cost()
bom_obj.db_update()
if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version:
bom_obj.save_version()
def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str]) -> None:
"Updates Cost for BOMs within a given level. Runs via background jobs."
try:
status = frappe.db.get_value("BOM Update Log", doc.name, "status")
if status == "Failed":
return
frappe.db.auto_commit_on_many_writes = 1
# main updation logic
job_data = update_cost_in_boms(bom_list=bom_list, docname=doc.name)
set_values_in_log(
doc.name,
values={
"current_boms": json.dumps(job_data.get("current_boms")),
"processed_boms": json.dumps(job_data.get("processed_boms")),
},
commit=True,
)
process_if_level_is_complete(doc.name, job_data["current_boms"], job_data["processed_boms"])
except Exception:
handle_exception(doc)
finally:
frappe.db.auto_commit_on_many_writes = 0
frappe.db.commit() # nosemgrep
def get_ancestor_boms(new_bom: str, bom_list: Optional[List] = None) -> List:
bom_list = bom_list or []
bom_item = frappe.qb.DocType("BOM Item")
parents = (
frappe.qb.from_(bom_item)
.select(bom_item.parent)
.where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM"))
.run(as_dict=True)
)
for d in parents:
if new_bom == d.parent:
frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent))
bom_list.append(d.parent)
get_ancestor_boms(d.parent, bom_list)
return list(set(bom_list))
def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None:
bom_item = frappe.qb.DocType("BOM Item")
(
frappe.qb.update(bom_item)
.set(bom_item.bom_no, new_bom)
.set(bom_item.rate, unit_cost)
.set(bom_item.amount, (bom_item.stock_qty * unit_cost))
.where(
(bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")
)
).run()
def get_bom_unit_cost(new_bom: str) -> float:
bom = frappe.qb.DocType("BOM")
new_bom_unitcost = (
frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == new_bom).run()
)
return frappe.utils.flt(new_bom_unitcost[0][0])
def update_cost_in_boms(bom_list: List[str], docname: str) -> Dict:
"Updates cost in given BOMs. Returns current and total updated BOMs."
updated_boms = {} # current boms that have been updated
for bom in bom_list:
bom_doc = frappe.get_cached_doc("BOM", bom)
bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
# bom_doc.update_exploded_items(save=True) #TODO: edit exploded items rate
bom_doc.db_update()
updated_boms[bom] = True
# Update processed BOMs in Log
log_data = frappe.db.get_values(
"BOM Update Log", docname, ["current_boms", "processed_boms"], as_dict=True
)[0]
for field in ("current_boms", "processed_boms"):
log_data[field] = json.loads(log_data.get(field))
log_data[field].update(updated_boms)
return log_data
def process_if_level_is_complete(docname: str, current_boms: Dict, processed_boms: Dict) -> None:
"Prepare and set higher level BOMs in Log if current level is complete."
processing_complete = all(current_boms.get(bom) for bom in current_boms)
if not processing_complete:
return
parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms)
set_values_in_log(
docname,
values={
"current_boms": json.dumps({}),
"parent_boms": json.dumps(parent_boms),
"status": "Completed" if not parent_boms else "Paused",
},
commit=True,
)
def get_next_higher_level_boms(child_boms: Dict, processed_boms: Dict):
"Generate immediate higher level dependants with no unresolved dependencies."
def _all_children_are_processed(parent):
bom_doc = frappe.get_cached_doc("BOM", parent)
return all(processed_boms.get(row.bom_no) for row in bom_doc.items if row.bom_no)
dependants_map = _generate_dependants_map()
dependants = set()
for bom in child_boms:
parents = dependants_map.get(bom) or []
for parent in parents:
if _all_children_are_processed(parent):
dependants.add(parent)
return list(dependants)
def get_leaf_boms():
return frappe.db.sql_list(
"""select name from `tabBOM` bom
where docstatus=1 and is_active=1
and not exists(select bom_no from `tabBOM Item`
where parent=bom.name and ifnull(bom_no, '')!='')"""
)
def _generate_dependants_map():
bom = frappe.qb.DocType("BOM")
bom_item = frappe.qb.DocType("BOM Item")
bom_parents = (
frappe.qb.from_(bom_item)
.join(bom)
.on(bom_item.parent == bom.name)
.select(bom_item.bom_no, bom_item.parent)
.where(
(bom_item.bom_no.isnotnull())
& (bom_item.bom_no != "")
& (bom.docstatus == 1)
& (bom.is_active == 1)
& (bom_item.parenttype == "BOM")
)
).run(as_dict=True)
child_parent_map = defaultdict(list)
for bom in bom_parents:
child_parent_map[bom.bom_no].append(bom.parent)
return child_parent_map
def set_values_in_log(log_name: str, values: Dict, commit: bool = False) -> None:
"Update BOM Update Log record."
if not values:
return
bom_update_log = frappe.qb.DocType("BOM Update Log")
query = frappe.qb.update(bom_update_log).where(bom_update_log.name == log_name)
for key, value in values.items():
query = query.set(key, value)
query.run()
if commit:
frappe.db.commit()
def handle_exception(doc: "BOMUpdateLog"):
frappe.db.rollback()
error_log = doc.log_error("BOM Update Tool Error")
set_values_in_log(doc.name, {"status": "Failed", "error_log": error_log.name})

View File

@ -6,7 +6,7 @@ from frappe.tests.utils import FrappeTestCase
from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import (
BOMMissingError,
run_bom_job,
run_replace_bom_job,
)
from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import enqueue_replace_bom
@ -71,7 +71,7 @@ class TestBOMUpdateLog(FrappeTestCase):
# Explicitly commits log, new bom (setUp) and replacement impact.
# Is run via background jobs IRL
run_bom_job(
run_replace_bom_job(
doc=log,
boms=self.boms,
update_type="Replace BOM",
@ -88,7 +88,7 @@ class TestBOMUpdateLog(FrappeTestCase):
log2 = enqueue_replace_bom(
boms=self.boms,
)
run_bom_job( # Explicitly commits
run_replace_bom_job( # Explicitly commits
doc=log2,
boms=boms,
update_type="Replace BOM",

View File

@ -2,8 +2,7 @@
# For license information, please see license.txt
import json
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Union
from typing import TYPE_CHECKING, Dict, Literal, Optional, Union
if TYPE_CHECKING:
from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog
@ -39,17 +38,7 @@ def enqueue_update_cost() -> "BOMUpdateLog":
def auto_update_latest_price_in_all_boms() -> None:
"""Called via hooks.py."""
if frappe.db.get_single_value("Manufacturing Settings", "update_bom_costs_automatically"):
update_cost()
def update_cost() -> None:
"""Updates Cost for all BOMs from bottom to top."""
bom_list = get_boms_in_bottom_up_order()
for bom in bom_list:
bom_doc = frappe.get_cached_doc("BOM", bom)
bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
# bom_doc.update_exploded_items(save=True) #TODO: edit exploded items rate
bom_doc.db_update()
create_bom_update_log(update_type="Update Cost")
def create_bom_update_log(
@ -69,90 +58,3 @@ def create_bom_update_log(
"update_type": update_type,
}
).submit()
def get_boms_in_bottom_up_order(bom_no: Optional[str] = None) -> List:
"""
Eg: Main BOM
|- Sub BOM 1
|- Leaf BOM 1
|- Sub BOM 2
|- Leaf BOM 2
Result: [Leaf BOM 1, Leaf BOM 2, Sub BOM 1, Sub BOM 2, Main BOM]
"""
leaf_boms = []
if bom_no:
leaf_boms.append(bom_no)
else:
leaf_boms = _get_leaf_boms()
child_parent_map = _generate_child_parent_map()
bom_list = leaf_boms.copy()
for leaf_bom in leaf_boms:
parent_list = _get_flat_parent_map(leaf_bom, child_parent_map)
if not parent_list:
continue
bom_list.extend(parent_list)
bom_list = list(dict.fromkeys(bom_list).keys()) # remove duplicates
return bom_list
def _generate_child_parent_map():
bom = frappe.qb.DocType("BOM")
bom_item = frappe.qb.DocType("BOM Item")
bom_parents = (
frappe.qb.from_(bom_item)
.join(bom)
.on(bom_item.parent == bom.name)
.select(bom_item.bom_no, bom_item.parent)
.where(
(bom_item.bom_no.isnotnull())
& (bom_item.bom_no != "")
& (bom.docstatus == 1)
& (bom.is_active == 1)
& (bom_item.parenttype == "BOM")
)
).run(as_dict=True)
child_parent_map = defaultdict(list)
for bom in bom_parents:
child_parent_map[bom.bom_no].append(bom.parent)
return child_parent_map
def _get_flat_parent_map(leaf, child_parent_map):
"Get ancestors at all levels of a leaf BOM."
parents_list = []
def _get_parents(node, parents_list):
"Returns recursively updated ancestors list."
first_parents = child_parent_map.get(node) # immediate parents of node
if not first_parents: # top most node
return parents_list
parents_list.extend(first_parents)
parents_list = list(dict.fromkeys(parents_list).keys()) # remove duplicates
for nth_node in first_parents:
# recursively find parents
parents_list = _get_parents(nth_node, parents_list)
return parents_list
parents_list = _get_parents(leaf, parents_list)
return parents_list
def _get_leaf_boms():
return frappe.db.sql_list(
"""select name from `tabBOM` bom
where docstatus=1 and is_active=1
and not exists(select bom_no from `tabBOM Item`
where parent=bom.name and ifnull(bom_no, '')!='')"""
)