refactor: rewrite stock balance query to QB
This commit is contained in:
parent
8a499e95d3
commit
febc74a21b
@ -7,11 +7,13 @@ from typing import Optional, TypedDict
|
|||||||
|
|
||||||
import frappe
|
import frappe
|
||||||
from frappe import _
|
from frappe import _
|
||||||
|
from frappe.query_builder.functions import CombineDatetime
|
||||||
from frappe.utils import cint, date_diff, flt, getdate
|
from frappe.utils import cint, date_diff, flt, getdate
|
||||||
|
from frappe.utils.nestedset import get_descendants_of
|
||||||
|
from pypika.terms import ExistsCriterion
|
||||||
|
|
||||||
import erpnext
|
import erpnext
|
||||||
from erpnext.stock.report.stock_ageing.stock_ageing import FIFOSlots, get_average_age
|
from erpnext.stock.report.stock_ageing.stock_ageing import FIFOSlots, get_average_age
|
||||||
from erpnext.stock.report.stock_ledger.stock_ledger import get_item_group_condition
|
|
||||||
from erpnext.stock.utils import add_additional_uom_columns, is_reposting_item_valuation_in_progress
|
from erpnext.stock.utils import add_additional_uom_columns, is_reposting_item_valuation_in_progress
|
||||||
|
|
||||||
|
|
||||||
@ -33,8 +35,6 @@ def execute(filters: Optional[StockBalanceFilter] = None):
|
|||||||
if not filters:
|
if not filters:
|
||||||
filters = {}
|
filters = {}
|
||||||
|
|
||||||
to_date = filters.get("to_date")
|
|
||||||
|
|
||||||
if filters.get("company"):
|
if filters.get("company"):
|
||||||
company_currency = erpnext.get_company_currency(filters.get("company"))
|
company_currency = erpnext.get_company_currency(filters.get("company"))
|
||||||
else:
|
else:
|
||||||
@ -62,6 +62,7 @@ def execute(filters: Optional[StockBalanceFilter] = None):
|
|||||||
|
|
||||||
_func = itemgetter(1)
|
_func = itemgetter(1)
|
||||||
|
|
||||||
|
to_date = filters.get("to_date")
|
||||||
for (company, item, warehouse) in sorted(iwb_map):
|
for (company, item, warehouse) in sorted(iwb_map):
|
||||||
if item_map.get(item):
|
if item_map.get(item):
|
||||||
qty_dict = iwb_map[(company, item, warehouse)]
|
qty_dict = iwb_map[(company, item, warehouse)]
|
||||||
@ -229,63 +230,74 @@ def get_columns(filters: StockBalanceFilter):
|
|||||||
return columns
|
return columns
|
||||||
|
|
||||||
|
|
||||||
def get_conditions(filters: StockBalanceFilter):
|
def apply_conditions(query, filters):
|
||||||
conditions = ""
|
sle = frappe.qb.DocType("Stock Ledger Entry")
|
||||||
|
warehouse_table = frappe.qb.DocType("Warehouse")
|
||||||
|
|
||||||
if not filters.get("from_date"):
|
if not filters.get("from_date"):
|
||||||
frappe.throw(_("'From Date' is required"))
|
frappe.throw(_("'From Date' is required"))
|
||||||
|
|
||||||
if filters.get("to_date"):
|
if to_date := filters.get("to_date"):
|
||||||
conditions += " and sle.posting_date <= %s" % frappe.db.escape(filters.get("to_date"))
|
query = query.where(sle.posting_date <= to_date)
|
||||||
else:
|
else:
|
||||||
frappe.throw(_("'To Date' is required"))
|
frappe.throw(_("'To Date' is required"))
|
||||||
|
|
||||||
if filters.get("company"):
|
if company := filters.get("company"):
|
||||||
conditions += " and sle.company = %s" % frappe.db.escape(filters.get("company"))
|
query = query.where(sle.company == company)
|
||||||
|
|
||||||
if filters.get("warehouse"):
|
if warehouse := filters.get("warehouse"):
|
||||||
warehouse_details = frappe.db.get_value(
|
lft, rgt = frappe.db.get_value("Warehouse", warehouse, ["lft", "rgt"])
|
||||||
"Warehouse", filters.get("warehouse"), ["lft", "rgt"], as_dict=1
|
chilren_subquery = (
|
||||||
|
frappe.qb.from_(warehouse_table)
|
||||||
|
.select(warehouse_table.name)
|
||||||
|
.where(
|
||||||
|
(warehouse_table.lft >= lft)
|
||||||
|
& (warehouse_table.rgt <= rgt)
|
||||||
|
& (warehouse_table.name == sle.warehouse)
|
||||||
)
|
)
|
||||||
if warehouse_details:
|
)
|
||||||
conditions += (
|
query = query.where(ExistsCriterion(chilren_subquery))
|
||||||
" and exists (select name from `tabWarehouse` wh \
|
elif warehouse_type := filters.get("warehouse_type"):
|
||||||
where wh.lft >= %s and wh.rgt <= %s and sle.warehouse = wh.name)"
|
query = (
|
||||||
% (warehouse_details.lft, warehouse_details.rgt)
|
query.join(warehouse_table)
|
||||||
|
.on(warehouse_table.name == sle.warehouse)
|
||||||
|
.where(warehouse_table.warehouse_type == warehouse_type)
|
||||||
)
|
)
|
||||||
|
|
||||||
if filters.get("warehouse_type") and not filters.get("warehouse"):
|
return query
|
||||||
conditions += (
|
|
||||||
" and exists (select name from `tabWarehouse` wh \
|
|
||||||
where wh.warehouse_type = '%s' and sle.warehouse = wh.name)"
|
|
||||||
% (filters.get("warehouse_type"))
|
|
||||||
)
|
|
||||||
|
|
||||||
return conditions
|
|
||||||
|
|
||||||
|
|
||||||
def get_stock_ledger_entries(filters: StockBalanceFilter, items):
|
def get_stock_ledger_entries(filters: StockBalanceFilter, items):
|
||||||
item_conditions_sql = ""
|
sle = frappe.qb.DocType("Stock Ledger Entry")
|
||||||
|
|
||||||
|
query = (
|
||||||
|
frappe.qb.from_(sle)
|
||||||
|
.select(
|
||||||
|
sle.item_code,
|
||||||
|
sle.warehouse,
|
||||||
|
sle.posting_date,
|
||||||
|
sle.actual_qty,
|
||||||
|
sle.valuation_rate,
|
||||||
|
sle.company,
|
||||||
|
sle.voucher_type,
|
||||||
|
sle.qty_after_transaction,
|
||||||
|
sle.stock_value_difference,
|
||||||
|
sle.item_code.as_("name"),
|
||||||
|
sle.voucher_no,
|
||||||
|
sle.stock_value,
|
||||||
|
sle.batch_no,
|
||||||
|
)
|
||||||
|
.where((sle.docstatus < 2) & (sle.is_cancelled == 0))
|
||||||
|
.orderby(CombineDatetime(sle.posting_date, sle.posting_time))
|
||||||
|
.orderby(sle.creation)
|
||||||
|
.orderby(sle.actual_qty)
|
||||||
|
)
|
||||||
|
|
||||||
if items:
|
if items:
|
||||||
item_conditions_sql = " and sle.item_code in ({})".format(
|
query = query.where(sle.item_code.isin(items))
|
||||||
", ".join(frappe.db.escape(i, percent=False) for i in items)
|
|
||||||
)
|
|
||||||
|
|
||||||
conditions = get_conditions(filters)
|
query = apply_conditions(query, filters)
|
||||||
|
return query.run(as_dict=True)
|
||||||
return frappe.db.sql(
|
|
||||||
"""
|
|
||||||
select
|
|
||||||
sle.item_code, warehouse, sle.posting_date, sle.actual_qty, sle.valuation_rate,
|
|
||||||
sle.company, sle.voucher_type, sle.qty_after_transaction, sle.stock_value_difference,
|
|
||||||
sle.item_code as name, sle.voucher_no, sle.stock_value, sle.batch_no
|
|
||||||
from
|
|
||||||
`tabStock Ledger Entry` sle
|
|
||||||
where sle.docstatus < 2 %s %s
|
|
||||||
and is_cancelled = 0
|
|
||||||
order by sle.posting_date, sle.posting_time, sle.creation, sle.actual_qty"""
|
|
||||||
% (item_conditions_sql, conditions), # nosec
|
|
||||||
as_dict=1,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def get_item_warehouse_map(filters: StockBalanceFilter, sle):
|
def get_item_warehouse_map(filters: StockBalanceFilter, sle):
|
||||||
@ -365,21 +377,17 @@ def filter_items_with_no_transactions(iwb_map, float_precision):
|
|||||||
|
|
||||||
def get_items(filters: StockBalanceFilter):
|
def get_items(filters: StockBalanceFilter):
|
||||||
"Get items based on item code, item group or brand."
|
"Get items based on item code, item group or brand."
|
||||||
conditions = []
|
if item_code := filters.get("item_code"):
|
||||||
if filters.get("item_code"):
|
return [item_code]
|
||||||
conditions.append("item.name=%(item_code)s")
|
|
||||||
else:
|
else:
|
||||||
if filters.get("item_group"):
|
item_filters = {}
|
||||||
conditions.append(get_item_group_condition(filters.get("item_group")))
|
if item_group := filters.get("item_group"):
|
||||||
if filters.get("brand"): # used in stock analytics report
|
children = get_descendants_of("Item Group", item_group, ignore_permissions=True)
|
||||||
conditions.append("item.brand=%(brand)s")
|
item_filters["item_group"] = ("in", children + [item_group])
|
||||||
|
if brand := filters.get("brand"):
|
||||||
|
item_filters["brand"] = brand
|
||||||
|
|
||||||
items = []
|
return frappe.get_all("Item", filters=item_filters, pluck="name", order_by=None, debug=1)
|
||||||
if conditions:
|
|
||||||
items = frappe.db.sql_list(
|
|
||||||
"""select name from `tabItem` item where {}""".format(" and ".join(conditions)), filters
|
|
||||||
)
|
|
||||||
return items
|
|
||||||
|
|
||||||
|
|
||||||
def get_item_details(items, sle, filters: StockBalanceFilter):
|
def get_item_details(items, sle, filters: StockBalanceFilter):
|
||||||
@ -416,7 +424,7 @@ def get_item_details(items, sle, filters: StockBalanceFilter):
|
|||||||
for item in res:
|
for item in res:
|
||||||
item_details.setdefault(item.name, item)
|
item_details.setdefault(item.name, item)
|
||||||
|
|
||||||
if filters.get("show_variant_attributes", 0) == 1:
|
if filters.get("show_variant_attributes"):
|
||||||
variant_values = get_variant_values_for(list(item_details))
|
variant_values = get_variant_values_for(list(item_details))
|
||||||
item_details = {k: v.update(variant_values.get(k, {})) for k, v in item_details.items()}
|
item_details = {k: v.update(variant_values.get(k, {})) for k, v in item_details.items()}
|
||||||
|
|
||||||
@ -443,7 +451,7 @@ def get_item_reorder_details(items):
|
|||||||
|
|
||||||
def get_variants_attributes():
|
def get_variants_attributes():
|
||||||
"""Return all item variant attributes."""
|
"""Return all item variant attributes."""
|
||||||
return [i.name for i in frappe.get_all("Item Attribute")]
|
return frappe.get_all("Item Attribute", pluck="name")
|
||||||
|
|
||||||
|
|
||||||
def get_variant_values_for(items):
|
def get_variant_values_for(items):
|
||||||
|
@ -38,11 +38,9 @@ class TestStockBalance(FrappeTestCase):
|
|||||||
def generate_stock_ledger(self, item_code: str, movements):
|
def generate_stock_ledger(self, item_code: str, movements):
|
||||||
|
|
||||||
for movement in map(_dict, movements):
|
for movement in map(_dict, movements):
|
||||||
make_stock_entry(
|
if "to_warehouse" not in movement:
|
||||||
item_code=item_code,
|
movement.to_warehouse = "_Test Warehouse - _TC"
|
||||||
**movement,
|
make_stock_entry(item_code=item_code, **movement)
|
||||||
to_warehouse=movement.to_warehouse or "_Test Warehouse - _TC",
|
|
||||||
)
|
|
||||||
|
|
||||||
def assertInvariants(self, rows):
|
def assertInvariants(self, rows):
|
||||||
last_balance = frappe.db.sql(
|
last_balance = frappe.db.sql(
|
||||||
@ -135,3 +133,20 @@ class TestStockBalance(FrappeTestCase):
|
|||||||
|
|
||||||
rows = stock_balance(self.filters.update({"include_uom": "Box"}))
|
rows = stock_balance(self.filters.update({"include_uom": "Box"}))
|
||||||
self.assertEqual(rows[0].bal_qty_alt, 1)
|
self.assertEqual(rows[0].bal_qty_alt, 1)
|
||||||
|
|
||||||
|
def test_item_group(self):
|
||||||
|
self.filters.pop("item_code", None)
|
||||||
|
rows = stock_balance(self.filters.update({"item_group": self.item.item_group}))
|
||||||
|
self.assertTrue(all(r.item_group == self.item.item_group for r in rows))
|
||||||
|
|
||||||
|
def test_child_warehouse_balances(self):
|
||||||
|
# This is default
|
||||||
|
self.generate_stock_ledger(self.item.name, [_dict(qty=5, rate=10, to_warehouse="Stores - _TC")])
|
||||||
|
|
||||||
|
self.filters.pop("item_code", None)
|
||||||
|
rows = stock_balance(self.filters.update({"warehouse": "All Warehouses - _TC"}))
|
||||||
|
|
||||||
|
self.assertTrue(
|
||||||
|
any(r.item_code == self.item.name and r.warehouse == "Stores - _TC" for r in rows),
|
||||||
|
msg=f"Expected child warehouse balances \n{rows}",
|
||||||
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user