chore: Miscellanous fixes/enhancements

- `get_valuation_rate`: if no bins are found return 0, SLEs do not exist either
- `get_valuation_rate`: Compute average valuation rate via query
- `get_rm_rate_map`: set order_by as None to avoid creating sort index (modified) each time query runs (seen in process list)
- BOM Update Batch: add status field and hide `boms_updated` so that  users can see progress without loading all updated boms (too much data)
- BOM Update Batch: set batch row status to completed after job runs
- BOM Update Log: remove `parent_boms` field (just pass parent boms to processing function) & remove Paused state (not used)
- Move job to long queue to avoid choking default queue
- `update_cost_in_boms`: use `get_doc` as each BOM is accessed only once. Use `for_update` to lock BOM row
- Commit after every 100 BOMs
This commit is contained in:
marination 2022-06-06 17:01:51 +05:30
parent 62857e3e08
commit 934db57fdd
6 changed files with 73 additions and 43 deletions

View File

@ -714,8 +714,11 @@ class BOM(WebsiteGenerator):
for item in self.get("items"): for item in self.get("items"):
if item.bom_no: if item.bom_no:
# Get Item-Rate from Subassembly BOM # Get Item-Rate from Subassembly BOM
explosion_items = frappe.db.get_all( explosion_items = frappe.get_all(
"BOM Explosion Item", filters={"parent": item.bom_no}, fields=["item_code", "rate"] "BOM Explosion Item",
filters={"parent": item.bom_no},
fields=["item_code", "rate"],
order_by=None, # to avoid sort index creation at db level (granular change)
) )
explosion_item_rate = {item.item_code: flt(item.rate) for item in explosion_items} explosion_item_rate = {item.item_code: flt(item.rate) for item in explosion_items}
rm_rate_map.update(explosion_item_rate) rm_rate_map.update(explosion_item_rate)
@ -935,13 +938,17 @@ def get_bom_item_rate(args, bom_doc):
def get_valuation_rate(args): def get_valuation_rate(args):
"""Get weighted average of valuation rate from all warehouses""" """
1) Get average valuation rate from all warehouses
2) If no value, get last valuation rate from SLE
3) If no value, get valuation rate from Item
"""
total_qty, total_value, valuation_rate = 0.0, 0.0, 0.0 valuation_rate = 0.0
item_bins = frappe.db.sql( item_valuation = frappe.db.sql(
""" """
select select
bin.actual_qty, bin.stock_value (sum(bin.stock_value) / sum(bin.actual_qty)) as valuation_rate
from from
`tabBin` bin, `tabWarehouse` warehouse `tabBin` bin, `tabWarehouse` warehouse
where where
@ -950,14 +957,13 @@ def get_valuation_rate(args):
and warehouse.company=%(company)s""", and warehouse.company=%(company)s""",
{"item": args["item_code"], "company": args["company"]}, {"item": args["item_code"], "company": args["company"]},
as_dict=1, as_dict=1,
) )[0]
for d in item_bins: valuation_rate = item_valuation.get("valuation_rate")
total_qty += flt(d.actual_qty)
total_value += flt(d.stock_value)
if total_qty: if valuation_rate is None:
valuation_rate = total_value / total_qty # Explicit null value check. If null, Bins don't exist, neither does SLE
return valuation_rate
if valuation_rate <= 0: if valuation_rate <= 0:
last_valuation_rate = frappe.db.sql( last_valuation_rate = frappe.db.sql(

View File

@ -7,7 +7,8 @@
"field_order": [ "field_order": [
"level", "level",
"batch_no", "batch_no",
"boms_updated" "boms_updated",
"status"
], ],
"fields": [ "fields": [
{ {
@ -25,14 +26,23 @@
{ {
"fieldname": "boms_updated", "fieldname": "boms_updated",
"fieldtype": "Long Text", "fieldtype": "Long Text",
"hidden": 1,
"in_list_view": 1, "in_list_view": 1,
"label": "BOMs Updated" "label": "BOMs Updated"
},
{
"fieldname": "status",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Status",
"options": "Pending\nCompleted",
"read_only": 1
} }
], ],
"index_web_pages_for_search": 1, "index_web_pages_for_search": 1,
"istable": 1, "istable": 1,
"links": [], "links": [],
"modified": "2022-05-31 23:36:13.628391", "modified": "2022-06-06 14:50:35.161062",
"modified_by": "Administrator", "modified_by": "Administrator",
"module": "Manufacturing", "module": "Manufacturing",
"name": "BOM Update Batch", "name": "BOM Update Batch",

View File

@ -15,7 +15,6 @@
"error_log", "error_log",
"progress_section", "progress_section",
"current_level", "current_level",
"parent_boms",
"processed_boms", "processed_boms",
"bom_batches", "bom_batches",
"amended_from" "amended_from"
@ -52,7 +51,7 @@
"fieldname": "status", "fieldname": "status",
"fieldtype": "Select", "fieldtype": "Select",
"label": "Status", "label": "Status",
"options": "Queued\nIn Progress\nPaused\nCompleted\nFailed" "options": "Queued\nIn Progress\nCompleted\nFailed"
}, },
{ {
"fieldname": "amended_from", "fieldname": "amended_from",
@ -76,15 +75,10 @@
"fieldtype": "Section Break", "fieldtype": "Section Break",
"label": "Progress" "label": "Progress"
}, },
{
"description": "Immediate parent BOMs",
"fieldname": "parent_boms",
"fieldtype": "Long Text",
"label": "Parent BOMs"
},
{ {
"fieldname": "processed_boms", "fieldname": "processed_boms",
"fieldtype": "Long Text", "fieldtype": "Long Text",
"hidden": 1,
"label": "Processed BOMs" "label": "Processed BOMs"
}, },
{ {
@ -102,7 +96,7 @@
"index_web_pages_for_search": 1, "index_web_pages_for_search": 1,
"is_submittable": 1, "is_submittable": 1,
"links": [], "links": [],
"modified": "2022-05-31 20:20:06.370786", "modified": "2022-06-06 15:15:23.883251",
"modified_by": "Administrator", "modified_by": "Administrator",
"module": "Manufacturing", "module": "Manufacturing",
"name": "BOM Update Log", "name": "BOM Update Log",

View File

@ -56,7 +56,7 @@ class BOMUpdateLog(Document):
wip_log = frappe.get_all( wip_log = frappe.get_all(
"BOM Update Log", "BOM Update Log",
{"update_type": "Update Cost", "status": ["in", ["Queued", "In Progress", "Paused"]]}, {"update_type": "Update Cost", "status": ["in", ["Queued", "In Progress"]]},
limit_page_length=1, limit_page_length=1,
) )
if wip_log: if wip_log:
@ -104,10 +104,12 @@ def run_replace_bom_job(
frappe.db.commit() # nosemgrep frappe.db.commit() # nosemgrep
def process_boms_cost_level_wise(update_doc: "BOMUpdateLog") -> None: def process_boms_cost_level_wise(
update_doc: "BOMUpdateLog", parent_boms: List[str] = None
) -> None:
"Queue jobs at the start of new BOM Level in 'Update Cost' Jobs." "Queue jobs at the start of new BOM Level in 'Update Cost' Jobs."
current_boms, parent_boms = {}, [] current_boms = {}
values = {} values = {}
if update_doc.status == "Queued": if update_doc.status == "Queued":
@ -115,26 +117,27 @@ def process_boms_cost_level_wise(update_doc: "BOMUpdateLog") -> None:
current_level = 0 current_level = 0
current_boms = get_leaf_boms() current_boms = get_leaf_boms()
values = { values = {
"parent_boms": "[]",
"processed_boms": json.dumps({}), "processed_boms": json.dumps({}),
"status": "In Progress", "status": "In Progress",
"current_level": current_level, "current_level": current_level,
} }
else: else:
# Resume next level. via Cron Job. # Resume next level. via Cron Job.
if not parent_boms:
return
current_level = cint(update_doc.current_level) + 1 current_level = cint(update_doc.current_level) + 1
parent_boms = json.loads(update_doc.parent_boms)
# Process the next level BOMs. Stage parents as current BOMs. # Process the next level BOMs. Stage parents as current BOMs.
current_boms = parent_boms.copy() current_boms = parent_boms.copy()
values = {"parent_boms": "[]", "current_level": current_level} values = {"current_level": current_level}
set_values_in_log(update_doc.name, values, commit=True) set_values_in_log(update_doc.name, values, commit=True)
queue_bom_cost_jobs(current_boms, update_doc, current_level) queue_bom_cost_jobs(current_boms, update_doc, current_level)
def queue_bom_cost_jobs( def queue_bom_cost_jobs(
current_boms_list: List, update_doc: "BOMUpdateLog", current_level: int current_boms_list: List[str], update_doc: "BOMUpdateLog", current_level: int
) -> None: ) -> None:
"Queue batches of 20k BOMs of the same level to process parallelly" "Queue batches of 20k BOMs of the same level to process parallelly"
batch_no = 0 batch_no = 0
@ -147,7 +150,9 @@ def queue_bom_cost_jobs(
# update list to exclude 20K (queued) BOMs # update list to exclude 20K (queued) BOMs
current_boms_list = current_boms_list[batch_size:] if len(current_boms_list) > batch_size else [] current_boms_list = current_boms_list[batch_size:] if len(current_boms_list) > batch_size else []
batch_row = update_doc.append("bom_batches", {"level": current_level, "batch_no": batch_no}) batch_row = update_doc.append(
"bom_batches", {"level": current_level, "batch_no": batch_no, "status": "Pending"}
)
batch_row.db_insert() batch_row.db_insert()
frappe.enqueue( frappe.enqueue(
@ -155,7 +160,7 @@ def queue_bom_cost_jobs(
doc=update_doc, doc=update_doc,
bom_list=boms_to_process, bom_list=boms_to_process,
batch_name=batch_row.name, batch_name=batch_row.name,
timeout=40000, queue="long",
) )
@ -181,9 +186,11 @@ def resume_bom_cost_update_jobs():
for log in in_progress_logs: for log in in_progress_logs:
# check if all log batches of current level are processed # check if all log batches of current level are processed
bom_batches = frappe.db.get_all( bom_batches = frappe.db.get_all(
"BOM Update Batch", {"parent": log.name, "level": log.current_level}, ["name", "boms_updated"] "BOM Update Batch",
{"parent": log.name, "level": log.current_level},
["name", "boms_updated", "status"],
) )
incomplete_level = any(not row.get("boms_updated") for row in bom_batches) incomplete_level = any(row.get("status") == "Pending" for row in bom_batches)
if not bom_batches or incomplete_level: if not bom_batches or incomplete_level:
continue continue
@ -195,14 +202,15 @@ def resume_bom_cost_update_jobs():
log.name, log.name,
values={ values={
"processed_boms": json.dumps(processed_boms), "processed_boms": json.dumps(processed_boms),
"parent_boms": json.dumps(parent_boms),
"status": "Completed" if not parent_boms else "In Progress", "status": "Completed" if not parent_boms else "In Progress",
}, },
commit=True, commit=True,
) )
if parent_boms: # there is a next level to process if parent_boms: # there is a next level to process
process_boms_cost_level_wise(update_doc=frappe.get_doc("BOM Update Log", log.name)) process_boms_cost_level_wise(
update_doc=frappe.get_doc("BOM Update Log", log.name), parent_boms=parent_boms
)
def get_processed_current_boms( def get_processed_current_boms(

View File

@ -3,7 +3,7 @@
import json import json
from collections import defaultdict from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Optional from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
if TYPE_CHECKING: if TYPE_CHECKING:
from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog
@ -38,7 +38,9 @@ def replace_bom(boms: Dict) -> None:
bom_obj.save_version() bom_obj.save_version()
def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str], batch_name: int) -> None: def update_cost_in_level(
doc: "BOMUpdateLog", bom_list: List[str], batch_name: Union[int, str]
) -> None:
"Updates Cost for BOMs within a given level. Runs via background jobs." "Updates Cost for BOMs within a given level. Runs via background jobs."
try: try:
@ -49,7 +51,14 @@ def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str], batch_name: i
frappe.db.auto_commit_on_many_writes = 1 frappe.db.auto_commit_on_many_writes = 1
update_cost_in_boms(bom_list=bom_list) # main updation logic update_cost_in_boms(bom_list=bom_list) # main updation logic
frappe.db.set_value("BOM Update Batch", batch_name, "boms_updated", json.dumps(bom_list))
bom_batch = frappe.qb.DocType("BOM Update Batch")
(
frappe.qb.update(bom_batch)
.set(bom_batch.boms_updated, json.dumps(bom_list))
.set(bom_batch.status, "Completed")
.where(bom_batch.name == batch_name)
).run()
except Exception: except Exception:
handle_exception(doc) handle_exception(doc)
finally: finally:
@ -105,14 +114,17 @@ def get_bom_unit_cost(bom_name: str) -> float:
def update_cost_in_boms(bom_list: List[str]) -> None: def update_cost_in_boms(bom_list: List[str]) -> None:
"Updates cost in given BOMs. Returns current and total updated BOMs." "Updates cost in given BOMs. Returns current and total updated BOMs."
for bom in bom_list: for index, bom in enumerate(bom_list):
bom_doc = frappe.get_cached_doc("BOM", bom) bom_doc = frappe.get_doc("BOM", bom, for_update=True)
bom_doc.calculate_cost(save_updates=True, update_hour_rate=True) bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
bom_doc.db_update() bom_doc.db_update()
if index % 100 == 0:
frappe.db.commit()
def get_next_higher_level_boms( def get_next_higher_level_boms(
child_boms: Dict[str, bool], processed_boms: Dict[str, bool] child_boms: List[str], processed_boms: Dict[str, bool]
) -> List[str]: ) -> List[str]:
"Generate immediate higher level dependants with no unresolved dependencies (children)." "Generate immediate higher level dependants with no unresolved dependencies (children)."

View File

@ -40,7 +40,7 @@ def auto_update_latest_price_in_all_boms() -> None:
if frappe.db.get_single_value("Manufacturing Settings", "update_bom_costs_automatically"): if frappe.db.get_single_value("Manufacturing Settings", "update_bom_costs_automatically"):
wip_log = frappe.get_all( wip_log = frappe.get_all(
"BOM Update Log", "BOM Update Log",
{"update_type": "Update Cost", "status": ["in", ["Queued", "In Progress", "Paused"]]}, {"update_type": "Update Cost", "status": ["in", ["Queued", "In Progress"]]},
limit_page_length=1, limit_page_length=1,
) )
if not wip_log: if not wip_log: