diff --git a/erpnext/hooks.py b/erpnext/hooks.py index 813ac17ca0..05f06b3bda 100644 --- a/erpnext/hooks.py +++ b/erpnext/hooks.py @@ -392,9 +392,12 @@ after_migrate = ["erpnext.setup.install.update_select_perm_after_install"] scheduler_events = { "cron": { + "0/5 * * * *": [ + "erpnext.manufacturing.doctype.bom_update_log.bom_update_log.resume_bom_cost_update_jobs", + ], "0/30 * * * *": [ "erpnext.utilities.doctype.video.video.update_youtube_data", - ] + ], }, "all": [ "erpnext.projects.doctype.project.project.project_status_update_reminder", diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json index 98c1acb71c..3455b86657 100644 --- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json +++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json @@ -13,6 +13,10 @@ "update_type", "status", "error_log", + "progress_section", + "current_boms", + "parent_boms", + "processed_boms", "amended_from" ], "fields": [ @@ -47,7 +51,7 @@ "fieldname": "status", "fieldtype": "Select", "label": "Status", - "options": "Queued\nIn Progress\nCompleted\nFailed" + "options": "Queued\nIn Progress\nPaused\nCompleted\nFailed" }, { "fieldname": "amended_from", @@ -63,13 +67,34 @@ "fieldtype": "Link", "label": "Error Log", "options": "Error Log" + }, + { + "fieldname": "progress_section", + "fieldtype": "Section Break", + "label": "Progress" + }, + { + "fieldname": "current_boms", + "fieldtype": "Text", + "label": "Current BOMs" + }, + { + "description": "Immediate parent BOMs", + "fieldname": "parent_boms", + "fieldtype": "Text", + "label": "Parent BOMs" + }, + { + "fieldname": "processed_boms", + "fieldtype": "Text", + "label": "Processed BOMs" } ], "in_create": 1, "index_web_pages_for_search": 1, "is_submittable": 1, "links": [], - "modified": "2022-03-31 12:51:44.885102", + "modified": "2022-05-23 14:42:14.725914", "modified_by": "Administrator", "module": "Manufacturing", "name": "BOM Update Log", diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py index c0770fac90..639628ac38 100644 --- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py +++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py @@ -1,13 +1,19 @@ # Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors # For license information, please see license.txt -from typing import Dict, List, Literal, Optional +import json +from typing import Dict, Optional import frappe from frappe import _ from frappe.model.document import Document -from frappe.utils import cstr, flt +from frappe.utils import cstr -from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost +from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import ( + get_leaf_boms, + handle_exception, + replace_bom, + set_values_in_log, +) class BOMMissingError(frappe.ValidationError): @@ -49,116 +55,93 @@ class BOMUpdateLog(Document): if self.update_type == "Replace BOM": boms = {"current_bom": self.current_bom, "new_bom": self.new_bom} frappe.enqueue( - method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job", + method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_replace_bom_job", doc=self, boms=boms, timeout=40000, ) else: - frappe.enqueue( - method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job", - doc=self, - update_type="Update Cost", - timeout=40000, - ) + process_boms_cost_level_wise(self) -def replace_bom(boms: Dict) -> None: - """Replace current BOM with new BOM in parent BOMs.""" - current_bom = boms.get("current_bom") - new_bom = boms.get("new_bom") - - unit_cost = get_new_bom_unit_cost(new_bom) - update_new_bom_in_bom_items(unit_cost, current_bom, new_bom) - - frappe.cache().delete_key("bom_children") - parent_boms = get_parent_boms(new_bom) - - for bom in parent_boms: - bom_obj = frappe.get_doc("BOM", bom) - # this is only used for versioning and we do not want - # to make separate db calls by using load_doc_before_save - # which proves to be expensive while doing bulk replace - bom_obj._doc_before_save = bom_obj - bom_obj.update_exploded_items() - bom_obj.calculate_cost() - bom_obj.update_parent_cost() - bom_obj.db_update() - if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version: - bom_obj.save_version() - - -def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None: - bom_item = frappe.qb.DocType("BOM Item") - ( - frappe.qb.update(bom_item) - .set(bom_item.bom_no, new_bom) - .set(bom_item.rate, unit_cost) - .set(bom_item.amount, (bom_item.stock_qty * unit_cost)) - .where( - (bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM") - ) - ).run() - - -def get_parent_boms(new_bom: str, bom_list: Optional[List] = None) -> List: - bom_list = bom_list or [] - bom_item = frappe.qb.DocType("BOM Item") - - parents = ( - frappe.qb.from_(bom_item) - .select(bom_item.parent) - .where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")) - .run(as_dict=True) - ) - - for d in parents: - if new_bom == d.parent: - frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent)) - - bom_list.append(d.parent) - get_parent_boms(d.parent, bom_list) - - return list(set(bom_list)) - - -def get_new_bom_unit_cost(new_bom: str) -> float: - bom = frappe.qb.DocType("BOM") - new_bom_unitcost = ( - frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == new_bom).run() - ) - - return flt(new_bom_unitcost[0][0]) - - -def run_bom_job( +def run_replace_bom_job( doc: "BOMUpdateLog", boms: Optional[Dict[str, str]] = None, - update_type: Literal["Replace BOM", "Update Cost"] = "Replace BOM", ) -> None: try: doc.db_set("status", "In Progress") + if not frappe.flags.in_test: frappe.db.commit() frappe.db.auto_commit_on_many_writes = 1 - boms = frappe._dict(boms or {}) - - if update_type == "Replace BOM": - replace_bom(boms) - else: - update_cost() + replace_bom(boms) doc.db_set("status", "Completed") - except Exception: - frappe.db.rollback() - error_log = doc.log_error("BOM Update Tool Error") - - doc.db_set("status", "Failed") - doc.db_set("error_log", error_log.name) - + handle_exception(doc) finally: frappe.db.auto_commit_on_many_writes = 0 frappe.db.commit() # nosemgrep + + +def process_boms_cost_level_wise(update_doc: "BOMUpdateLog") -> None: + "Queue jobs at the start of new BOM Level in 'Update Cost' Jobs." + + current_boms, parent_boms = {}, [] + values = {} + + if update_doc.status == "Queued": + # First level yet to process. On Submit. + current_boms = {bom: False for bom in get_leaf_boms()} + values = { + "current_boms": json.dumps(current_boms), + "parent_boms": "[]", + "processed_boms": json.dumps({}), + "status": "In Progress", + } + else: + # status is Paused, resume. via Cron Job. + current_boms, parent_boms = json.loads(update_doc.current_boms), json.loads( + update_doc.parent_boms + ) + if not current_boms: + # Process the next level BOMs. Stage parents as current BOMs. + current_boms = {bom: False for bom in parent_boms} + values = { + "current_boms": json.dumps(current_boms), + "parent_boms": "[]", + "status": "In Progress", + } + + set_values_in_log(update_doc.name, values, commit=True) + queue_bom_cost_jobs(current_boms, update_doc) + + +def queue_bom_cost_jobs(current_boms: Dict, update_doc: "BOMUpdateLog") -> None: + "Queue batches of 20k BOMs of the same level to process parallelly" + current_boms_list = [bom for bom in current_boms] + + while current_boms_list: + boms_to_process = current_boms_list[:20000] # slice out batch of 20k BOMs + + # update list to exclude 20K (queued) BOMs + current_boms_list = current_boms_list[20000:] if len(current_boms_list) > 20000 else [] + frappe.enqueue( + method="erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils.update_cost_in_level", + doc=update_doc, + bom_list=boms_to_process, + timeout=40000, + ) + + +def resume_bom_cost_update_jobs(): + "Called every 10 minutes via Cron job." + paused_jobs = frappe.db.get_all("BOM Update Log", {"status": "Paused"}) + if not paused_jobs: + return + + for job in paused_jobs: + # resume from next level + process_boms_cost_level_wise(update_doc=frappe.get_doc("BOM Update Log", job.name)) diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py new file mode 100644 index 0000000000..b5964cec9d --- /dev/null +++ b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py @@ -0,0 +1,223 @@ +# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors +# For license information, please see license.txt + +import json +from collections import defaultdict +from typing import TYPE_CHECKING, Dict, List, Optional + +if TYPE_CHECKING: + from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog + +import frappe +from frappe import _ + + +def replace_bom(boms: Dict) -> None: + """Replace current BOM with new BOM in parent BOMs.""" + current_bom = boms.get("current_bom") + new_bom = boms.get("new_bom") + + unit_cost = get_bom_unit_cost(new_bom) + update_new_bom_in_bom_items(unit_cost, current_bom, new_bom) + + frappe.cache().delete_key("bom_children") + parent_boms = get_ancestor_boms(new_bom) + + for bom in parent_boms: + bom_obj = frappe.get_doc("BOM", bom) + # this is only used for versioning and we do not want + # to make separate db calls by using load_doc_before_save + # which proves to be expensive while doing bulk replace + bom_obj._doc_before_save = bom_obj + bom_obj.update_exploded_items() + bom_obj.calculate_cost() + bom_obj.update_parent_cost() + bom_obj.db_update() + if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version: + bom_obj.save_version() + + +def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str]) -> None: + "Updates Cost for BOMs within a given level. Runs via background jobs." + try: + status = frappe.db.get_value("BOM Update Log", doc.name, "status") + if status == "Failed": + return + + frappe.db.auto_commit_on_many_writes = 1 + # main updation logic + job_data = update_cost_in_boms(bom_list=bom_list, docname=doc.name) + + set_values_in_log( + doc.name, + values={ + "current_boms": json.dumps(job_data.get("current_boms")), + "processed_boms": json.dumps(job_data.get("processed_boms")), + }, + commit=True, + ) + + process_if_level_is_complete(doc.name, job_data["current_boms"], job_data["processed_boms"]) + except Exception: + handle_exception(doc) + finally: + frappe.db.auto_commit_on_many_writes = 0 + frappe.db.commit() # nosemgrep + + +def get_ancestor_boms(new_bom: str, bom_list: Optional[List] = None) -> List: + bom_list = bom_list or [] + bom_item = frappe.qb.DocType("BOM Item") + + parents = ( + frappe.qb.from_(bom_item) + .select(bom_item.parent) + .where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")) + .run(as_dict=True) + ) + + for d in parents: + if new_bom == d.parent: + frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent)) + + bom_list.append(d.parent) + get_ancestor_boms(d.parent, bom_list) + + return list(set(bom_list)) + + +def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None: + bom_item = frappe.qb.DocType("BOM Item") + ( + frappe.qb.update(bom_item) + .set(bom_item.bom_no, new_bom) + .set(bom_item.rate, unit_cost) + .set(bom_item.amount, (bom_item.stock_qty * unit_cost)) + .where( + (bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM") + ) + ).run() + + +def get_bom_unit_cost(new_bom: str) -> float: + bom = frappe.qb.DocType("BOM") + new_bom_unitcost = ( + frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == new_bom).run() + ) + + return frappe.utils.flt(new_bom_unitcost[0][0]) + + +def update_cost_in_boms(bom_list: List[str], docname: str) -> Dict: + "Updates cost in given BOMs. Returns current and total updated BOMs." + updated_boms = {} # current boms that have been updated + + for bom in bom_list: + bom_doc = frappe.get_cached_doc("BOM", bom) + bom_doc.calculate_cost(save_updates=True, update_hour_rate=True) + # bom_doc.update_exploded_items(save=True) #TODO: edit exploded items rate + bom_doc.db_update() + updated_boms[bom] = True + + # Update processed BOMs in Log + log_data = frappe.db.get_values( + "BOM Update Log", docname, ["current_boms", "processed_boms"], as_dict=True + )[0] + + for field in ("current_boms", "processed_boms"): + log_data[field] = json.loads(log_data.get(field)) + log_data[field].update(updated_boms) + + return log_data + + +def process_if_level_is_complete(docname: str, current_boms: Dict, processed_boms: Dict) -> None: + "Prepare and set higher level BOMs in Log if current level is complete." + processing_complete = all(current_boms.get(bom) for bom in current_boms) + if not processing_complete: + return + + parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms) + set_values_in_log( + docname, + values={ + "current_boms": json.dumps({}), + "parent_boms": json.dumps(parent_boms), + "status": "Completed" if not parent_boms else "Paused", + }, + commit=True, + ) + + +def get_next_higher_level_boms(child_boms: Dict, processed_boms: Dict): + "Generate immediate higher level dependants with no unresolved dependencies." + + def _all_children_are_processed(parent): + bom_doc = frappe.get_cached_doc("BOM", parent) + return all(processed_boms.get(row.bom_no) for row in bom_doc.items if row.bom_no) + + dependants_map = _generate_dependants_map() + dependants = set() + for bom in child_boms: + parents = dependants_map.get(bom) or [] + for parent in parents: + if _all_children_are_processed(parent): + dependants.add(parent) + + return list(dependants) + + +def get_leaf_boms(): + return frappe.db.sql_list( + """select name from `tabBOM` bom + where docstatus=1 and is_active=1 + and not exists(select bom_no from `tabBOM Item` + where parent=bom.name and ifnull(bom_no, '')!='')""" + ) + + +def _generate_dependants_map(): + bom = frappe.qb.DocType("BOM") + bom_item = frappe.qb.DocType("BOM Item") + + bom_parents = ( + frappe.qb.from_(bom_item) + .join(bom) + .on(bom_item.parent == bom.name) + .select(bom_item.bom_no, bom_item.parent) + .where( + (bom_item.bom_no.isnotnull()) + & (bom_item.bom_no != "") + & (bom.docstatus == 1) + & (bom.is_active == 1) + & (bom_item.parenttype == "BOM") + ) + ).run(as_dict=True) + + child_parent_map = defaultdict(list) + for bom in bom_parents: + child_parent_map[bom.bom_no].append(bom.parent) + + return child_parent_map + + +def set_values_in_log(log_name: str, values: Dict, commit: bool = False) -> None: + "Update BOM Update Log record." + if not values: + return + + bom_update_log = frappe.qb.DocType("BOM Update Log") + query = frappe.qb.update(bom_update_log).where(bom_update_log.name == log_name) + + for key, value in values.items(): + query = query.set(key, value) + query.run() + + if commit: + frappe.db.commit() + + +def handle_exception(doc: "BOMUpdateLog"): + frappe.db.rollback() + error_log = doc.log_error("BOM Update Tool Error") + set_values_in_log(doc.name, {"status": "Failed", "error_log": error_log.name}) diff --git a/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py index 47efea961b..4f151334a2 100644 --- a/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py +++ b/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py @@ -6,7 +6,7 @@ from frappe.tests.utils import FrappeTestCase from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import ( BOMMissingError, - run_bom_job, + run_replace_bom_job, ) from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import enqueue_replace_bom @@ -71,7 +71,7 @@ class TestBOMUpdateLog(FrappeTestCase): # Explicitly commits log, new bom (setUp) and replacement impact. # Is run via background jobs IRL - run_bom_job( + run_replace_bom_job( doc=log, boms=self.boms, update_type="Replace BOM", @@ -88,7 +88,7 @@ class TestBOMUpdateLog(FrappeTestCase): log2 = enqueue_replace_bom( boms=self.boms, ) - run_bom_job( # Explicitly commits + run_replace_bom_job( # Explicitly commits doc=log2, boms=boms, update_type="Replace BOM", diff --git a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py index e765725340..4a2e03fb18 100644 --- a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py +++ b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py @@ -2,8 +2,7 @@ # For license information, please see license.txt import json -from collections import defaultdict -from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Union +from typing import TYPE_CHECKING, Dict, Literal, Optional, Union if TYPE_CHECKING: from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog @@ -39,17 +38,7 @@ def enqueue_update_cost() -> "BOMUpdateLog": def auto_update_latest_price_in_all_boms() -> None: """Called via hooks.py.""" if frappe.db.get_single_value("Manufacturing Settings", "update_bom_costs_automatically"): - update_cost() - - -def update_cost() -> None: - """Updates Cost for all BOMs from bottom to top.""" - bom_list = get_boms_in_bottom_up_order() - for bom in bom_list: - bom_doc = frappe.get_cached_doc("BOM", bom) - bom_doc.calculate_cost(save_updates=True, update_hour_rate=True) - # bom_doc.update_exploded_items(save=True) #TODO: edit exploded items rate - bom_doc.db_update() + create_bom_update_log(update_type="Update Cost") def create_bom_update_log( @@ -69,90 +58,3 @@ def create_bom_update_log( "update_type": update_type, } ).submit() - - -def get_boms_in_bottom_up_order(bom_no: Optional[str] = None) -> List: - """ - Eg: Main BOM - |- Sub BOM 1 - |- Leaf BOM 1 - |- Sub BOM 2 - |- Leaf BOM 2 - Result: [Leaf BOM 1, Leaf BOM 2, Sub BOM 1, Sub BOM 2, Main BOM] - """ - leaf_boms = [] - if bom_no: - leaf_boms.append(bom_no) - else: - leaf_boms = _get_leaf_boms() - - child_parent_map = _generate_child_parent_map() - bom_list = leaf_boms.copy() - - for leaf_bom in leaf_boms: - parent_list = _get_flat_parent_map(leaf_bom, child_parent_map) - - if not parent_list: - continue - - bom_list.extend(parent_list) - bom_list = list(dict.fromkeys(bom_list).keys()) # remove duplicates - - return bom_list - - -def _generate_child_parent_map(): - bom = frappe.qb.DocType("BOM") - bom_item = frappe.qb.DocType("BOM Item") - - bom_parents = ( - frappe.qb.from_(bom_item) - .join(bom) - .on(bom_item.parent == bom.name) - .select(bom_item.bom_no, bom_item.parent) - .where( - (bom_item.bom_no.isnotnull()) - & (bom_item.bom_no != "") - & (bom.docstatus == 1) - & (bom.is_active == 1) - & (bom_item.parenttype == "BOM") - ) - ).run(as_dict=True) - - child_parent_map = defaultdict(list) - for bom in bom_parents: - child_parent_map[bom.bom_no].append(bom.parent) - - return child_parent_map - - -def _get_flat_parent_map(leaf, child_parent_map): - "Get ancestors at all levels of a leaf BOM." - parents_list = [] - - def _get_parents(node, parents_list): - "Returns recursively updated ancestors list." - first_parents = child_parent_map.get(node) # immediate parents of node - if not first_parents: # top most node - return parents_list - - parents_list.extend(first_parents) - parents_list = list(dict.fromkeys(parents_list).keys()) # remove duplicates - - for nth_node in first_parents: - # recursively find parents - parents_list = _get_parents(nth_node, parents_list) - - return parents_list - - parents_list = _get_parents(leaf, parents_list) - return parents_list - - -def _get_leaf_boms(): - return frappe.db.sql_list( - """select name from `tabBOM` bom - where docstatus=1 and is_active=1 - and not exists(select bom_no from `tabBOM Item` - where parent=bom.name and ifnull(bom_no, '')!='')""" - )