brotherton-erpnext/erpnext/stock/report/stock_ledger/stock_ledger.py
2022-07-20 16:02:13 +05:30

461 lines
12 KiB
Python

# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import frappe
from frappe import _
from frappe.query_builder.functions import CombineDatetime
from frappe.utils import cint, flt
from pypika.terms import ExistsCriterion
from erpnext.stock.doctype.inventory_dimension.inventory_dimension import get_inventory_dimensions
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
from erpnext.stock.doctype.stock_reconciliation.stock_reconciliation import get_stock_balance_for
from erpnext.stock.utils import (
is_reposting_item_valuation_in_progress,
update_included_uom_in_report,
)
def execute(filters=None):
    """Entry point of the Stock Ledger report.

    Args:
        filters: Dict-like report filters. The helpers called from here read
            keys both via ``.get()`` and via attribute access, so a
            ``frappe._dict`` is expected. ``None`` is treated as "no filters".

    Returns:
        A ``(columns, data)`` tuple. ``data`` holds an optional opening row
        followed by one row per stock ledger entry, each merged with the
        details of its item.
    """
    is_reposting_item_valuation_in_progress()

    # The signature advertises ``filters=None`` but everything below calls
    # ``filters.get(...)``; fall back to an empty dict instead of crashing.
    if filters is None:
        filters = frappe._dict()

    include_uom = filters.get("include_uom")
    columns = get_columns(filters)
    items = get_items(filters)
    sl_entries = get_stock_ledger_entries(filters, items)
    item_details = get_item_details(items, sl_entries, include_uom)
    # NOTE: get_opening_balance may remove entries from ``sl_entries`` in
    # place, so it has to run before the loop below.
    opening_row = get_opening_balance(filters, columns, sl_entries)
    precision = cint(frappe.db.get_single_value("System Settings", "float_precision"))

    data = []
    conversion_factors = []
    if opening_row:
        data.append(opening_row)
        # Keep ``conversion_factors`` aligned with ``data``: the opening row
        # gets a placeholder factor of 0.
        conversion_factors.append(0)

    actual_qty = stock_value = 0

    available_serial_nos = {}
    inventory_dimension_filters_applied = check_inventory_dimension_filters_applied(filters)

    for sle in sl_entries:
        item_detail = item_details[sle.item_code]
        sle.update(item_detail)

        if filters.get("batch_no") or inventory_dimension_filters_applied:
            # With a batch or inventory-dimension filter the stored balances
            # are replaced by running totals over the filtered rows only.
            actual_qty += flt(sle.actual_qty, precision)
            stock_value += sle.stock_value_difference

            # A Stock Reconciliation row without a quantity change resets the
            # running totals to the values stored on that row.
            if sle.voucher_type == "Stock Reconciliation" and not sle.actual_qty:
                actual_qty = sle.qty_after_transaction
                stock_value = sle.stock_value

            sle.update({"qty_after_transaction": actual_qty, "stock_value": stock_value})

        # Positive quantities are shown as "in", negative ones as "out".
        sle.update({"in_qty": max(sle.actual_qty, 0), "out_qty": min(sle.actual_qty, 0)})

        if sle.serial_no:
            update_available_serial_nos(available_serial_nos, sle)

        data.append(sle)

        if include_uom:
            conversion_factors.append(item_detail.conversion_factor)

    update_included_uom_in_report(columns, data, include_uom, conversion_factors)
    return columns, data
def update_available_serial_nos(available_serial_nos, sle):
    """Set ``sle.balance_serial_no`` to the serial numbers tracked after this entry.

    Args:
        available_serial_nos: Cache shared across calls, keyed by
            ``(item_code, warehouse)``; each value is a list of serial
            numbers and is mutated in place.
        sle: Stock ledger entry row. Reads ``serial_no``, ``item_code``,
            ``warehouse``, ``posting_date`` and ``posting_time``; writes
            ``balance_serial_no`` (newline-joined serial numbers).
    """
    serial_nos = get_serial_nos(sle.serial_no)
    key = (sle.item_code, sle.warehouse)

    if key not in available_serial_nos:
        # First entry seen for this item/warehouse: seed the cache from the
        # stock balance looked up at the entry's posting date and time.
        stock_balance = get_stock_balance_for(
            sle.item_code, sle.warehouse, sle.posting_date, sle.posting_time
        )
        serials = get_serial_nos(stock_balance["serial_nos"]) if stock_balance["serial_nos"] else []
        available_serial_nos[key] = serials

    existing_serial_no = available_serial_nos[key]
    for sn in serial_nos:
        # Toggle membership. The handling is the same for incoming and
        # outgoing entries, so no branch on the sign of ``actual_qty`` is
        # needed here.
        if sn in existing_serial_no:
            existing_serial_no.remove(sn)
        else:
            existing_serial_no.append(sn)

    sle.balance_serial_no = "\n".join(existing_serial_no)
def get_columns(filters):
    """Return the column definitions of the report.

    The list is made of four fixed leading columns, one Link column per
    inventory dimension, and the fixed trailing columns. ``filters`` is
    accepted for interface compatibility and is not read.
    """

    def link_column(label, fieldname, options, width):
        return {
            "label": label,
            "fieldname": fieldname,
            "fieldtype": "Link",
            "options": options,
            "width": width,
        }

    def qty_column(label, fieldname, width):
        return {
            "label": label,
            "fieldname": fieldname,
            "fieldtype": "Float",
            "width": width,
            "convertible": "qty",
        }

    def currency_column(label, fieldname, convertible=None):
        column = {
            "label": label,
            "fieldname": fieldname,
            "fieldtype": "Currency",
            "width": 110,
            "options": "Company:company:default_currency",
        }
        if convertible:
            column["convertible"] = convertible
        return column

    def voucher_no_column(width):
        return {
            "label": _("Voucher #"),
            "fieldname": "voucher_no",
            "fieldtype": "Dynamic Link",
            "options": "voucher_type",
            "width": width,
        }

    columns = [
        {"label": _("Date"), "fieldname": "date", "fieldtype": "Datetime", "width": 150},
        link_column(_("Item"), "item_code", "Item", 100),
        {"label": _("Item Name"), "fieldname": "item_name", "width": 100},
        link_column(_("Stock UOM"), "stock_uom", "UOM", 90),
    ]

    # One Link column per configured inventory dimension.
    columns += [
        link_column(_(dimension.doctype), dimension.fieldname, dimension.doctype, 110)
        for dimension in get_inventory_dimensions()
    ]

    columns += [
        qty_column(_("In Qty"), "in_qty", 80),
        qty_column(_("Out Qty"), "out_qty", 80),
        qty_column(_("Balance Qty"), "qty_after_transaction", 100),
        voucher_no_column(150),
        link_column(_("Warehouse"), "warehouse", "Warehouse", 150),
        link_column(_("Item Group"), "item_group", "Item Group", 100),
        link_column(_("Brand"), "brand", "Brand", 100),
        {"label": _("Description"), "fieldname": "description", "width": 200},
        currency_column(_("Incoming Rate"), "incoming_rate", convertible="rate"),
        currency_column(_("Valuation Rate"), "valuation_rate", convertible="rate"),
        currency_column(_("Balance Value"), "stock_value"),
        currency_column(_("Value Change"), "stock_value_difference"),
        {"label": _("Voucher Type"), "fieldname": "voucher_type", "width": 110},
        voucher_no_column(100),
        link_column(_("Batch"), "batch_no", "Batch", 100),
        link_column(_("Serial No"), "serial_no", "Serial No", 100),
        {"label": _("Balance Serial No"), "fieldname": "balance_serial_no", "width": 100},
        link_column(_("Project"), "project", "Project", 100),
        link_column(_("Company"), "company", "Company", 110),
    ]

    return columns
def get_stock_ledger_entries(filters, items):
    """Fetch the stock ledger entries matching the report filters.

    Args:
        filters: Dict-like filters with attribute access; ``from_date`` and
            ``to_date`` bound the posting date. Optional keys: one per
            inventory dimension (a collection of values), ``voucher_no``,
            ``batch_no``, ``project``, ``company`` and ``warehouse``.
        items: Item codes to restrict the result to. An empty value applies
            no item restriction.

    Returns:
        A list of dict rows ordered by posting datetime, then creation.
        Empty when the ``warehouse`` filter names a warehouse that cannot be
        found.
    """
    sle = frappe.qb.DocType("Stock Ledger Entry")
    query = (
        frappe.qb.from_(sle)
        .select(
            sle.item_code,
            CombineDatetime(sle.posting_date, sle.posting_time).as_("date"),
            sle.warehouse,
            sle.posting_date,
            sle.posting_time,
            sle.actual_qty,
            sle.incoming_rate,
            sle.valuation_rate,
            sle.company,
            sle.voucher_type,
            sle.qty_after_transaction,
            sle.stock_value_difference,
            sle.voucher_no,
            sle.stock_value,
            sle.batch_no,
            sle.serial_no,
            sle.project,
        )
        .where(
            (sle.docstatus < 2)
            & (sle.is_cancelled == 0)
            # Slicing a query-builder field yields a range (BETWEEN) criterion.
            & (sle.posting_date[filters.from_date : filters.to_date])
        )
        .orderby(CombineDatetime(sle.posting_date, sle.posting_time))
        .orderby(sle.creation)
    )

    # Every inventory dimension is selected; it is also filtered on when a
    # value for it is present in ``filters``.
    for fieldname in get_inventory_dimension_fields():
        query = query.select(fieldname)
        if fieldname in filters and filters.get(fieldname):
            query = query.where(sle[fieldname].isin(filters.get(fieldname)))

    if items:
        query = query.where(sle.item_code.isin(items))

    for field in ["voucher_no", "batch_no", "project", "company"]:
        if filters.get(field):
            query = query.where(sle[field] == filters.get(field))

    if warehouse := filters.get("warehouse"):
        bounds = frappe.db.get_value("Warehouse", warehouse, ["lft", "rgt"])
        if not bounds:
            # The lookup found no such warehouse, so no ledger entry can
            # match it. Return early rather than failing on the unpacking.
            return []
        lft, rgt = bounds

        # Keep entries whose warehouse has an lft/rgt range inside the range
        # of the selected warehouse (the warehouse itself included).
        warehouse_table = frappe.qb.DocType("Warehouse")
        children_subquery = (
            frappe.qb.from_(warehouse_table)
            .select(warehouse_table.name)
            .where(
                (warehouse_table.lft >= lft)
                & (warehouse_table.rgt <= rgt)
                & (warehouse_table.name == sle.warehouse)
            )
        )
        query = query.where(ExistsCriterion(children_subquery))

    return query.run(as_dict=True)
def get_inventory_dimension_fields():
    """Return the ``fieldname`` of every inventory dimension, in order."""
    fieldnames = []
    for dimension in get_inventory_dimensions():
        fieldnames.append(dimension.fieldname)
    return fieldnames
def get_items(filters):
    """Return the item codes selected by the item-related filters.

    ``item_code`` takes precedence; ``brand`` and ``item_group`` are only
    considered when no ``item_code`` is given.

    Args:
        filters: Dict-like filters; also passed to the query as its
            named parameters.

    Returns:
        A list of item names, or ``[]`` when no usable item filter is set.
    """
    conditions = []
    if filters.get("item_code"):
        conditions.append("item.name=%(item_code)s")
    else:
        if filters.get("brand"):
            conditions.append("item.brand=%(brand)s")
        if filters.get("item_group"):
            # get_item_group_condition returns "" for an item group it cannot
            # find. Appending that would produce invalid SQL ("where " or a
            # trailing " and "), so an empty condition is skipped.
            item_group_condition = get_item_group_condition(filters.get("item_group"))
            if item_group_condition:
                conditions.append(item_group_condition)

    if not conditions:
        return []

    return frappe.db.sql_list(
        """select name from `tabItem` item where {}""".format(" and ".join(conditions)), filters
    )
def get_item_details(items, sl_entries, include_uom):
    """Map item code to its Item row for the items shown in the report.

    When ``items`` is empty, the item codes are collected from
    ``sl_entries`` instead. If ``include_uom`` is set, each row also carries
    the ``conversion_factor`` of that UOM taken from a left join on
    `tabUOM Conversion Detail`.
    """
    item_codes = items or list({entry.item_code for entry in sl_entries})
    if not item_codes:
        return {}

    if include_uom:
        cf_field = ", ucd.conversion_factor"
        cf_join = (
            "left join `tabUOM Conversion Detail` ucd on ucd.parent=item.name and ucd.uom=%s"
            % frappe.db.escape(include_uom)
        )
    else:
        cf_field = cf_join = ""

    # One "%s" placeholder per item code; the codes themselves are passed as
    # query parameters.
    placeholders = ",".join(["%s"] * len(item_codes))
    query = (
        "\n"
        "select\n"
        "item.name, item.item_name, item.description, item.item_group, item.brand, item.stock_uom {cf_field}\n"
        "from\n"
        "`tabItem` item\n"
        "{cf_join}\n"
        "where\n"
        "item.name in ({item_codes})\n"
    ).format(cf_field=cf_field, cf_join=cf_join, item_codes=placeholders)

    rows = frappe.db.sql(query, item_codes, as_dict=1)

    item_details = {}
    for row in rows:
        # The first row returned for a given item name wins.
        if row.name not in item_details:
            item_details[row.name] = row
    return item_details
def get_sle_conditions(filters):
    """Build the extra SQL conditions for stock ledger entries as a string.

    Returns ``""`` when no filter applies; otherwise the individual
    conditions joined with ``" and "`` and prefixed with ``"and "``.
    """
    clauses = []

    warehouse = filters.get("warehouse")
    if warehouse:
        warehouse_clause = get_warehouse_condition(warehouse)
        if warehouse_clause:
            clauses.append(warehouse_clause)

    # Simple equality filters bound as named query parameters.
    for fieldname in ("voucher_no", "batch_no", "project"):
        if filters.get(fieldname):
            clauses.append(f"{fieldname}=%({fieldname})s")

    # Inventory dimension filters are matched with "in".
    for dimension in get_inventory_dimensions():
        dimension_field = dimension.fieldname
        if filters.get(dimension_field):
            clauses.append(f"{dimension_field} in %({dimension_field})s")

    if not clauses:
        return ""
    return "and " + " and ".join(clauses)
def get_opening_balance(filters, columns, sl_entries):
    """Build the 'Opening' row shown at the top of the report.

    An opening row is only produced when ``item_code``, ``warehouse`` and
    ``from_date`` are all set on ``filters``.

    Args:
        filters: Report filters, read via attribute access.
        columns: Unused; kept for interface compatibility.
        sl_entries: Ledger entries of the report. Entries that are
            "Opening Stock" reconciliations posted on ``from_date`` are
            removed from this list IN PLACE, because they are represented by
            the opening row instead.

    Returns:
        A dict with ``item_code``, ``qty_after_transaction``,
        ``valuation_rate`` and ``stock_value``, or ``None`` when the
        required filters are missing.
    """
    if not (filters.item_code and filters.warehouse and filters.from_date):
        return None

    from erpnext.stock.stock_ledger import get_previous_sle

    last_entry = get_previous_sle(
        {
            "item_code": filters.item_code,
            "warehouse_condition": get_warehouse_condition(filters.warehouse),
            "posting_date": filters.from_date,
            "posting_time": "00:00:00",
        }
    )

    # Check if any SLEs are actually Opening Stock Reconciliation. Filter
    # into a new list rather than calling ``sl_entries.remove()`` inside the
    # loop: removing from a list while iterating over it makes the iterator
    # skip the element that follows each removed one.
    remaining_entries = []
    for sle in sl_entries:
        if (
            sle.get("voucher_type") == "Stock Reconciliation"
            and sle.posting_date == filters.from_date
            and frappe.db.get_value("Stock Reconciliation", sle.voucher_no, "purpose") == "Opening Stock"
        ):
            last_entry = sle
        else:
            remaining_entries.append(sle)

    # Slice assignment keeps the same list object, so the caller sees the
    # removals.
    sl_entries[:] = remaining_entries

    row = {
        "item_code": _("'Opening'"),
        "qty_after_transaction": last_entry.get("qty_after_transaction", 0),
        "valuation_rate": last_entry.get("valuation_rate", 0),
        "stock_value": last_entry.get("stock_value", 0),
    }
    return row
def get_warehouse_condition(warehouse):
    """Return an SQL ``exists`` condition limiting rows to a warehouse range.

    The condition matches rows whose ``warehouse`` has lft/rgt values inside
    the lft/rgt range of the given warehouse. Returns ``""`` when the
    warehouse lookup yields nothing.
    """
    bounds = frappe.db.get_value("Warehouse", warehouse, ["lft", "rgt"], as_dict=1)
    if not bounds:
        return ""

    template = (
        " exists (select name from `tabWarehouse` wh "
        "where wh.lft >= %s and wh.rgt <= %s and warehouse = wh.name)"
    )
    return template % (bounds.lft, bounds.rgt)
def get_item_group_condition(item_group):
    """Return an SQL condition limiting items to an item group range.

    The condition matches items whose group has lft/rgt values inside the
    lft/rgt range of the given item group. Returns ``""`` when the item
    group lookup yields nothing.
    """
    bounds = frappe.db.get_value("Item Group", item_group, ["lft", "rgt"], as_dict=1)
    if not bounds:
        return ""

    template = (
        "item.item_group in (select ig.name from `tabItem Group` ig "
        "where ig.lft >= %s and ig.rgt <= %s and item.item_group = ig.name)"
    )
    return template % (bounds.lft, bounds.rgt)
def check_inventory_dimension_filters_applied(filters) -> bool:
    """Tell whether any inventory dimension has a non-empty value in ``filters``."""
    return any(
        dimension.fieldname in filters and filters.get(dimension.fieldname)
        for dimension in get_inventory_dimensions()
    )