From 9af2d689455eb51ee527f900b2b437497beb62ae Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 14 Apr 2022 13:52:07 +0530 Subject: [PATCH] refactor: convert queries to ORM/QB, add types --- .../report/stock_balance/stock_balance.py | 94 ++++++++++--------- .../stock_balance/test_stock_balance.py | 3 +- 2 files changed, 50 insertions(+), 47 deletions(-) diff --git a/erpnext/stock/report/stock_balance/stock_balance.py b/erpnext/stock/report/stock_balance/stock_balance.py index e5ea8e0612..6369f910a4 100644 --- a/erpnext/stock/report/stock_balance/stock_balance.py +++ b/erpnext/stock/report/stock_balance/stock_balance.py @@ -3,7 +3,7 @@ from operator import itemgetter -from typing import Optional, TypedDict +from typing import Any, Dict, List, Optional, TypedDict import frappe from frappe import _ @@ -30,6 +30,9 @@ class StockBalanceFilter(TypedDict): show_variant_attributes: bool +SLEntry = Dict[str, Any] + + def execute(filters: Optional[StockBalanceFilter] = None): is_reposting_item_valuation_in_progress() if not filters: @@ -267,7 +270,7 @@ def apply_conditions(query, filters): return query -def get_stock_ledger_entries(filters: StockBalanceFilter, items): +def get_stock_ledger_entries(filters: StockBalanceFilter, items: List[str]) -> List[SLEntry]: sle = frappe.qb.DocType("Stock Ledger Entry") query = ( @@ -300,7 +303,7 @@ def get_stock_ledger_entries(filters: StockBalanceFilter, items): return query.run(as_dict=True) -def get_item_warehouse_map(filters: StockBalanceFilter, sle): +def get_item_warehouse_map(filters: StockBalanceFilter, sle: List[SLEntry]): iwb_map = {} from_date = getdate(filters.get("from_date")) to_date = getdate(filters.get("to_date")) @@ -358,7 +361,7 @@ def get_item_warehouse_map(filters: StockBalanceFilter, sle): return iwb_map -def filter_items_with_no_transactions(iwb_map, float_precision): +def filter_items_with_no_transactions(iwb_map, float_precision: float): for (company, item, warehouse) in sorted(iwb_map): qty_dict = iwb_map[(company, item, warehouse)] @@ -375,7 +378,7 @@ def filter_items_with_no_transactions(iwb_map, float_precision): return iwb_map -def get_items(filters: StockBalanceFilter): +def get_items(filters: StockBalanceFilter) -> List[str]: "Get items based on item code, item group or brand." if item_code := filters.get("item_code"): return [item_code] @@ -390,7 +393,7 @@ def get_items(filters: StockBalanceFilter): return frappe.get_all("Item", filters=item_filters, pluck="name", order_by=None) -def get_item_details(items, sle, filters: StockBalanceFilter): +def get_item_details(items: List[str], sle: List[SLEntry], filters: StockBalanceFilter): item_details = {} if not items: items = list(set(d.item_code for d in sle)) @@ -398,31 +401,33 @@ def get_item_details(items, sle, filters: StockBalanceFilter): if not items: return item_details - cf_field = cf_join = "" - if filters.get("include_uom"): - cf_field = ", ucd.conversion_factor" - cf_join = ( - "left join `tabUOM Conversion Detail` ucd on ucd.parent=item.name and ucd.uom=%s" - % frappe.db.escape(filters.get("include_uom")) - ) + item_table = frappe.qb.DocType("Item") - res = frappe.db.sql( - """ - select - item.name, item.item_name, item.description, item.item_group, item.brand, item.stock_uom %s - from - `tabItem` item - %s - where - item.name in (%s) - """ - % (cf_field, cf_join, ",".join(["%s"] * len(items))), - items, - as_dict=1, + query = ( + frappe.qb.from_(item_table) + .select( + item_table.name, + item_table.item_name, + item_table.description, + item_table.item_group, + item_table.brand, + item_table.stock_uom, + ) + .where(item_table.name.isin(items)) ) - for item in res: - item_details.setdefault(item.name, item) + if uom := filters.get("include_uom"): + uom_conv_detail = frappe.qb.DocType("UOM Conversion Detail") + query = ( + query.left_join(uom_conv_detail) + .on((uom_conv_detail.parent == item_table.name) & (uom_conv_detail.uom == uom)) + .select(uom_conv_detail.conversion_factor) + ) + + result = query.run(as_dict=1) + + for item_table in result: + item_details.setdefault(item_table.name, item_table) if filters.get("show_variant_attributes"): variant_values = get_variant_values_for(list(item_details)) @@ -435,21 +440,16 @@ def get_item_reorder_details(items): item_reorder_details = frappe._dict() if items: - item_reorder_details = frappe.db.sql( - """ - select parent, warehouse, warehouse_reorder_qty, warehouse_reorder_level - from `tabItem Reorder` - where parent in ({0}) - """.format( - ", ".join(frappe.db.escape(i, percent=False) for i in items) - ), - as_dict=1, + item_reorder_details = frappe.get_all( + "Item Reorder", + ["parent", "warehouse", "warehouse_reorder_qty", "warehouse_reorder_level"], + filters={"parent": ("in", items)}, ) return dict((d.parent + d.warehouse, d) for d in item_reorder_details) -def get_variants_attributes(): +def get_variants_attributes() -> List[str]: """Return all item variant attributes.""" return frappe.get_all("Item Attribute", pluck="name") @@ -457,14 +457,16 @@ def get_variants_attributes(): def get_variant_values_for(items): """Returns variant values for items.""" attribute_map = {} - for attr in frappe.db.sql( - """select parent, attribute, attribute_value - from `tabItem Variant Attribute` where parent in (%s) - """ - % ", ".join(["%s"] * len(items)), - tuple(items), - as_dict=1, - ): + + attribute_info = frappe.get_all( + "Item Variant Attribute", + ["parent", "attribute", "attribute_value"], + { + "parent": ("in", items), + }, + ) + + for attr in attribute_info: attribute_map.setdefault(attr["parent"], {}) attribute_map[attr["parent"]].update({attr["attribute"]: attr["attribute_value"]}) diff --git a/erpnext/stock/report/stock_balance/test_stock_balance.py b/erpnext/stock/report/stock_balance/test_stock_balance.py index 09054fb972..e963de293a 100644 --- a/erpnext/stock/report/stock_balance/test_stock_balance.py +++ b/erpnext/stock/report/stock_balance/test_stock_balance.py @@ -11,7 +11,8 @@ from erpnext.stock.report.stock_balance.stock_balance import execute def stock_balance(filters): - return list(map(_dict, execute(filters)[1])) + """Get rows from stock balance report""" + return [_dict(row) for row in execute(filters)[1]] class TestStockBalance(FrappeTestCase):