f62b3207ff
- Back to back stock recos cause incorrect qty calculation across warehouses - Hard to differentiate how much of the qty is reset by the reco - Maintain Queue and balances warehouse wise and later aggregate for accurate values
432 lines
12 KiB
Python
432 lines
12 KiB
Python
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# License: GNU General Public License v3. See license.txt
|
|
|
|
|
|
from operator import itemgetter
|
|
from typing import Dict, List, Tuple, Union
|
|
|
|
import frappe
|
|
from frappe import _
|
|
from frappe.utils import cint, date_diff, flt
|
|
|
|
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
|
|
|
|
Filters = frappe._dict
|
|
|
|
def execute(filters: Filters = None) -> Tuple:
|
|
to_date = filters["to_date"]
|
|
columns = get_columns(filters)
|
|
|
|
item_details = FIFOSlots(filters).generate()
|
|
data = format_report_data(filters, item_details, to_date)
|
|
|
|
chart_data = get_chart_data(data, filters)
|
|
|
|
return columns, data, None, chart_data
|
|
|
|
def format_report_data(filters: Filters, item_details: Dict, to_date: str) -> List[Dict]:
|
|
"Returns ordered, formatted data with ranges."
|
|
_func = itemgetter(1)
|
|
data = []
|
|
|
|
for item, item_dict in item_details.items():
|
|
earliest_age, latest_age = 0, 0
|
|
details = item_dict["details"]
|
|
|
|
fifo_queue = sorted(filter(_func, item_dict["fifo_queue"]), key=_func)
|
|
|
|
if not fifo_queue: continue
|
|
|
|
average_age = get_average_age(fifo_queue, to_date)
|
|
earliest_age = date_diff(to_date, fifo_queue[0][1])
|
|
latest_age = date_diff(to_date, fifo_queue[-1][1])
|
|
range1, range2, range3, above_range3 = get_range_age(filters, fifo_queue, to_date, item_dict)
|
|
|
|
row = [details.name, details.item_name, details.description,
|
|
details.item_group, details.brand]
|
|
|
|
if filters.get("show_warehouse_wise_stock"):
|
|
row.append(details.warehouse)
|
|
|
|
row.extend([item_dict.get("total_qty"), average_age,
|
|
range1, range2, range3, above_range3,
|
|
earliest_age, latest_age,
|
|
details.stock_uom])
|
|
|
|
data.append(row)
|
|
|
|
return data
|
|
|
|
def get_average_age(fifo_queue: List, to_date: str) -> float:
|
|
batch_age = age_qty = total_qty = 0.0
|
|
for batch in fifo_queue:
|
|
batch_age = date_diff(to_date, batch[1])
|
|
|
|
if isinstance(batch[0], (int, float)):
|
|
age_qty += batch_age * batch[0]
|
|
total_qty += batch[0]
|
|
else:
|
|
age_qty += batch_age * 1
|
|
total_qty += 1
|
|
|
|
return flt(age_qty / total_qty, 2) if total_qty else 0.0
|
|
|
|
def get_range_age(filters: Filters, fifo_queue: List, to_date: str, item_dict: Dict) -> Tuple:
|
|
range1 = range2 = range3 = above_range3 = 0.0
|
|
|
|
for item in fifo_queue:
|
|
age = date_diff(to_date, item[1])
|
|
qty = flt(item[0]) if not item_dict["has_serial_no"] else 1.0
|
|
|
|
if age <= filters.range1:
|
|
range1 += qty
|
|
elif age <= filters.range2:
|
|
range2 += qty
|
|
elif age <= filters.range3:
|
|
range3 += qty
|
|
else:
|
|
above_range3 += qty
|
|
|
|
return range1, range2, range3, above_range3
|
|
|
|
def get_columns(filters: Filters) -> List[Dict]:
|
|
range_columns = []
|
|
setup_ageing_columns(filters, range_columns)
|
|
columns = [
|
|
{
|
|
"label": _("Item Code"),
|
|
"fieldname": "item_code",
|
|
"fieldtype": "Link",
|
|
"options": "Item",
|
|
"width": 100
|
|
},
|
|
{
|
|
"label": _("Item Name"),
|
|
"fieldname": "item_name",
|
|
"fieldtype": "Data",
|
|
"width": 100
|
|
},
|
|
{
|
|
"label": _("Description"),
|
|
"fieldname": "description",
|
|
"fieldtype": "Data",
|
|
"width": 200
|
|
},
|
|
{
|
|
"label": _("Item Group"),
|
|
"fieldname": "item_group",
|
|
"fieldtype": "Link",
|
|
"options": "Item Group",
|
|
"width": 100
|
|
},
|
|
{
|
|
"label": _("Brand"),
|
|
"fieldname": "brand",
|
|
"fieldtype": "Link",
|
|
"options": "Brand",
|
|
"width": 100
|
|
}]
|
|
|
|
if filters.get("show_warehouse_wise_stock"):
|
|
columns +=[{
|
|
"label": _("Warehouse"),
|
|
"fieldname": "warehouse",
|
|
"fieldtype": "Link",
|
|
"options": "Warehouse",
|
|
"width": 100
|
|
}]
|
|
|
|
columns.extend([
|
|
{
|
|
"label": _("Available Qty"),
|
|
"fieldname": "qty",
|
|
"fieldtype": "Float",
|
|
"width": 100
|
|
},
|
|
{
|
|
"label": _("Average Age"),
|
|
"fieldname": "average_age",
|
|
"fieldtype": "Float",
|
|
"width": 100
|
|
}])
|
|
columns.extend(range_columns)
|
|
columns.extend([
|
|
{
|
|
"label": _("Earliest"),
|
|
"fieldname": "earliest",
|
|
"fieldtype": "Int",
|
|
"width": 80
|
|
},
|
|
{
|
|
"label": _("Latest"),
|
|
"fieldname": "latest",
|
|
"fieldtype": "Int",
|
|
"width": 80
|
|
},
|
|
{
|
|
"label": _("UOM"),
|
|
"fieldname": "uom",
|
|
"fieldtype": "Link",
|
|
"options": "UOM",
|
|
"width": 100
|
|
}
|
|
])
|
|
|
|
return columns
|
|
|
|
def get_chart_data(data: List, filters: Filters) -> Dict:
|
|
if not data:
|
|
return []
|
|
|
|
labels, datapoints = [], []
|
|
|
|
if filters.get("show_warehouse_wise_stock"):
|
|
return {}
|
|
|
|
data.sort(key = lambda row: row[6], reverse=True)
|
|
|
|
if len(data) > 10:
|
|
data = data[:10]
|
|
|
|
for row in data:
|
|
labels.append(row[0])
|
|
datapoints.append(row[6])
|
|
|
|
return {
|
|
"data" : {
|
|
"labels": labels,
|
|
"datasets": [
|
|
{
|
|
"name": _("Average Age"),
|
|
"values": datapoints
|
|
}
|
|
]
|
|
},
|
|
"type" : "bar"
|
|
}
|
|
|
|
def setup_ageing_columns(filters: Filters, range_columns: List):
|
|
ranges = [
|
|
f"0 - {filters['range1']}",
|
|
f"{cint(filters['range1']) + 1} - {cint(filters['range2'])}",
|
|
f"{cint(filters['range2']) + 1} - {cint(filters['range3'])}",
|
|
f"{cint(filters['range3']) + 1} - {_('Above')}"
|
|
]
|
|
for i, label in enumerate(ranges):
|
|
fieldname = 'range' + str(i+1)
|
|
add_column(range_columns, label=f"Age ({label})",fieldname=fieldname)
|
|
|
|
def add_column(range_columns: List, label:str, fieldname: str, fieldtype: str = 'Float', width: int = 140):
|
|
range_columns.append(dict(
|
|
label=label,
|
|
fieldname=fieldname,
|
|
fieldtype=fieldtype,
|
|
width=width
|
|
))
|
|
|
|
|
|
class FIFOSlots:
|
|
"Returns FIFO computed slots of inwarded stock as per date."
|
|
|
|
def __init__(self, filters: Dict = None , sle: List = None):
|
|
self.item_details = {}
|
|
self.transferred_item_details = {}
|
|
self.serial_no_batch_purchase_details = {}
|
|
self.filters = filters
|
|
self.sle = sle
|
|
|
|
def generate(self) -> Dict:
|
|
"""
|
|
Returns dict of the foll.g structure:
|
|
Key = Item A / (Item A, Warehouse A)
|
|
Key: {
|
|
'details' -> Dict: ** item details **,
|
|
'fifo_queue' -> List: ** list of lists containing entries/slots for existing stock,
|
|
consumed/updated and maintained via FIFO. **
|
|
}
|
|
"""
|
|
if self.sle is None:
|
|
self.sle = self.__get_stock_ledger_entries()
|
|
|
|
for d in self.sle:
|
|
key, fifo_queue, transferred_item_key = self.__init_key_stores(d)
|
|
|
|
if d.voucher_type == "Stock Reconciliation":
|
|
# get difference in qty shift as actual qty
|
|
prev_balance_qty = self.item_details[key].get("qty_after_transaction", 0)
|
|
d.actual_qty = flt(d.qty_after_transaction) - flt(prev_balance_qty)
|
|
|
|
serial_nos = get_serial_nos(d.serial_no) if d.serial_no else []
|
|
|
|
if d.actual_qty > 0:
|
|
self.__compute_incoming_stock(d, fifo_queue, transferred_item_key, serial_nos)
|
|
else:
|
|
self.__compute_outgoing_stock(d, fifo_queue, transferred_item_key, serial_nos)
|
|
|
|
self.__update_balances(d, key)
|
|
|
|
if not self.filters.get("show_warehouse_wise_stock"):
|
|
# (Item 1, WH 1), (Item 1, WH 2) => (Item 1)
|
|
self.item_details = self.__aggregate_details_by_item(self.item_details)
|
|
|
|
return self.item_details
|
|
|
|
def __init_key_stores(self, row: Dict) -> Tuple:
|
|
"Initialise keys and FIFO Queue."
|
|
|
|
key = (row.name, row.warehouse)
|
|
self.item_details.setdefault(key, {"details": row, "fifo_queue": []})
|
|
fifo_queue = self.item_details[key]["fifo_queue"]
|
|
|
|
transferred_item_key = (row.voucher_no, row.name, row.warehouse)
|
|
self.transferred_item_details.setdefault(transferred_item_key, [])
|
|
|
|
return key, fifo_queue, transferred_item_key
|
|
|
|
def __compute_incoming_stock(self, row: Dict, fifo_queue: List, transfer_key: Tuple, serial_nos: List):
|
|
"Update FIFO Queue on inward stock."
|
|
|
|
if self.transferred_item_details.get(transfer_key):
|
|
# inward/outward from same voucher, item & warehouse
|
|
slot = self.transferred_item_details[transfer_key].pop(0)
|
|
fifo_queue.append(slot)
|
|
else:
|
|
if not serial_nos:
|
|
if fifo_queue and flt(fifo_queue[0][0]) < 0:
|
|
# neutralize negative stock by adding positive stock
|
|
fifo_queue[0][0] += flt(row.actual_qty)
|
|
fifo_queue[0][1] = row.posting_date
|
|
else:
|
|
fifo_queue.append([flt(row.actual_qty), row.posting_date])
|
|
return
|
|
|
|
for serial_no in serial_nos:
|
|
if self.serial_no_batch_purchase_details.get(serial_no):
|
|
fifo_queue.append([serial_no, self.serial_no_batch_purchase_details.get(serial_no)])
|
|
else:
|
|
self.serial_no_batch_purchase_details.setdefault(serial_no, row.posting_date)
|
|
fifo_queue.append([serial_no, row.posting_date])
|
|
|
|
def __compute_outgoing_stock(self, row: Dict, fifo_queue: List, transfer_key: Tuple, serial_nos: List):
|
|
"Update FIFO Queue on outward stock."
|
|
if serial_nos:
|
|
fifo_queue[:] = [serial_no for serial_no in fifo_queue if serial_no[0] not in serial_nos]
|
|
return
|
|
|
|
qty_to_pop = abs(row.actual_qty)
|
|
while qty_to_pop:
|
|
slot = fifo_queue[0] if fifo_queue else [0, None]
|
|
if 0 < flt(slot[0]) <= qty_to_pop:
|
|
# qty to pop >= slot qty
|
|
# if +ve and not enough or exactly same balance in current slot, consume whole slot
|
|
qty_to_pop -= flt(slot[0])
|
|
self.transferred_item_details[transfer_key].append(fifo_queue.pop(0))
|
|
elif not fifo_queue:
|
|
# negative stock, no balance but qty yet to consume
|
|
fifo_queue.append([-(qty_to_pop), row.posting_date])
|
|
self.transferred_item_details[transfer_key].append([row.actual_qty, row.posting_date])
|
|
qty_to_pop = 0
|
|
else:
|
|
# qty to pop < slot qty, ample balance
|
|
# consume actual_qty from first slot
|
|
slot[0] = flt(slot[0]) - qty_to_pop
|
|
self.transferred_item_details[transfer_key].append([qty_to_pop, slot[1]])
|
|
qty_to_pop = 0
|
|
|
|
def __update_balances(self, row: Dict, key: Union[Tuple, str]):
|
|
self.item_details[key]["qty_after_transaction"] = row.qty_after_transaction
|
|
|
|
if "total_qty" not in self.item_details[key]:
|
|
self.item_details[key]["total_qty"] = row.actual_qty
|
|
else:
|
|
self.item_details[key]["total_qty"] += row.actual_qty
|
|
|
|
self.item_details[key]["has_serial_no"] = row.has_serial_no
|
|
|
|
def __aggregate_details_by_item(self, wh_wise_data: Dict) -> Dict:
|
|
"Aggregate Item-Wh wise data into single Item entry."
|
|
item_aggregated_data = {}
|
|
for key,row in wh_wise_data.items():
|
|
item = key[0]
|
|
if not item_aggregated_data.get(item):
|
|
item_aggregated_data.setdefault(item, {
|
|
"details": frappe._dict(),
|
|
"fifo_queue": [],
|
|
"qty_after_transaction": 0.0,
|
|
"total_qty": 0.0
|
|
})
|
|
item_row = item_aggregated_data.get(item)
|
|
item_row["details"].update(row["details"])
|
|
item_row["fifo_queue"].extend(row["fifo_queue"])
|
|
item_row["qty_after_transaction"] += flt(row["qty_after_transaction"])
|
|
item_row["total_qty"] += flt(row["total_qty"])
|
|
item_row["has_serial_no"] = row["has_serial_no"]
|
|
|
|
return item_aggregated_data
|
|
|
|
def __get_stock_ledger_entries(self) -> List[Dict]:
|
|
sle = frappe.qb.DocType("Stock Ledger Entry")
|
|
item = self.__get_item_query() # used as derived table in sle query
|
|
|
|
sle_query = (
|
|
frappe.qb.from_(sle).from_(item)
|
|
.select(
|
|
item.name, item.item_name, item.item_group,
|
|
item.brand, item.description,
|
|
item.stock_uom, item.has_serial_no,
|
|
sle.actual_qty, sle.posting_date,
|
|
sle.voucher_type, sle.voucher_no,
|
|
sle.serial_no, sle.batch_no,
|
|
sle.qty_after_transaction, sle.warehouse
|
|
).where(
|
|
(sle.item_code == item.name)
|
|
& (sle.company == self.filters.get("company"))
|
|
& (sle.posting_date <= self.filters.get("to_date"))
|
|
& (sle.is_cancelled != 1)
|
|
)
|
|
)
|
|
|
|
if self.filters.get("warehouse"):
|
|
sle_query = self.__get_warehouse_conditions(sle, sle_query)
|
|
|
|
sle_query = sle_query.orderby(
|
|
sle.posting_date, sle.posting_time, sle.creation, sle.actual_qty
|
|
)
|
|
|
|
return sle_query.run(as_dict=True)
|
|
|
|
def __get_item_query(self) -> str:
|
|
item_table = frappe.qb.DocType("Item")
|
|
|
|
item = frappe.qb.from_("Item").select(
|
|
"name", "item_name", "description", "stock_uom",
|
|
"brand", "item_group", "has_serial_no"
|
|
)
|
|
|
|
if self.filters.get("item_code"):
|
|
item = item.where(item_table.item_code == self.filters.get("item_code"))
|
|
|
|
if self.filters.get("brand"):
|
|
item = item.where(item_table.brand == self.filters.get("brand"))
|
|
|
|
return item
|
|
|
|
def __get_warehouse_conditions(self, sle, sle_query) -> str:
|
|
warehouse = frappe.qb.DocType("Warehouse")
|
|
lft, rgt = frappe.db.get_value(
|
|
"Warehouse",
|
|
self.filters.get("warehouse"),
|
|
['lft', 'rgt']
|
|
)
|
|
|
|
warehouse_results = (
|
|
frappe.qb.from_(warehouse)
|
|
.select("name").where(
|
|
(warehouse.lft >= lft)
|
|
& (warehouse.rgt <= rgt)
|
|
).run()
|
|
)
|
|
warehouse_results = [x[0] for x in warehouse_results]
|
|
|
|
return sle_query.where(sle.warehouse.isin(warehouse_results))
|