refactor: Batch Item Expiry Status report (#36106)

This commit is contained in:
s-aga-r 2023-07-13 05:44:58 +05:30 committed by GitHub
parent 596a14e34f
commit 5f307f92e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 74 additions and 87 deletions

View File

@ -9,13 +9,27 @@ frappe.query_reports["Batch Item Expiry Status"] = {
"fieldtype": "Date",
"width": "80",
"default": frappe.sys_defaults.year_start_date,
"reqd": 1,
},
{
"fieldname":"to_date",
"label": __("To Date"),
"fieldtype": "Date",
"width": "80",
"default": frappe.datetime.get_today()
"default": frappe.datetime.get_today(),
"reqd": 1,
},
{
"fieldname":"item",
"label": __("Item"),
"fieldtype": "Link",
"options": "Item",
"width": "100",
"get_query": function () {
return {
filters: {"has_batch_no": 1}
}
}
}
]
}

View File

@ -4,113 +4,86 @@
import frappe
from frappe import _
from frappe.query_builder.functions import IfNull
from frappe.utils import cint, getdate
from frappe.query_builder.functions import Date
def execute(filters=None):
if not filters:
filters = {}
validate_filters(filters)
float_precision = cint(frappe.db.get_default("float_precision")) or 3
columns = get_columns(filters)
item_map = get_item_details(filters)
iwb_map = get_item_warehouse_batch_map(filters, float_precision)
data = []
for item in sorted(iwb_map):
for wh in sorted(iwb_map[item]):
for batch in sorted(iwb_map[item][wh]):
qty_dict = iwb_map[item][wh][batch]
data.append(
[
item,
item_map[item]["item_name"],
item_map[item]["description"],
wh,
batch,
frappe.db.get_value("Batch", batch, "expiry_date"),
qty_dict.expiry_status,
]
)
columns = get_columns()
data = get_data(filters)
return columns, data
def get_columns(filters):
"""return columns based on filters"""
def validate_filters(filters):
if not filters:
frappe.throw(_("Please select the required filters"))
columns = (
[_("Item") + ":Link/Item:100"]
+ [_("Item Name") + "::150"]
+ [_("Description") + "::150"]
+ [_("Warehouse") + ":Link/Warehouse:100"]
+ [_("Batch") + ":Link/Batch:100"]
+ [_("Expires On") + ":Date:90"]
+ [_("Expiry (In Days)") + ":Int:120"]
)
return columns
def get_stock_ledger_entries(filters):
if not filters.get("from_date"):
frappe.throw(_("'From Date' is required"))
if not filters.get("to_date"):
frappe.throw(_("'To Date' is required"))
sle = frappe.qb.DocType("Stock Ledger Entry")
query = (
frappe.qb.from_(sle)
.select(sle.item_code, sle.batch_no, sle.warehouse, sle.posting_date, sle.actual_qty)
.where(
(sle.is_cancelled == 0)
& (sle.docstatus < 2)
& (IfNull(sle.batch_no, "") != "")
& (sle.posting_date <= filters["to_date"])
)
.orderby(sle.item_code, sle.warehouse)
def get_columns():
return (
[_("Item") + ":Link/Item:150"]
+ [_("Item Name") + "::150"]
+ [_("Batch") + ":Link/Batch:150"]
+ [_("Stock UOM") + ":Link/UOM:100"]
+ [_("Quantity") + ":Float:100"]
+ [_("Expires On") + ":Date:100"]
+ [_("Expiry (In Days)") + ":Int:130"]
)
return query.run(as_dict=True)
def get_data(filters):
data = []
def get_item_warehouse_batch_map(filters, float_precision):
sle = get_stock_ledger_entries(filters)
iwb_map = {}
from_date = getdate(filters["from_date"])
to_date = getdate(filters["to_date"])
for d in sle:
iwb_map.setdefault(d.item_code, {}).setdefault(d.warehouse, {}).setdefault(
d.batch_no, frappe._dict({"expires_on": None, "expiry_status": None})
for batch in get_batch_details(filters):
data.append(
[
batch.item,
batch.item_name,
batch.name,
batch.stock_uom,
batch.batch_qty,
batch.expiry_date,
max((batch.expiry_date - frappe.utils.datetime.date.today()).days, 0)
if batch.expiry_date
else None,
]
)
qty_dict = iwb_map[d.item_code][d.warehouse][d.batch_no]
expiry_date_unicode = frappe.db.get_value("Batch", d.batch_no, "expiry_date")
qty_dict.expires_on = expiry_date_unicode
exp_date = frappe.utils.data.getdate(expiry_date_unicode)
qty_dict.expires_on = exp_date
expires_in_days = (exp_date - frappe.utils.datetime.date.today()).days
if expires_in_days > 0:
qty_dict.expiry_status = expires_in_days
else:
qty_dict.expiry_status = 0
return iwb_map
return data
def get_item_details(filters):
item_map = {}
for d in (frappe.qb.from_("Item").select("name", "item_name", "description")).run(as_dict=True):
item_map.setdefault(d.name, d)
def get_batch_details(filters):
batch = frappe.qb.DocType("Batch")
query = (
frappe.qb.from_(batch)
.select(
batch.name,
batch.creation,
batch.expiry_date,
batch.item,
batch.item_name,
batch.stock_uom,
batch.batch_qty,
)
.where(
(batch.disabled == 0)
& (batch.batch_qty > 0)
& (
(Date(batch.creation) >= filters["from_date"]) & (Date(batch.creation) <= filters["to_date"])
)
)
.orderby(batch.creation)
)
return item_map
if filters.get("item"):
query = query.where(batch.item == filters["item"])
return query.run(as_dict=True)