refactor: convert queries to ORM/QB, add types

This commit is contained in:
Ankush Menat 2022-04-14 13:52:07 +05:30
parent ba29323e11
commit 9af2d68945
2 changed files with 50 additions and 47 deletions

View File

@ -3,7 +3,7 @@
from operator import itemgetter from operator import itemgetter
from typing import Optional, TypedDict from typing import Any, Dict, List, Optional, TypedDict
import frappe import frappe
from frappe import _ from frappe import _
@ -30,6 +30,9 @@ class StockBalanceFilter(TypedDict):
show_variant_attributes: bool show_variant_attributes: bool
SLEntry = Dict[str, Any]
def execute(filters: Optional[StockBalanceFilter] = None): def execute(filters: Optional[StockBalanceFilter] = None):
is_reposting_item_valuation_in_progress() is_reposting_item_valuation_in_progress()
if not filters: if not filters:
@ -267,7 +270,7 @@ def apply_conditions(query, filters):
return query return query
def get_stock_ledger_entries(filters: StockBalanceFilter, items): def get_stock_ledger_entries(filters: StockBalanceFilter, items: List[str]) -> List[SLEntry]:
sle = frappe.qb.DocType("Stock Ledger Entry") sle = frappe.qb.DocType("Stock Ledger Entry")
query = ( query = (
@ -300,7 +303,7 @@ def get_stock_ledger_entries(filters: StockBalanceFilter, items):
return query.run(as_dict=True) return query.run(as_dict=True)
def get_item_warehouse_map(filters: StockBalanceFilter, sle): def get_item_warehouse_map(filters: StockBalanceFilter, sle: List[SLEntry]):
iwb_map = {} iwb_map = {}
from_date = getdate(filters.get("from_date")) from_date = getdate(filters.get("from_date"))
to_date = getdate(filters.get("to_date")) to_date = getdate(filters.get("to_date"))
@ -358,7 +361,7 @@ def get_item_warehouse_map(filters: StockBalanceFilter, sle):
return iwb_map return iwb_map
def filter_items_with_no_transactions(iwb_map, float_precision): def filter_items_with_no_transactions(iwb_map, float_precision: float):
for (company, item, warehouse) in sorted(iwb_map): for (company, item, warehouse) in sorted(iwb_map):
qty_dict = iwb_map[(company, item, warehouse)] qty_dict = iwb_map[(company, item, warehouse)]
@ -375,7 +378,7 @@ def filter_items_with_no_transactions(iwb_map, float_precision):
return iwb_map return iwb_map
def get_items(filters: StockBalanceFilter): def get_items(filters: StockBalanceFilter) -> List[str]:
"Get items based on item code, item group or brand." "Get items based on item code, item group or brand."
if item_code := filters.get("item_code"): if item_code := filters.get("item_code"):
return [item_code] return [item_code]
@ -390,7 +393,7 @@ def get_items(filters: StockBalanceFilter):
return frappe.get_all("Item", filters=item_filters, pluck="name", order_by=None) return frappe.get_all("Item", filters=item_filters, pluck="name", order_by=None)
def get_item_details(items, sle, filters: StockBalanceFilter): def get_item_details(items: List[str], sle: List[SLEntry], filters: StockBalanceFilter):
item_details = {} item_details = {}
if not items: if not items:
items = list(set(d.item_code for d in sle)) items = list(set(d.item_code for d in sle))
@ -398,31 +401,33 @@ def get_item_details(items, sle, filters: StockBalanceFilter):
if not items: if not items:
return item_details return item_details
cf_field = cf_join = "" item_table = frappe.qb.DocType("Item")
if filters.get("include_uom"):
cf_field = ", ucd.conversion_factor"
cf_join = (
"left join `tabUOM Conversion Detail` ucd on ucd.parent=item.name and ucd.uom=%s"
% frappe.db.escape(filters.get("include_uom"))
)
res = frappe.db.sql( query = (
""" frappe.qb.from_(item_table)
select .select(
item.name, item.item_name, item.description, item.item_group, item.brand, item.stock_uom %s item_table.name,
from item_table.item_name,
`tabItem` item item_table.description,
%s item_table.item_group,
where item_table.brand,
item.name in (%s) item_table.stock_uom,
""" )
% (cf_field, cf_join, ",".join(["%s"] * len(items))), .where(item_table.name.isin(items))
items,
as_dict=1,
) )
for item in res: if uom := filters.get("include_uom"):
item_details.setdefault(item.name, item) uom_conv_detail = frappe.qb.DocType("UOM Conversion Detail")
query = (
query.left_join(uom_conv_detail)
.on((uom_conv_detail.parent == item_table.name) & (uom_conv_detail.uom == uom))
.select(uom_conv_detail.conversion_factor)
)
result = query.run(as_dict=1)
for item_table in result:
item_details.setdefault(item_table.name, item_table)
if filters.get("show_variant_attributes"): if filters.get("show_variant_attributes"):
variant_values = get_variant_values_for(list(item_details)) variant_values = get_variant_values_for(list(item_details))
@ -435,21 +440,16 @@ def get_item_reorder_details(items):
item_reorder_details = frappe._dict() item_reorder_details = frappe._dict()
if items: if items:
item_reorder_details = frappe.db.sql( item_reorder_details = frappe.get_all(
""" "Item Reorder",
select parent, warehouse, warehouse_reorder_qty, warehouse_reorder_level ["parent", "warehouse", "warehouse_reorder_qty", "warehouse_reorder_level"],
from `tabItem Reorder` filters={"parent": ("in", items)},
where parent in ({0})
""".format(
", ".join(frappe.db.escape(i, percent=False) for i in items)
),
as_dict=1,
) )
return dict((d.parent + d.warehouse, d) for d in item_reorder_details) return dict((d.parent + d.warehouse, d) for d in item_reorder_details)
def get_variants_attributes(): def get_variants_attributes() -> List[str]:
"""Return all item variant attributes.""" """Return all item variant attributes."""
return frappe.get_all("Item Attribute", pluck="name") return frappe.get_all("Item Attribute", pluck="name")
@ -457,14 +457,16 @@ def get_variants_attributes():
def get_variant_values_for(items): def get_variant_values_for(items):
"""Returns variant values for items.""" """Returns variant values for items."""
attribute_map = {} attribute_map = {}
for attr in frappe.db.sql(
"""select parent, attribute, attribute_value attribute_info = frappe.get_all(
from `tabItem Variant Attribute` where parent in (%s) "Item Variant Attribute",
""" ["parent", "attribute", "attribute_value"],
% ", ".join(["%s"] * len(items)), {
tuple(items), "parent": ("in", items),
as_dict=1, },
): )
for attr in attribute_info:
attribute_map.setdefault(attr["parent"], {}) attribute_map.setdefault(attr["parent"], {})
attribute_map[attr["parent"]].update({attr["attribute"]: attr["attribute_value"]}) attribute_map[attr["parent"]].update({attr["attribute"]: attr["attribute_value"]})

View File

@ -11,7 +11,8 @@ from erpnext.stock.report.stock_balance.stock_balance import execute
def stock_balance(filters): def stock_balance(filters):
return list(map(_dict, execute(filters)[1])) """Get rows from stock balance report"""
return [_dict(row) for row in execute(filters)[1]]
class TestStockBalance(FrappeTestCase): class TestStockBalance(FrappeTestCase):