Merge pull request #31072 from marination/perf-bom-update-tool

perf: BOM Update Tool
This commit is contained in:
Ankush Menat 2022-06-09 17:04:00 +05:30 committed by GitHub
commit 16c8b7404d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 663 additions and 285 deletions

View File

@ -392,9 +392,12 @@ after_migrate = ["erpnext.setup.install.update_select_perm_after_install"]
scheduler_events = { scheduler_events = {
"cron": { "cron": {
"0/5 * * * *": [
"erpnext.manufacturing.doctype.bom_update_log.bom_update_log.resume_bom_cost_update_jobs",
],
"0/30 * * * *": [ "0/30 * * * *": [
"erpnext.utilities.doctype.video.video.update_youtube_data", "erpnext.utilities.doctype.video.video.update_youtube_data",
] ],
}, },
"all": [ "all": [
"erpnext.projects.doctype.project.project.project_status_update_reminder", "erpnext.projects.doctype.project.project.project_status_update_reminder",

View File

@ -1,11 +1,11 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors # Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt # License: GNU General Public License v3. See license.txt
import functools import functools
import re import re
from collections import deque from collections import deque
from operator import itemgetter from operator import itemgetter
from typing import List from typing import Dict, List
import frappe import frappe
from frappe import _ from frappe import _
@ -189,6 +189,7 @@ class BOM(WebsiteGenerator):
self.validate_transfer_against() self.validate_transfer_against()
self.set_routing_operations() self.set_routing_operations()
self.validate_operations() self.validate_operations()
self.update_exploded_items(save=False)
self.calculate_cost() self.calculate_cost()
self.update_stock_qty() self.update_stock_qty()
self.update_cost(update_parent=False, from_child_bom=True, update_hour_rate=False, save=False) self.update_cost(update_parent=False, from_child_bom=True, update_hour_rate=False, save=False)
@ -386,40 +387,14 @@ class BOM(WebsiteGenerator):
existing_bom_cost = self.total_cost existing_bom_cost = self.total_cost
for d in self.get("items"):
if not d.item_code:
continue
rate = self.get_rm_rate(
{
"company": self.company,
"item_code": d.item_code,
"bom_no": d.bom_no,
"qty": d.qty,
"uom": d.uom,
"stock_uom": d.stock_uom,
"conversion_factor": d.conversion_factor,
"sourced_by_supplier": d.sourced_by_supplier,
}
)
if rate:
d.rate = rate
d.amount = flt(d.rate) * flt(d.qty)
d.base_rate = flt(d.rate) * flt(self.conversion_rate)
d.base_amount = flt(d.amount) * flt(self.conversion_rate)
if save:
d.db_update()
if self.docstatus == 1: if self.docstatus == 1:
self.flags.ignore_validate_update_after_submit = True self.flags.ignore_validate_update_after_submit = True
self.calculate_cost(update_hour_rate)
self.calculate_cost(save_updates=save, update_hour_rate=update_hour_rate)
if save: if save:
self.db_update() self.db_update()
self.update_exploded_items(save=save)
# update parent BOMs # update parent BOMs
if self.total_cost != existing_bom_cost and update_parent: if self.total_cost != existing_bom_cost and update_parent:
parent_boms = frappe.db.sql_list( parent_boms = frappe.db.sql_list(
@ -608,11 +583,15 @@ class BOM(WebsiteGenerator):
bom_list.reverse() bom_list.reverse()
return bom_list return bom_list
def calculate_cost(self, update_hour_rate=False): def calculate_cost(self, save_updates=False, update_hour_rate=False):
"""Calculate bom totals""" """Calculate bom totals"""
self.calculate_op_cost(update_hour_rate) self.calculate_op_cost(update_hour_rate)
self.calculate_rm_cost() self.calculate_rm_cost(save=save_updates)
self.calculate_sm_cost() self.calculate_sm_cost(save=save_updates)
if save_updates:
# not via doc event, table is not regenerated and needs updation
self.calculate_exploded_cost()
self.total_cost = self.operating_cost + self.raw_material_cost - self.scrap_material_cost self.total_cost = self.operating_cost + self.raw_material_cost - self.scrap_material_cost
self.base_total_cost = ( self.base_total_cost = (
self.base_operating_cost + self.base_raw_material_cost - self.base_scrap_material_cost self.base_operating_cost + self.base_raw_material_cost - self.base_scrap_material_cost
@ -654,12 +633,26 @@ class BOM(WebsiteGenerator):
if update_hour_rate: if update_hour_rate:
row.db_update() row.db_update()
def calculate_rm_cost(self): def calculate_rm_cost(self, save=False):
"""Fetch RM rate as per today's valuation rate and calculate totals""" """Fetch RM rate as per today's valuation rate and calculate totals"""
total_rm_cost = 0 total_rm_cost = 0
base_total_rm_cost = 0 base_total_rm_cost = 0
for d in self.get("items"): for d in self.get("items"):
old_rate = d.rate
d.rate = self.get_rm_rate(
{
"company": self.company,
"item_code": d.item_code,
"bom_no": d.bom_no,
"qty": d.qty,
"uom": d.uom,
"stock_uom": d.stock_uom,
"conversion_factor": d.conversion_factor,
"sourced_by_supplier": d.sourced_by_supplier,
}
)
d.base_rate = flt(d.rate) * flt(self.conversion_rate) d.base_rate = flt(d.rate) * flt(self.conversion_rate)
d.amount = flt(d.rate, d.precision("rate")) * flt(d.qty, d.precision("qty")) d.amount = flt(d.rate, d.precision("rate")) * flt(d.qty, d.precision("qty"))
d.base_amount = d.amount * flt(self.conversion_rate) d.base_amount = d.amount * flt(self.conversion_rate)
@ -669,11 +662,13 @@ class BOM(WebsiteGenerator):
total_rm_cost += d.amount total_rm_cost += d.amount
base_total_rm_cost += d.base_amount base_total_rm_cost += d.base_amount
if save and (old_rate != d.rate):
d.db_update()
self.raw_material_cost = total_rm_cost self.raw_material_cost = total_rm_cost
self.base_raw_material_cost = base_total_rm_cost self.base_raw_material_cost = base_total_rm_cost
def calculate_sm_cost(self): def calculate_sm_cost(self, save=False):
"""Fetch RM rate as per today's valuation rate and calculate totals""" """Fetch RM rate as per today's valuation rate and calculate totals"""
total_sm_cost = 0 total_sm_cost = 0
base_total_sm_cost = 0 base_total_sm_cost = 0
@ -688,10 +683,45 @@ class BOM(WebsiteGenerator):
) )
total_sm_cost += d.amount total_sm_cost += d.amount
base_total_sm_cost += d.base_amount base_total_sm_cost += d.base_amount
if save:
d.db_update()
self.scrap_material_cost = total_sm_cost self.scrap_material_cost = total_sm_cost
self.base_scrap_material_cost = base_total_sm_cost self.base_scrap_material_cost = base_total_sm_cost
def calculate_exploded_cost(self):
"Set exploded row cost from it's parent BOM."
rm_rate_map = self.get_rm_rate_map()
for row in self.get("exploded_items"):
old_rate = flt(row.rate)
row.rate = rm_rate_map.get(row.item_code)
row.amount = flt(row.stock_qty) * flt(row.rate)
if old_rate != row.rate:
# Only db_update if changed
row.db_update()
def get_rm_rate_map(self) -> Dict[str, float]:
"Create Raw Material-Rate map for Exploded Items. Fetch rate from Items table or Subassembly BOM."
rm_rate_map = {}
for item in self.get("items"):
if item.bom_no:
# Get Item-Rate from Subassembly BOM
explosion_items = frappe.get_all(
"BOM Explosion Item",
filters={"parent": item.bom_no},
fields=["item_code", "rate"],
order_by=None, # to avoid sort index creation at db level (granular change)
)
explosion_item_rate = {item.item_code: flt(item.rate) for item in explosion_items}
rm_rate_map.update(explosion_item_rate)
else:
rm_rate_map[item.item_code] = flt(item.base_rate) / flt(item.conversion_factor or 1.0)
return rm_rate_map
def update_exploded_items(self, save=True): def update_exploded_items(self, save=True):
"""Update Flat BOM, following will be correct data""" """Update Flat BOM, following will be correct data"""
self.get_exploded_items() self.get_exploded_items()
@ -902,44 +932,46 @@ def get_bom_item_rate(args, bom_doc):
return flt(rate) return flt(rate)
def get_valuation_rate(args): def get_valuation_rate(data):
"""Get weighted average of valuation rate from all warehouses""" """
1) Get average valuation rate from all warehouses
2) If no value, get last valuation rate from SLE
3) If no value, get valuation rate from Item
"""
from frappe.query_builder.functions import Sum
total_qty, total_value, valuation_rate = 0.0, 0.0, 0.0 item_code, company = data.get("item_code"), data.get("company")
item_bins = frappe.db.sql( valuation_rate = 0.0
"""
select
bin.actual_qty, bin.stock_value
from
`tabBin` bin, `tabWarehouse` warehouse
where
bin.item_code=%(item)s
and bin.warehouse = warehouse.name
and warehouse.company=%(company)s""",
{"item": args["item_code"], "company": args["company"]},
as_dict=1,
)
for d in item_bins: bin_table = frappe.qb.DocType("Bin")
total_qty += flt(d.actual_qty) wh_table = frappe.qb.DocType("Warehouse")
total_value += flt(d.stock_value) item_valuation = (
frappe.qb.from_(bin_table)
.join(wh_table)
.on(bin_table.warehouse == wh_table.name)
.select((Sum(bin_table.stock_value) / Sum(bin_table.actual_qty)).as_("valuation_rate"))
.where((bin_table.item_code == item_code) & (wh_table.company == company))
).run(as_dict=True)[0]
if total_qty: valuation_rate = item_valuation.get("valuation_rate")
valuation_rate = total_value / total_qty
if valuation_rate <= 0: if (valuation_rate is not None) and valuation_rate <= 0:
last_valuation_rate = frappe.db.sql( # Explicit null value check. If None, Bins don't exist, neither does SLE
"""select valuation_rate sle = frappe.qb.DocType("Stock Ledger Entry")
from `tabStock Ledger Entry` last_val_rate = (
where item_code = %s and valuation_rate > 0 and is_cancelled = 0 frappe.qb.from_(sle)
order by posting_date desc, posting_time desc, creation desc limit 1""", .select(sle.valuation_rate)
args["item_code"], .where((sle.item_code == item_code) & (sle.valuation_rate > 0) & (sle.is_cancelled == 0))
) .orderby(sle.posting_date, order=frappe.qb.desc)
.orderby(sle.posting_time, order=frappe.qb.desc)
.orderby(sle.creation, order=frappe.qb.desc)
.limit(1)
).run(as_dict=True)
valuation_rate = flt(last_valuation_rate[0][0]) if last_valuation_rate else 0 valuation_rate = flt(last_val_rate[0].get("valuation_rate")) if last_val_rate else 0
if not valuation_rate: if not valuation_rate:
valuation_rate = frappe.db.get_value("Item", args["item_code"], "valuation_rate") valuation_rate = frappe.db.get_value("Item", item_code, "valuation_rate")
return flt(valuation_rate) return flt(valuation_rate)
@ -1125,39 +1157,6 @@ def get_children(parent=None, is_root=False, **filters):
return bom_items return bom_items
def get_boms_in_bottom_up_order(bom_no=None):
def _get_parent(bom_no):
return frappe.db.sql_list(
"""
select distinct bom_item.parent from `tabBOM Item` bom_item
where bom_item.bom_no = %s and bom_item.docstatus=1 and bom_item.parenttype='BOM'
and exists(select bom.name from `tabBOM` bom where bom.name=bom_item.parent and bom.is_active=1)
""",
bom_no,
)
count = 0
bom_list = []
if bom_no:
bom_list.append(bom_no)
else:
# get all leaf BOMs
bom_list = frappe.db.sql_list(
"""select name from `tabBOM` bom
where docstatus=1 and is_active=1
and not exists(select bom_no from `tabBOM Item`
where parent=bom.name and ifnull(bom_no, '')!='')"""
)
while count < len(bom_list):
for child_bom in _get_parent(bom_list[count]):
if child_bom not in bom_list:
bom_list.append(child_bom)
count += 1
return bom_list
def add_additional_cost(stock_entry, work_order): def add_additional_cost(stock_entry, work_order):
# Add non stock items cost in the additional cost # Add non stock items cost in the additional cost
stock_entry.additional_costs = [] stock_entry.additional_costs = []

View File

@ -11,7 +11,9 @@ from frappe.utils import cstr, flt
from erpnext.buying.doctype.purchase_order.test_purchase_order import create_purchase_order from erpnext.buying.doctype.purchase_order.test_purchase_order import create_purchase_order
from erpnext.manufacturing.doctype.bom.bom import BOMRecursionError, item_query, make_variant_bom from erpnext.manufacturing.doctype.bom.bom import BOMRecursionError, item_query, make_variant_bom
from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost from erpnext.manufacturing.doctype.bom_update_log.test_bom_update_log import (
update_cost_in_all_boms_in_test,
)
from erpnext.stock.doctype.item.test_item import make_item from erpnext.stock.doctype.item.test_item import make_item
from erpnext.stock.doctype.stock_reconciliation.test_stock_reconciliation import ( from erpnext.stock.doctype.stock_reconciliation.test_stock_reconciliation import (
create_stock_reconciliation, create_stock_reconciliation,
@ -69,26 +71,31 @@ class TestBOM(FrappeTestCase):
def test_update_bom_cost_in_all_boms(self): def test_update_bom_cost_in_all_boms(self):
# get current rate for '_Test Item 2' # get current rate for '_Test Item 2'
rm_rate = frappe.db.sql( bom_rates = frappe.db.get_values(
"""select rate from `tabBOM Item` "BOM Item",
where parent='BOM-_Test Item Home Desktop Manufactured-001' {
and item_code='_Test Item 2' and docstatus=1 and parenttype='BOM'""" "parent": "BOM-_Test Item Home Desktop Manufactured-001",
"item_code": "_Test Item 2",
"docstatus": 1,
},
fieldname=["rate", "base_rate"],
as_dict=True,
) )
rm_rate = rm_rate[0][0] if rm_rate else 0 rm_base_rate = bom_rates[0].get("base_rate") if bom_rates else 0
# Reset item valuation rate # Reset item valuation rate
reset_item_valuation_rate(item_code="_Test Item 2", qty=200, rate=rm_rate + 10) reset_item_valuation_rate(item_code="_Test Item 2", qty=200, rate=rm_base_rate + 10)
# update cost of all BOMs based on latest valuation rate # update cost of all BOMs based on latest valuation rate
update_cost() update_cost_in_all_boms_in_test()
# check if new valuation rate updated in all BOMs # check if new valuation rate updated in all BOMs
for d in frappe.db.sql( for d in frappe.db.sql(
"""select rate from `tabBOM Item` """select base_rate from `tabBOM Item`
where item_code='_Test Item 2' and docstatus=1 and parenttype='BOM'""", where item_code='_Test Item 2' and docstatus=1 and parenttype='BOM'""",
as_dict=1, as_dict=1,
): ):
self.assertEqual(d.rate, rm_rate + 10) self.assertEqual(d.base_rate, rm_base_rate + 10)
def test_bom_cost(self): def test_bom_cost(self):
bom = frappe.copy_doc(test_records[2]) bom = frappe.copy_doc(test_records[2])

View File

@ -32,6 +32,7 @@
"is_active": 1, "is_active": 1,
"is_default": 1, "is_default": 1,
"item": "_Test Item Home Desktop Manufactured", "item": "_Test Item Home Desktop Manufactured",
"company": "_Test Company",
"quantity": 1.0 "quantity": 1.0
}, },
{ {

View File

@ -169,13 +169,15 @@
"index_web_pages_for_search": 1, "index_web_pages_for_search": 1,
"istable": 1, "istable": 1,
"links": [], "links": [],
"modified": "2020-10-08 16:21:29.386212", "modified": "2022-05-27 13:42:23.305455",
"modified_by": "Administrator", "modified_by": "Administrator",
"module": "Manufacturing", "module": "Manufacturing",
"name": "BOM Explosion Item", "name": "BOM Explosion Item",
"naming_rule": "Random",
"owner": "Administrator", "owner": "Administrator",
"permissions": [], "permissions": [],
"sort_field": "modified", "sort_field": "modified",
"sort_order": "DESC", "sort_order": "DESC",
"states": [],
"track_changes": 1 "track_changes": 1
} }

View File

@ -0,0 +1,55 @@
{
"actions": [],
"autoname": "autoincrement",
"creation": "2022-05-31 17:34:39.825537",
"doctype": "DocType",
"engine": "InnoDB",
"field_order": [
"level",
"batch_no",
"boms_updated",
"status"
],
"fields": [
{
"fieldname": "level",
"fieldtype": "Int",
"in_list_view": 1,
"label": "Level"
},
{
"fieldname": "batch_no",
"fieldtype": "Int",
"in_list_view": 1,
"label": "Batch No."
},
{
"fieldname": "boms_updated",
"fieldtype": "Long Text",
"hidden": 1,
"in_list_view": 1,
"label": "BOMs Updated"
},
{
"fieldname": "status",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Status",
"options": "Pending\nCompleted",
"read_only": 1
}
],
"index_web_pages_for_search": 1,
"istable": 1,
"links": [],
"modified": "2022-06-06 14:50:35.161062",
"modified_by": "Administrator",
"module": "Manufacturing",
"name": "BOM Update Batch",
"naming_rule": "Autoincrement",
"owner": "Administrator",
"permissions": [],
"sort_field": "modified",
"sort_order": "DESC",
"states": []
}

View File

@ -0,0 +1,9 @@
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
# import frappe
from frappe.model.document import Document
class BOMUpdateBatch(Document):
pass

View File

@ -13,6 +13,10 @@
"update_type", "update_type",
"status", "status",
"error_log", "error_log",
"progress_section",
"current_level",
"processed_boms",
"bom_batches",
"amended_from" "amended_from"
], ],
"fields": [ "fields": [
@ -63,13 +67,36 @@
"fieldtype": "Link", "fieldtype": "Link",
"label": "Error Log", "label": "Error Log",
"options": "Error Log" "options": "Error Log"
},
{
"collapsible": 1,
"depends_on": "eval: doc.update_type == \"Update Cost\"",
"fieldname": "progress_section",
"fieldtype": "Section Break",
"label": "Progress"
},
{
"fieldname": "processed_boms",
"fieldtype": "Long Text",
"hidden": 1,
"label": "Processed BOMs"
},
{
"fieldname": "bom_batches",
"fieldtype": "Table",
"options": "BOM Update Batch"
},
{
"fieldname": "current_level",
"fieldtype": "Int",
"label": "Current Level"
} }
], ],
"in_create": 1, "in_create": 1,
"index_web_pages_for_search": 1, "index_web_pages_for_search": 1,
"is_submittable": 1, "is_submittable": 1,
"links": [], "links": [],
"modified": "2022-03-31 12:51:44.885102", "modified": "2022-06-06 15:15:23.883251",
"modified_by": "Administrator", "modified_by": "Administrator",
"module": "Manufacturing", "module": "Manufacturing",
"name": "BOM Update Log", "name": "BOM Update Log",

View File

@ -1,13 +1,20 @@
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors # Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt # For license information, please see license.txt
from typing import Dict, List, Literal, Optional import json
from typing import Any, Dict, List, Optional, Tuple, Union
import frappe import frappe
from frappe import _ from frappe import _
from frappe.model.document import Document from frappe.model.document import Document
from frappe.utils import cstr, flt from frappe.utils import cint, cstr
from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import (
get_leaf_boms,
get_next_higher_level_boms,
handle_exception,
replace_bom,
set_values_in_log,
)
class BOMMissingError(frappe.ValidationError): class BOMMissingError(frappe.ValidationError):
@ -20,6 +27,8 @@ class BOMUpdateLog(Document):
self.validate_boms_are_specified() self.validate_boms_are_specified()
self.validate_same_bom() self.validate_same_bom()
self.validate_bom_items() self.validate_bom_items()
else:
self.validate_bom_cost_update_in_progress()
self.status = "Queued" self.status = "Queued"
@ -42,123 +51,184 @@ class BOMUpdateLog(Document):
if current_bom_item != new_bom_item: if current_bom_item != new_bom_item:
frappe.throw(_("The selected BOMs are not for the same item")) frappe.throw(_("The selected BOMs are not for the same item"))
def on_submit(self): def validate_bom_cost_update_in_progress(self):
if frappe.flags.in_test: "If another Cost Updation Log is still in progress, dont make new ones."
return
wip_log = frappe.get_all(
"BOM Update Log",
{"update_type": "Update Cost", "status": ["in", ["Queued", "In Progress"]]},
limit_page_length=1,
)
if wip_log:
log_link = frappe.utils.get_link_to_form("BOM Update Log", wip_log[0].name)
frappe.throw(
_("BOM Updation already in progress. Please wait until {0} is complete.").format(log_link),
title=_("Note"),
)
def on_submit(self):
if self.update_type == "Replace BOM": if self.update_type == "Replace BOM":
boms = {"current_bom": self.current_bom, "new_bom": self.new_bom} boms = {"current_bom": self.current_bom, "new_bom": self.new_bom}
frappe.enqueue( frappe.enqueue(
method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job", method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_replace_bom_job",
doc=self, doc=self,
boms=boms, boms=boms,
timeout=40000, timeout=40000,
now=frappe.flags.in_test,
) )
else: else:
frappe.enqueue( process_boms_cost_level_wise(self)
method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job",
doc=self,
update_type="Update Cost",
timeout=40000,
)
def replace_bom(boms: Dict) -> None: def run_replace_bom_job(
"""Replace current BOM with new BOM in parent BOMs."""
current_bom = boms.get("current_bom")
new_bom = boms.get("new_bom")
unit_cost = get_new_bom_unit_cost(new_bom)
update_new_bom_in_bom_items(unit_cost, current_bom, new_bom)
frappe.cache().delete_key("bom_children")
parent_boms = get_parent_boms(new_bom)
for bom in parent_boms:
bom_obj = frappe.get_doc("BOM", bom)
# this is only used for versioning and we do not want
# to make separate db calls by using load_doc_before_save
# which proves to be expensive while doing bulk replace
bom_obj._doc_before_save = bom_obj
bom_obj.update_exploded_items()
bom_obj.calculate_cost()
bom_obj.update_parent_cost()
bom_obj.db_update()
if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version:
bom_obj.save_version()
def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None:
bom_item = frappe.qb.DocType("BOM Item")
(
frappe.qb.update(bom_item)
.set(bom_item.bom_no, new_bom)
.set(bom_item.rate, unit_cost)
.set(bom_item.amount, (bom_item.stock_qty * unit_cost))
.where(
(bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")
)
).run()
def get_parent_boms(new_bom: str, bom_list: Optional[List] = None) -> List:
bom_list = bom_list or []
bom_item = frappe.qb.DocType("BOM Item")
parents = (
frappe.qb.from_(bom_item)
.select(bom_item.parent)
.where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM"))
.run(as_dict=True)
)
for d in parents:
if new_bom == d.parent:
frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent))
bom_list.append(d.parent)
get_parent_boms(d.parent, bom_list)
return list(set(bom_list))
def get_new_bom_unit_cost(new_bom: str) -> float:
bom = frappe.qb.DocType("BOM")
new_bom_unitcost = (
frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == new_bom).run()
)
return flt(new_bom_unitcost[0][0])
def run_bom_job(
doc: "BOMUpdateLog", doc: "BOMUpdateLog",
boms: Optional[Dict[str, str]] = None, boms: Optional[Dict[str, str]] = None,
update_type: Literal["Replace BOM", "Update Cost"] = "Replace BOM",
) -> None: ) -> None:
try: try:
doc.db_set("status", "In Progress") doc.db_set("status", "In Progress")
if not frappe.flags.in_test: if not frappe.flags.in_test:
frappe.db.commit() frappe.db.commit()
frappe.db.auto_commit_on_many_writes = 1 frappe.db.auto_commit_on_many_writes = 1
boms = frappe._dict(boms or {}) boms = frappe._dict(boms or {})
replace_bom(boms, doc.name)
if update_type == "Replace BOM":
replace_bom(boms)
else:
update_cost()
doc.db_set("status", "Completed") doc.db_set("status", "Completed")
except Exception: except Exception:
frappe.db.rollback() handle_exception(doc)
error_log = doc.log_error("BOM Update Tool Error")
doc.db_set("status", "Failed")
doc.db_set("error_log", error_log.name)
finally: finally:
frappe.db.auto_commit_on_many_writes = 0 frappe.db.auto_commit_on_many_writes = 0
frappe.db.commit() # nosemgrep
if not frappe.flags.in_test:
frappe.db.commit() # nosemgrep
def process_boms_cost_level_wise(
update_doc: "BOMUpdateLog", parent_boms: List[str] = None
) -> Union[None, Tuple]:
"Queue jobs at the start of new BOM Level in 'Update Cost' Jobs."
current_boms = {}
values = {}
if update_doc.status == "Queued":
# First level yet to process. On Submit.
current_level = 0
current_boms = get_leaf_boms()
values = {
"processed_boms": json.dumps({}),
"status": "In Progress",
"current_level": current_level,
}
else:
# Resume next level. via Cron Job.
if not parent_boms:
return
current_level = cint(update_doc.current_level) + 1
# Process the next level BOMs. Stage parents as current BOMs.
current_boms = parent_boms.copy()
values = {"current_level": current_level}
set_values_in_log(update_doc.name, values, commit=True)
queue_bom_cost_jobs(current_boms, update_doc, current_level)
def queue_bom_cost_jobs(
current_boms_list: List[str], update_doc: "BOMUpdateLog", current_level: int
) -> None:
"Queue batches of 20k BOMs of the same level to process parallelly"
batch_no = 0
while current_boms_list:
batch_no += 1
batch_size = 20_000
boms_to_process = current_boms_list[:batch_size] # slice out batch of 20k BOMs
# update list to exclude 20K (queued) BOMs
current_boms_list = current_boms_list[batch_size:] if len(current_boms_list) > batch_size else []
batch_row = update_doc.append(
"bom_batches", {"level": current_level, "batch_no": batch_no, "status": "Pending"}
)
batch_row.db_insert()
frappe.enqueue(
method="erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils.update_cost_in_level",
doc=update_doc,
bom_list=boms_to_process,
batch_name=batch_row.name,
queue="long",
now=frappe.flags.in_test,
)
def resume_bom_cost_update_jobs():
"""
1. Checks for In Progress BOM Update Log.
2. Checks if this job has completed the _current level_.
3. If current level is complete, get parent BOMs and start next level.
4. If no parents, mark as Complete.
5. If current level is WIP, skip the Log.
Called every 5 minutes via Cron job.
"""
in_progress_logs = frappe.db.get_all(
"BOM Update Log",
{"update_type": "Update Cost", "status": "In Progress"},
["name", "processed_boms", "current_level"],
)
if not in_progress_logs:
return
for log in in_progress_logs:
# check if all log batches of current level are processed
bom_batches = frappe.db.get_all(
"BOM Update Batch",
{"parent": log.name, "level": log.current_level},
["name", "boms_updated", "status"],
)
incomplete_level = any(row.get("status") == "Pending" for row in bom_batches)
if not bom_batches or incomplete_level:
continue
# Prep parent BOMs & updated processed BOMs for next level
current_boms, processed_boms = get_processed_current_boms(log, bom_batches)
parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms)
# Unset processed BOMs if log is complete, it is used for next level BOMs
set_values_in_log(
log.name,
values={
"processed_boms": json.dumps([] if not parent_boms else processed_boms),
"status": "Completed" if not parent_boms else "In Progress",
},
commit=True,
)
if parent_boms: # there is a next level to process
process_boms_cost_level_wise(
update_doc=frappe.get_doc("BOM Update Log", log.name), parent_boms=parent_boms
)
def get_processed_current_boms(
log: Dict[str, Any], bom_batches: Dict[str, Any]
) -> Tuple[List[str], Dict[str, Any]]:
"""
Aggregate all BOMs from BOM Update Batch rows into 'processed_boms' field
and into current boms list.
"""
processed_boms = json.loads(log.processed_boms) if log.processed_boms else {}
current_boms = []
for row in bom_batches:
boms_updated = json.loads(row.boms_updated)
current_boms.extend(boms_updated)
boms_updated_dict = {bom: True for bom in boms_updated}
processed_boms.update(boms_updated_dict)
return current_boms, processed_boms

View File

@ -0,0 +1,225 @@
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
import copy
import json
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
if TYPE_CHECKING:
from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog
import frappe
from frappe import _
def replace_bom(boms: Dict, log_name: str) -> None:
"Replace current BOM with new BOM in parent BOMs."
current_bom = boms.get("current_bom")
new_bom = boms.get("new_bom")
unit_cost = get_bom_unit_cost(new_bom)
update_new_bom_in_bom_items(unit_cost, current_bom, new_bom)
frappe.cache().delete_key("bom_children")
parent_boms = get_ancestor_boms(new_bom)
for bom in parent_boms:
bom_obj = frappe.get_doc("BOM", bom)
# this is only used for versioning and we do not want
# to make separate db calls by using load_doc_before_save
# which proves to be expensive while doing bulk replace
bom_obj._doc_before_save = copy.deepcopy(bom_obj)
bom_obj.update_exploded_items()
bom_obj.calculate_cost()
bom_obj.update_parent_cost()
bom_obj.db_update()
bom_obj.flags.updater_reference = {
"doctype": "BOM Update Log",
"docname": log_name,
"label": _("via BOM Update Tool"),
}
bom_obj.save_version()
def update_cost_in_level(
doc: "BOMUpdateLog", bom_list: List[str], batch_name: Union[int, str]
) -> None:
"Updates Cost for BOMs within a given level. Runs via background jobs."
try:
status = frappe.db.get_value("BOM Update Log", doc.name, "status")
if status == "Failed":
return
update_cost_in_boms(bom_list=bom_list) # main updation logic
bom_batch = frappe.qb.DocType("BOM Update Batch")
(
frappe.qb.update(bom_batch)
.set(bom_batch.boms_updated, json.dumps(bom_list))
.set(bom_batch.status, "Completed")
.where(bom_batch.name == batch_name)
).run()
except Exception:
handle_exception(doc)
finally:
if not frappe.flags.in_test:
frappe.db.commit() # nosemgrep
def get_ancestor_boms(new_bom: str, bom_list: Optional[List] = None) -> List:
"Recursively get all ancestors of BOM."
bom_list = bom_list or []
bom_item = frappe.qb.DocType("BOM Item")
parents = (
frappe.qb.from_(bom_item)
.select(bom_item.parent)
.where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM"))
.run(as_dict=True)
)
for d in parents:
if new_bom == d.parent:
frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent))
bom_list.append(d.parent)
get_ancestor_boms(d.parent, bom_list)
return list(set(bom_list))
def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None:
bom_item = frappe.qb.DocType("BOM Item")
(
frappe.qb.update(bom_item)
.set(bom_item.bom_no, new_bom)
.set(bom_item.rate, unit_cost)
.set(bom_item.amount, (bom_item.stock_qty * unit_cost))
.where(
(bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")
)
).run()
def get_bom_unit_cost(bom_name: str) -> float:
bom = frappe.qb.DocType("BOM")
new_bom_unitcost = (
frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == bom_name).run()
)
return frappe.utils.flt(new_bom_unitcost[0][0])
def update_cost_in_boms(bom_list: List[str]) -> None:
"Updates cost in given BOMs. Returns current and total updated BOMs."
for index, bom in enumerate(bom_list):
bom_doc = frappe.get_doc("BOM", bom, for_update=True)
bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
bom_doc.db_update()
if (index % 50 == 0) and not frappe.flags.in_test:
frappe.db.commit() # nosemgrep
def get_next_higher_level_boms(
child_boms: List[str], processed_boms: Dict[str, bool]
) -> List[str]:
"Generate immediate higher level dependants with no unresolved dependencies (children)."
def _all_children_are_processed(parent_bom):
child_boms = dependency_map.get(parent_bom)
return all(processed_boms.get(bom) for bom in child_boms)
dependants_map, dependency_map = _generate_dependence_map()
dependants = []
for bom in child_boms:
# generate list of immediate dependants
parents = dependants_map.get(bom) or []
dependants.extend(parents)
dependants = set(dependants) # remove duplicates
resolved_dependants = set()
# consider only if children are all resolved
for parent_bom in dependants:
if _all_children_are_processed(parent_bom):
resolved_dependants.add(parent_bom)
return list(resolved_dependants)
def get_leaf_boms() -> List[str]:
"Get BOMs that have no dependencies."
return frappe.db.sql_list(
"""select name from `tabBOM` bom
where docstatus=1 and is_active=1
and not exists(select bom_no from `tabBOM Item`
where parent=bom.name and ifnull(bom_no, '')!='')"""
)
def _generate_dependence_map() -> defaultdict:
"""
Generate maps such as: { BOM-1: [Dependant-BOM-1, Dependant-BOM-2, ..] }.
Here BOM-1 is the leaf/lower level node/dependency.
The list contains one level higher nodes/dependants that depend on BOM-1.
Generate and return the reverse as well.
"""
bom = frappe.qb.DocType("BOM")
bom_item = frappe.qb.DocType("BOM Item")
bom_items = (
frappe.qb.from_(bom_item)
.join(bom)
.on(bom_item.parent == bom.name)
.select(bom_item.bom_no, bom_item.parent)
.where(
(bom_item.bom_no.isnotnull())
& (bom_item.bom_no != "")
& (bom.docstatus == 1)
& (bom.is_active == 1)
& (bom_item.parenttype == "BOM")
)
).run(as_dict=True)
child_parent_map = defaultdict(list)
parent_child_map = defaultdict(list)
for row in bom_items:
child_parent_map[row.bom_no].append(row.parent)
parent_child_map[row.parent].append(row.bom_no)
return child_parent_map, parent_child_map
def set_values_in_log(log_name: str, values: Dict[str, Any], commit: bool = False) -> None:
"Update BOM Update Log record."
if not values:
return
bom_update_log = frappe.qb.DocType("BOM Update Log")
query = frappe.qb.update(bom_update_log).where(bom_update_log.name == log_name)
for key, value in values.items():
query = query.set(key, value)
query.run()
if commit and not frappe.flags.in_test:
frappe.db.commit() # nosemgrep
def handle_exception(doc: "BOMUpdateLog") -> None:
"Rolls back and fails BOM Update Log."
frappe.db.rollback()
error_log = doc.log_error("BOM Update Tool Error")
set_values_in_log(doc.name, {"status": "Failed", "error_log": error_log.name})

View File

@ -6,9 +6,12 @@ from frappe.tests.utils import FrappeTestCase
from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import ( from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import (
BOMMissingError, BOMMissingError,
run_bom_job, resume_bom_cost_update_jobs,
)
from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import (
enqueue_replace_bom,
enqueue_update_cost,
) )
from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import enqueue_replace_bom
test_records = frappe.get_test_records("BOM") test_records = frappe.get_test_records("BOM")
@ -31,17 +34,12 @@ class TestBOMUpdateLog(FrappeTestCase):
def tearDown(self): def tearDown(self):
frappe.db.rollback() frappe.db.rollback()
if self._testMethodName == "test_bom_update_log_completion":
# clear logs and delete BOM created via setUp
frappe.db.delete("BOM Update Log")
self.new_bom_doc.cancel()
self.new_bom_doc.delete()
# explicitly commit and restore to original state
frappe.db.commit() # nosemgrep
def test_bom_update_log_validate(self): def test_bom_update_log_validate(self):
"Test if BOM presence is validated." """
1) Test if BOM presence is validated.
2) Test if same BOMs are validated.
3) Test of non-existent BOM is validated.
"""
with self.assertRaises(BOMMissingError): with self.assertRaises(BOMMissingError):
enqueue_replace_bom(boms={}) enqueue_replace_bom(boms={})
@ -52,45 +50,22 @@ class TestBOMUpdateLog(FrappeTestCase):
with self.assertRaises(frappe.ValidationError): with self.assertRaises(frappe.ValidationError):
enqueue_replace_bom(boms=frappe._dict(current_bom=self.boms.new_bom, new_bom="Dummy BOM")) enqueue_replace_bom(boms=frappe._dict(current_bom=self.boms.new_bom, new_bom="Dummy BOM"))
def test_bom_update_log_queueing(self):
"Test if BOM Update Log is created and queued."
log = enqueue_replace_bom(
boms=self.boms,
)
self.assertEqual(log.docstatus, 1)
self.assertEqual(log.status, "Queued")
def test_bom_update_log_completion(self): def test_bom_update_log_completion(self):
"Test if BOM Update Log handles job completion correctly." "Test if BOM Update Log handles job completion correctly."
log = enqueue_replace_bom( log = enqueue_replace_bom(boms=self.boms)
boms=self.boms,
)
# Explicitly commits log, new bom (setUp) and replacement impact.
# Is run via background jobs IRL
run_bom_job(
doc=log,
boms=self.boms,
update_type="Replace BOM",
)
log.reload() log.reload()
self.assertEqual(log.status, "Completed") self.assertEqual(log.status, "Completed")
# teardown (undo replace impact) due to commit
boms = frappe._dict( def update_cost_in_all_boms_in_test():
current_bom=self.boms.new_bom, """
new_bom=self.boms.current_bom, Utility to run 'Update Cost' job in tests without Cron job until fully complete.
) """
log2 = enqueue_replace_bom( log = enqueue_update_cost() # create BOM Update Log
boms=self.boms,
) while log.status != "Completed":
run_bom_job( # Explicitly commits resume_bom_cost_update_jobs() # run cron job until complete
doc=log2, log.reload()
boms=boms,
update_type="Replace BOM", return log
)
self.assertEqual(log2.status, "Completed")

View File

@ -10,8 +10,6 @@ if TYPE_CHECKING:
import frappe import frappe
from frappe.model.document import Document from frappe.model.document import Document
from erpnext.manufacturing.doctype.bom.bom import get_boms_in_bottom_up_order
class BOMUpdateTool(Document): class BOMUpdateTool(Document):
pass pass
@ -40,14 +38,13 @@ def enqueue_update_cost() -> "BOMUpdateLog":
def auto_update_latest_price_in_all_boms() -> None: def auto_update_latest_price_in_all_boms() -> None:
"""Called via hooks.py.""" """Called via hooks.py."""
if frappe.db.get_single_value("Manufacturing Settings", "update_bom_costs_automatically"): if frappe.db.get_single_value("Manufacturing Settings", "update_bom_costs_automatically"):
update_cost() wip_log = frappe.get_all(
"BOM Update Log",
{"update_type": "Update Cost", "status": ["in", ["Queued", "In Progress"]]},
def update_cost() -> None: limit_page_length=1,
"""Updates Cost for all BOMs from bottom to top.""" )
bom_list = get_boms_in_bottom_up_order() if not wip_log:
for bom in bom_list: create_bom_update_log(update_type="Update Cost")
frappe.get_doc("BOM", bom).update_cost(update_parent=False, from_child_bom=True)
def create_bom_update_log( def create_bom_update_log(

View File

@ -1,11 +1,13 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors # Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt # License: GNU General Public License v3. See license.txt
import frappe import frappe
from frappe.tests.utils import FrappeTestCase from frappe.tests.utils import FrappeTestCase
from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import replace_bom from erpnext.manufacturing.doctype.bom_update_log.test_bom_update_log import (
from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost update_cost_in_all_boms_in_test,
)
from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import enqueue_replace_bom
from erpnext.manufacturing.doctype.production_plan.test_production_plan import make_bom from erpnext.manufacturing.doctype.production_plan.test_production_plan import make_bom
from erpnext.stock.doctype.item.test_item import create_item from erpnext.stock.doctype.item.test_item import create_item
@ -15,6 +17,9 @@ test_records = frappe.get_test_records("BOM")
class TestBOMUpdateTool(FrappeTestCase): class TestBOMUpdateTool(FrappeTestCase):
"Test major functions run via BOM Update Tool." "Test major functions run via BOM Update Tool."
def tearDown(self):
frappe.db.rollback()
def test_replace_bom(self): def test_replace_bom(self):
current_bom = "BOM-_Test Item Home Desktop Manufactured-001" current_bom = "BOM-_Test Item Home Desktop Manufactured-001"
@ -23,15 +28,10 @@ class TestBOMUpdateTool(FrappeTestCase):
bom_doc.insert() bom_doc.insert()
boms = frappe._dict(current_bom=current_bom, new_bom=bom_doc.name) boms = frappe._dict(current_bom=current_bom, new_bom=bom_doc.name)
replace_bom(boms) enqueue_replace_bom(boms=boms)
self.assertFalse(frappe.db.sql("select name from `tabBOM Item` where bom_no=%s", current_bom)) self.assertFalse(frappe.db.exists("BOM Item", {"bom_no": current_bom, "docstatus": 1}))
self.assertTrue(frappe.db.sql("select name from `tabBOM Item` where bom_no=%s", bom_doc.name)) self.assertTrue(frappe.db.exists("BOM Item", {"bom_no": bom_doc.name, "docstatus": 1}))
# reverse, as it affects other testcases
boms.current_bom = bom_doc.name
boms.new_bom = current_bom
replace_bom(boms)
def test_bom_cost(self): def test_bom_cost(self):
for item in ["BOM Cost Test Item 1", "BOM Cost Test Item 2", "BOM Cost Test Item 3"]: for item in ["BOM Cost Test Item 1", "BOM Cost Test Item 2", "BOM Cost Test Item 3"]:
@ -52,13 +52,13 @@ class TestBOMUpdateTool(FrappeTestCase):
self.assertEqual(doc.total_cost, 200) self.assertEqual(doc.total_cost, 200)
frappe.db.set_value("Item", "BOM Cost Test Item 2", "valuation_rate", 200) frappe.db.set_value("Item", "BOM Cost Test Item 2", "valuation_rate", 200)
update_cost() update_cost_in_all_boms_in_test()
doc.load_from_db() doc.load_from_db()
self.assertEqual(doc.total_cost, 300) self.assertEqual(doc.total_cost, 300)
frappe.db.set_value("Item", "BOM Cost Test Item 2", "valuation_rate", 100) frappe.db.set_value("Item", "BOM Cost Test Item 2", "valuation_rate", 100)
update_cost() update_cost_in_all_boms_in_test()
doc.load_from_db() doc.load_from_db()
self.assertEqual(doc.total_cost, 200) self.assertEqual(doc.total_cost, 200)

View File

@ -798,7 +798,6 @@ def make_bom(**args):
for item in args.raw_materials: for item in args.raw_materials:
item_doc = frappe.get_doc("Item", item) item_doc = frappe.get_doc("Item", item)
bom.append( bom.append(
"items", "items",
{ {

View File

@ -417,7 +417,7 @@ class TestWorkOrder(FrappeTestCase):
"doctype": "Item Price", "doctype": "Item Price",
"item_code": "_Test FG Non Stock Item", "item_code": "_Test FG Non Stock Item",
"price_list_rate": 1000, "price_list_rate": 1000,
"price_list": "Standard Buying", "price_list": "_Test Price List India",
} }
).insert(ignore_permissions=True) ).insert(ignore_permissions=True)
@ -426,8 +426,17 @@ class TestWorkOrder(FrappeTestCase):
item_code="_Test FG Item", target="_Test Warehouse - _TC", qty=1, basic_rate=100 item_code="_Test FG Item", target="_Test Warehouse - _TC", qty=1, basic_rate=100
) )
if not frappe.db.get_value("BOM", {"item": fg_item}): if not frappe.db.get_value("BOM", {"item": fg_item, "docstatus": 1}):
make_bom(item=fg_item, rate=1000, raw_materials=["_Test FG Item", "_Test FG Non Stock Item"]) bom = make_bom(
item=fg_item,
rate=1000,
raw_materials=["_Test FG Item", "_Test FG Non Stock Item"],
do_not_save=True,
)
bom.rm_cost_as_per = "Price List" # non stock item won't have valuation rate
bom.buying_price_list = "_Test Price List India"
bom.currency = "INR"
bom.save()
wo = make_wo_order_test_record(production_item=fg_item) wo = make_wo_order_test_record(production_item=fg_item)