fix: Linter and minor code refactor

- Create an indexed map of stale packed items table to avoid loops to check if packed item row exists
- Reset packed items if row deletion takes place
- Renamed functions to self-explain them
- Split long function
- Reduce function calls inside function (makes it harder to follow through)
This commit is contained in:
marination 2022-01-25 23:52:52 +05:30
parent f8a5786542
commit 4c677eafe9
2 changed files with 103 additions and 90 deletions

View File

@ -441,7 +441,7 @@ erpnext.buying.get_items_from_product_bundle = function(frm) {
type: "GET", type: "GET",
method: "erpnext.stock.doctype.packed_item.packed_item.get_items_from_product_bundle", method: "erpnext.stock.doctype.packed_item.packed_item.get_items_from_product_bundle",
args: { args: {
args: { row: {
item_code: args.product_bundle, item_code: args.product_bundle,
quantity: args.quantity, quantity: args.quantity,
parenttype: frm.doc.doctype, parenttype: frm.doc.doctype,

View File

@ -8,7 +8,7 @@ import json
import frappe import frappe
from frappe.model.document import Document from frappe.model.document import Document
from frappe.utils import cstr, flt from frappe.utils import flt
from erpnext.stock.get_item_details import get_item_details from erpnext.stock.get_item_details import get_item_details
@ -16,23 +16,26 @@ from erpnext.stock.get_item_details import get_item_details
class PackedItem(Document): class PackedItem(Document):
pass pass
def make_packing_list(doc): def make_packing_list(doc):
"""make packing list for Product Bundle item""" """make packing list for Product Bundle item"""
if doc.get("_action") and doc._action == "update_after_submit": if doc.get("_action") and doc._action == "update_after_submit": return
return
parent_items, reset = [], False
stale_packed_items_table = get_indexed_packed_items_table(doc)
if not doc.is_new(): if not doc.is_new():
reset_packing_list_if_deleted_items_exist(doc) reset = reset_packing_list_if_deleted_items_exist(doc)
parent_items = []
for item in doc.get("items"): for item in doc.get("items"):
if frappe.db.exists("Product Bundle", {"new_item_code": item.item_code}): if frappe.db.exists("Product Bundle", {"new_item_code": item.item_code}):
for bundle_item in get_product_bundle_items(item.item_code): for bundle_item in get_product_bundle_items(item.item_code):
update_packing_list_item( pi_row = add_packed_item_row(
doc=doc, packing_item_code=bundle_item.item_code, doc=doc, packing_item=bundle_item,
qty=flt(bundle_item.qty) * flt(item.stock_qty), main_item_row=item, packed_items_table=stale_packed_items_table,
main_item_row=item, description=bundle_item.description reset=reset
) )
update_packed_item_details(bundle_item, pi_row, item, doc)
if [item.item_code, item.name] not in parent_items: if [item.item_code, item.name] not in parent_items:
parent_items.append([item.item_code, item.name]) parent_items.append([item.item_code, item.name])
@ -40,15 +43,31 @@ def make_packing_list(doc):
if frappe.db.get_single_value("Selling Settings", "editable_bundle_item_rates"): if frappe.db.get_single_value("Selling Settings", "editable_bundle_item_rates"):
update_product_bundle_price(doc, parent_items) update_product_bundle_price(doc, parent_items)
def get_indexed_packed_items_table(doc):
"""
Create dict from stale packed items table like:
{(Parent Item 1, Bundle Item 1, ae4b5678): {...}, (key): {value}}
"""
indexed_table = {}
for packed_item in doc.get("packed_items"):
key = (packed_item.parent_item, packed_item.item_code, packed_item.parent_detail_docname)
indexed_table[key] = packed_item
return indexed_table
def reset_packing_list_if_deleted_items_exist(doc): def reset_packing_list_if_deleted_items_exist(doc):
doc_before_save = doc.get_doc_before_save() doc_before_save = doc.get_doc_before_save()
if doc_before_save: reset_table = False
items_are_deleted = len(doc_before_save.get("items")) != len(doc.get("items"))
else:
items_are_deleted = True
if items_are_deleted: if doc_before_save:
# reset table if items were deleted
reset_table = len(doc_before_save.get("items")) > len(doc.get("items"))
else:
reset_table = True # reset if via Update Items (cannot determine action)
if reset_table:
doc.set("packed_items", []) doc.set("packed_items", [])
return reset_table
def get_product_bundle_items(item_code): def get_product_bundle_items(item_code):
product_bundle = frappe.qb.DocType("Product Bundle") product_bundle = frappe.qb.DocType("Product Bundle")
@ -70,52 +89,30 @@ def get_product_bundle_items(item_code):
) )
return query.run(as_dict=True) return query.run(as_dict=True)
def update_packing_list_item(doc, packing_item_code, qty, main_item_row, description): def add_packed_item_row(doc, packing_item, main_item_row, packed_items_table, reset):
old_packed_items_map = None """Add and return packed item row.
doc: Transaction document
packing_item (dict): Packed Item details
main_item_row (dict): Items table row corresponding to packed item
packed_items_table (dict): Packed Items table before save (indexed)
reset (bool): State if table is reset or preserved as is
"""
exists, pi_row = False, {}
if doc.amended_from: # check if row already exists in packed items table
old_packed_items_map = get_old_packed_item_details(doc.packed_items) key = (main_item_row.item_code, packing_item.item_code, main_item_row.name)
if packed_items_table.get(key):
item = get_packing_item_details(packing_item_code, doc.company) pi_row, exists = packed_items_table.get(key), True
# check if exists
exists = 0
for d in doc.get("packed_items"):
if (d.parent_item == main_item_row.item_code and
d.item_code == packing_item_code and
d.parent_detail_docname == main_item_row.name
):
pi, exists = d, 1
break
if not exists: if not exists:
pi = doc.append('packed_items', {}) pi_row = doc.append('packed_items', {})
elif reset: # add row if row exists but table is reset
pi_row.idx, pi_row.name = None, None
pi_row = doc.append('packed_items', pi_row)
pi.parent_item = main_item_row.item_code return pi_row
pi.item_code = packing_item_code
pi.item_name = item.item_name
pi.parent_detail_docname = main_item_row.name
pi.uom = item.stock_uom
pi.qty = flt(qty)
pi.conversion_factor = main_item_row.conversion_factor
if description and not pi.description:
pi.description = description
if not pi.warehouse and not doc.amended_from:
pi.warehouse = (main_item_row.warehouse if ((doc.get('is_pos') or item.is_stock_item \
or not item.default_warehouse) and main_item_row.warehouse) else item.default_warehouse)
if not pi.batch_no and not doc.amended_from:
pi.batch_no = cstr(main_item_row.get("batch_no"))
if not pi.target_warehouse:
pi.target_warehouse = main_item_row.get("target_warehouse")
bin = get_packed_item_bin_qty(packing_item_code, pi.warehouse)
pi.actual_qty = flt(bin.get("actual_qty"))
pi.projected_qty = flt(bin.get("projected_qty"))
if old_packed_items_map and old_packed_items_map.get((packing_item_code, main_item_row.item_code)):
pi.batch_no = old_packed_items_map.get((packing_item_code, main_item_row.item_code))[0].batch_no
pi.serial_no = old_packed_items_map.get((packing_item_code, main_item_row.item_code))[0].serial_no
pi.warehouse = old_packed_items_map.get((packing_item_code, main_item_row.item_code))[0].warehouse
def get_packing_item_details(item_code, company): def get_packed_item_details(item_code, company):
item = frappe.qb.DocType("Item") item = frappe.qb.DocType("Item")
item_default = frappe.qb.DocType("Item Default") item_default = frappe.qb.DocType("Item Default")
query = ( query = (
@ -134,6 +131,44 @@ def get_packing_item_details(item_code, company):
) )
return query.run(as_dict=True)[0] return query.run(as_dict=True)[0]
def update_packed_item_details(packing_item, pi_row, main_item_row, doc):
"Update additional packed item row details."
item = get_packed_item_details(packing_item.item_code, doc.company)
prev_doc_packed_items_map = None
if doc.amended_from:
prev_doc_packed_items_map = get_cancelled_doc_packed_item_details(doc.packed_items)
pi_row.parent_item = main_item_row.item_code
pi_row.parent_detail_docname = main_item_row.name
pi_row.item_code = packing_item.item_code
pi_row.item_name = item.item_name
pi_row.uom = item.stock_uom
pi_row.qty = flt(packing_item.qty) * flt(main_item_row.stock_qty)
pi_row.conversion_factor = main_item_row.conversion_factor
if not pi_row.description:
pi_row.description = packing_item.get("description")
if not pi_row.warehouse and not doc.amended_from:
pi_row.warehouse = (main_item_row.warehouse if ((doc.get('is_pos') or item.is_stock_item \
or not item.default_warehouse) and main_item_row.warehouse) else item.default_warehouse)
# TODO batch_no, actual_batch_qty, incoming_rate
if not pi_row.target_warehouse:
pi_row.target_warehouse = main_item_row.get("target_warehouse")
bin = get_packed_item_bin_qty(packing_item.item_code, pi_row.warehouse)
pi_row.actual_qty = flt(bin.get("actual_qty"))
pi_row.projected_qty = flt(bin.get("projected_qty"))
if prev_doc_packed_items_map and prev_doc_packed_items_map.get((packing_item.item_code, main_item_row.item_code)):
prev_doc_row = prev_doc_packed_items_map.get((packing_item.item_code, main_item_row.item_code))
pi_row.batch_no = prev_doc_row[0].batch_no
pi_row.serial_no = prev_doc_row[0].serial_no
pi_row.warehouse = prev_doc_row[0].warehouse
def get_packed_item_bin_qty(item, warehouse): def get_packed_item_bin_qty(item, warehouse):
bin_data = frappe.db.get_values( bin_data = frappe.db.get_values(
"Bin", "Bin",
@ -144,37 +179,14 @@ def get_packed_item_bin_qty(item, warehouse):
return bin_data[0] if bin_data else {} return bin_data[0] if bin_data else {}
def get_old_packed_item_details(old_packed_items): def get_cancelled_doc_packed_item_details(old_packed_items):
old_packed_items_map = {} prev_doc_packed_items_map = {}
for items in old_packed_items: for items in old_packed_items:
old_packed_items_map.setdefault((items.item_code ,items.parent_item), []).append(items.as_dict()) prev_doc_packed_items_map.setdefault((items.item_code ,items.parent_item), []).append(items.as_dict())
return old_packed_items_map return prev_doc_packed_items_map
def add_item_to_packing_list(doc, packed_item):
doc.append("packed_items", {
'parent_item': packed_item.parent_item,
'item_code': packed_item.item_code,
'item_name': packed_item.item_name,
'uom': packed_item.uom,
'qty': packed_item.qty,
'rate': packed_item.rate,
'conversion_factor': packed_item.conversion_factor,
'description': packed_item.description,
'warehouse': packed_item.warehouse,
'batch_no': packed_item.batch_no,
'actual_batch_qty': packed_item.actual_batch_qty,
'serial_no': packed_item.serial_no,
'target_warehouse': packed_item.target_warehouse,
'actual_qty': packed_item.actual_qty,
'projected_qty': packed_item.projected_qty,
'incoming_rate': packed_item.incoming_rate,
'prevdoc_doctype': packed_item.prevdoc_doctype,
'parent_detail_docname': packed_item.parent_detail_docname
})
def update_product_bundle_price(doc, parent_items): def update_product_bundle_price(doc, parent_items):
"""Updates the prices of Product Bundles based on the rates of the Items in the bundle.""" """Updates the prices of Product Bundles based on the rates of the Items in the bundle."""
if not doc.get('items'): if not doc.get('items'):
return return
@ -204,17 +216,18 @@ def update_parent_item_price(doc, parent_item_code, bundle_price):
parent_item_doc.amount = bundle_price parent_item_doc.amount = bundle_price
parent_item_doc.rate = bundle_price/(parent_item_doc.qty or 1) parent_item_doc.rate = bundle_price/(parent_item_doc.qty or 1)
@frappe.whitelist()
def get_items_from_product_bundle(args):
args, items = json.loads(args), []
bundled_items = get_product_bundle_items(args["item_code"]) @frappe.whitelist()
def get_items_from_product_bundle(row):
row, items = json.loads(row), []
bundled_items = get_product_bundle_items(row["item_code"])
for item in bundled_items: for item in bundled_items:
args.update({ row.update({
"item_code": item.item_code, "item_code": item.item_code,
"qty": flt(args["quantity"]) * flt(item.qty) "qty": flt(row["quantity"]) * flt(item.qty)
}) })
items.append(get_item_details(args)) items.append(get_item_details(row))
return items return items