578 lines
16 KiB
Python
578 lines
16 KiB
Python
from collections import defaultdict
|
|
from typing import List
|
|
|
|
import frappe
|
|
from frappe import _, bold
|
|
from frappe.model.naming import make_autoname
|
|
from frappe.query_builder.functions import Sum
|
|
from frappe.utils import cint, flt, now
|
|
|
|
from erpnext.stock.deprecated_serial_batch import (
|
|
DeprecatedBatchNoValuation,
|
|
DeprecatedSerialNoValuation,
|
|
)
|
|
from erpnext.stock.valuation import round_off_if_near_zero
|
|
|
|
|
|
class SerialBatchBundle:
|
|
def __init__(self, **kwargs):
|
|
for key, value in kwargs.items():
|
|
setattr(self, key, value)
|
|
|
|
self.set_item_details()
|
|
self.process_serial_and_batch_bundle()
|
|
if self.sle.is_cancelled:
|
|
self.delink_serial_and_batch_bundle()
|
|
|
|
self.post_process()
|
|
|
|
def process_serial_and_batch_bundle(self):
|
|
if self.item_details.has_serial_no:
|
|
self.process_serial_no()
|
|
elif self.item_details.has_batch_no:
|
|
self.process_batch_no()
|
|
|
|
def set_item_details(self):
|
|
fields = [
|
|
"has_batch_no",
|
|
"has_serial_no",
|
|
"item_name",
|
|
"item_group",
|
|
"serial_no_series",
|
|
"create_new_batch",
|
|
"batch_number_series",
|
|
]
|
|
|
|
self.item_details = frappe.get_cached_value("Item", self.sle.item_code, fields, as_dict=1)
|
|
|
|
def process_serial_no(self):
|
|
if (
|
|
not self.sle.is_cancelled
|
|
and not self.sle.serial_and_batch_bundle
|
|
and self.sle.actual_qty > 0
|
|
and self.item_details.has_serial_no == 1
|
|
and self.item_details.serial_no_series
|
|
and self.allow_to_make_auto_bundle()
|
|
):
|
|
self.make_serial_batch_no_bundle()
|
|
elif not self.sle.is_cancelled:
|
|
self.validate_item_and_warehouse()
|
|
|
|
def auto_create_serial_nos(self, batch_no=None):
|
|
sr_nos = []
|
|
serial_nos_details = []
|
|
|
|
for i in range(cint(self.sle.actual_qty)):
|
|
serial_no = make_autoname(self.item_details.serial_no_series, "Serial No")
|
|
sr_nos.append(serial_no)
|
|
serial_nos_details.append(
|
|
(
|
|
serial_no,
|
|
serial_no,
|
|
now(),
|
|
now(),
|
|
frappe.session.user,
|
|
frappe.session.user,
|
|
self.warehouse,
|
|
self.company,
|
|
self.item_code,
|
|
self.item_details.item_name,
|
|
self.item_details.description,
|
|
"Active",
|
|
batch_no,
|
|
)
|
|
)
|
|
|
|
if serial_nos_details:
|
|
fields = [
|
|
"name",
|
|
"serial_no",
|
|
"creation",
|
|
"modified",
|
|
"owner",
|
|
"modified_by",
|
|
"warehouse",
|
|
"company",
|
|
"item_code",
|
|
"item_name",
|
|
"description",
|
|
"status",
|
|
"batch_no",
|
|
]
|
|
|
|
frappe.db.bulk_insert("Serial No", fields=fields, values=set(serial_nos_details))
|
|
|
|
return sr_nos
|
|
|
|
def make_serial_batch_no_bundle(self):
|
|
sn_doc = frappe.new_doc("Serial and Batch Bundle")
|
|
sn_doc.item_code = self.item_code
|
|
sn_doc.warehouse = self.warehouse
|
|
sn_doc.item_name = self.item_details.item_name
|
|
sn_doc.item_group = self.item_details.item_group
|
|
sn_doc.has_serial_no = self.item_details.has_serial_no
|
|
sn_doc.has_batch_no = self.item_details.has_batch_no
|
|
sn_doc.voucher_type = self.sle.voucher_type
|
|
sn_doc.voucher_no = self.sle.voucher_no
|
|
sn_doc.voucher_detail_no = self.sle.voucher_detail_no
|
|
sn_doc.total_qty = self.sle.actual_qty
|
|
sn_doc.avg_rate = self.sle.incoming_rate
|
|
sn_doc.total_amount = flt(self.sle.actual_qty) * flt(self.sle.incoming_rate)
|
|
sn_doc.type_of_transaction = "Inward"
|
|
sn_doc.posting_date = self.sle.posting_date
|
|
sn_doc.posting_time = self.sle.posting_time
|
|
sn_doc.is_rejected = self.is_rejected_entry()
|
|
|
|
sn_doc.flags.ignore_mandatory = True
|
|
sn_doc.insert()
|
|
|
|
batch_no = ""
|
|
if self.item_details.has_batch_no:
|
|
batch_no = self.create_batch()
|
|
|
|
incoming_rate = self.sle.incoming_rate
|
|
if not incoming_rate:
|
|
incoming_rate = frappe.get_cached_value(
|
|
self.child_doctype, self.sle.voucher_detail_no, "valuation_rate"
|
|
)
|
|
|
|
if self.item_details.has_serial_no:
|
|
sr_nos = self.auto_create_serial_nos(batch_no)
|
|
self.add_serial_no_to_bundle(sn_doc, sr_nos, incoming_rate, batch_no)
|
|
elif self.item_details.has_batch_no:
|
|
self.add_batch_no_to_bundle(sn_doc, batch_no, incoming_rate)
|
|
sn_doc.save()
|
|
|
|
sn_doc.load_from_db()
|
|
sn_doc.flags.ignore_validate = True
|
|
sn_doc.flags.ignore_mandatory = True
|
|
|
|
sn_doc.submit()
|
|
self.set_serial_and_batch_bundle(sn_doc)
|
|
|
|
def set_serial_and_batch_bundle(self, sn_doc):
|
|
self.sle.db_set("serial_and_batch_bundle", sn_doc.name)
|
|
|
|
if sn_doc.is_rejected:
|
|
frappe.db.set_value(
|
|
self.child_doctype, self.sle.voucher_detail_no, "rejected_serial_and_batch_bundle", sn_doc.name
|
|
)
|
|
else:
|
|
frappe.db.set_value(
|
|
self.child_doctype, self.sle.voucher_detail_no, "serial_and_batch_bundle", sn_doc.name
|
|
)
|
|
|
|
@property
|
|
def child_doctype(self):
|
|
child_doctype = self.sle.voucher_type + " Item"
|
|
if self.sle.voucher_type == "Stock Entry":
|
|
child_doctype = "Stock Entry Detail"
|
|
|
|
return child_doctype
|
|
|
|
def is_rejected_entry(self):
|
|
return is_rejected(self.sle.voucher_type, self.sle.voucher_detail_no, self.sle.warehouse)
|
|
|
|
def add_serial_no_to_bundle(self, sn_doc, serial_nos, incoming_rate, batch_no=None):
|
|
ledgers = []
|
|
|
|
fields = [
|
|
"name",
|
|
"serial_no",
|
|
"batch_no",
|
|
"warehouse",
|
|
"item_code",
|
|
"qty",
|
|
"incoming_rate",
|
|
"parent",
|
|
"parenttype",
|
|
"parentfield",
|
|
]
|
|
|
|
for serial_no in serial_nos:
|
|
ledgers.append(
|
|
(
|
|
frappe.generate_hash("Serial and Batch Ledger", 10),
|
|
serial_no,
|
|
batch_no,
|
|
self.warehouse,
|
|
self.item_details.item_code,
|
|
1,
|
|
incoming_rate,
|
|
sn_doc.name,
|
|
sn_doc.doctype,
|
|
"ledgers",
|
|
)
|
|
)
|
|
|
|
frappe.db.bulk_insert("Serial and Batch Ledger", fields=fields, values=set(ledgers))
|
|
|
|
def add_batch_no_to_bundle(self, sn_doc, batch_no, incoming_rate):
|
|
stock_value_difference = flt(self.sle.actual_qty) * flt(incoming_rate)
|
|
|
|
if self.sle.actual_qty < 0:
|
|
stock_value_difference *= -1
|
|
|
|
sn_doc.append(
|
|
"ledgers",
|
|
{
|
|
"batch_no": batch_no,
|
|
"qty": self.sle.actual_qty,
|
|
"incoming_rate": incoming_rate,
|
|
"stock_value_difference": stock_value_difference,
|
|
},
|
|
)
|
|
|
|
def create_batch(self):
|
|
from erpnext.stock.doctype.batch.batch import make_batch
|
|
|
|
return make_batch(
|
|
frappe._dict(
|
|
{
|
|
"item": self.item_code,
|
|
"reference_doctype": self.sle.voucher_type,
|
|
"reference_name": self.sle.voucher_no,
|
|
}
|
|
)
|
|
)
|
|
|
|
def process_batch_no(self):
|
|
if (
|
|
not self.sle.is_cancelled
|
|
and not self.sle.serial_and_batch_bundle
|
|
and self.sle.actual_qty > 0
|
|
and self.item_details.has_batch_no == 1
|
|
and self.item_details.create_new_batch
|
|
and self.item_details.batch_number_series
|
|
and self.allow_to_make_auto_bundle()
|
|
):
|
|
self.make_serial_batch_no_bundle()
|
|
elif not self.sle.is_cancelled:
|
|
self.validate_item_and_warehouse()
|
|
|
|
def validate_item_and_warehouse(self):
|
|
|
|
data = frappe.db.get_value(
|
|
"Serial and Batch Bundle",
|
|
self.sle.serial_and_batch_bundle,
|
|
["item_code", "warehouse", "voucher_no"],
|
|
as_dict=1,
|
|
)
|
|
|
|
if self.sle.serial_and_batch_bundle and not frappe.db.exists(
|
|
"Serial and Batch Bundle",
|
|
{
|
|
"name": self.sle.serial_and_batch_bundle,
|
|
"item_code": self.item_code,
|
|
"warehouse": self.warehouse,
|
|
"voucher_no": self.sle.voucher_no,
|
|
},
|
|
):
|
|
msg = f"""
|
|
The Serial and Batch Bundle
|
|
{bold(self.sle.serial_and_batch_bundle)}
|
|
does not belong to Item {bold(self.item_code)}
|
|
or Warehouse {bold(self.warehouse)}
|
|
or {self.sle.voucher_type} no {bold(self.sle.voucher_no)}
|
|
"""
|
|
|
|
frappe.throw(_(msg))
|
|
|
|
def delink_serial_and_batch_bundle(self):
|
|
update_values = {
|
|
"serial_and_batch_bundle": "",
|
|
}
|
|
|
|
if is_rejected(self.sle.voucher_type, self.sle.voucher_detail_no, self.sle.warehouse):
|
|
update_values["rejected_serial_and_batch_bundle"] = ""
|
|
|
|
frappe.db.set_value(self.child_doctype, self.sle.voucher_detail_no, update_values)
|
|
|
|
frappe.db.set_value(
|
|
"Serial and Batch Bundle",
|
|
{"voucher_no": self.sle.voucher_no, "voucher_type": self.sle.voucher_type},
|
|
{"is_cancelled": 1, "voucher_no": ""},
|
|
)
|
|
|
|
def allow_to_make_auto_bundle(self):
|
|
if self.sle.voucher_type in ["Stock Entry", "Purchase Receipt", "Purchase Invoice"]:
|
|
if self.sle.voucher_type == "Stock Entry":
|
|
stock_entry_type = frappe.get_cached_value("Stock Entry", self.sle.voucher_no, "purpose")
|
|
|
|
if stock_entry_type in ["Material Receipt", "Manufacture", "Repack"]:
|
|
return True
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
def post_process(self):
|
|
if self.item_details.has_serial_no == 1:
|
|
self.set_warehouse_and_status_in_serial_nos()
|
|
|
|
if (
|
|
self.sle.actual_qty > 0
|
|
and self.item_details.has_serial_no == 1
|
|
and self.item_details.has_batch_no == 1
|
|
):
|
|
self.set_batch_no_in_serial_nos()
|
|
|
|
def set_warehouse_and_status_in_serial_nos(self):
|
|
serial_nos = get_serial_nos(self.sle.serial_and_batch_bundle, check_outward=False)
|
|
warehouse = self.warehouse if self.sle.actual_qty > 0 else None
|
|
|
|
if not serial_nos:
|
|
return
|
|
|
|
sn_table = frappe.qb.DocType("Serial No")
|
|
(
|
|
frappe.qb.update(sn_table)
|
|
.set(sn_table.warehouse, warehouse)
|
|
.set(sn_table.status, "Active" if warehouse else "Inactive")
|
|
.where(sn_table.name.isin(serial_nos))
|
|
).run()
|
|
|
|
def set_batch_no_in_serial_nos(self):
|
|
ledgers = frappe.get_all(
|
|
"Serial and Batch Ledger",
|
|
fields=["serial_no", "batch_no"],
|
|
filters={"parent": self.sle.serial_and_batch_bundle},
|
|
)
|
|
|
|
batch_serial_nos = {}
|
|
for ledger in ledgers:
|
|
batch_serial_nos.setdefault(ledger.batch_no, []).append(ledger.serial_no)
|
|
|
|
for batch_no, serial_nos in batch_serial_nos.items():
|
|
sn_table = frappe.qb.DocType("Serial No")
|
|
(
|
|
frappe.qb.update(sn_table)
|
|
.set(sn_table.batch_no, batch_no)
|
|
.where(sn_table.name.isin(serial_nos))
|
|
).run()
|
|
|
|
|
|
def get_serial_nos(serial_and_batch_bundle, check_outward=True):
|
|
filters = {"parent": serial_and_batch_bundle}
|
|
if check_outward:
|
|
filters["is_outward"] = 1
|
|
|
|
ledgers = frappe.get_all("Serial and Batch Ledger", fields=["serial_no"], filters=filters)
|
|
|
|
return [d.serial_no for d in ledgers]
|
|
|
|
|
|
class SerialNoBundleValuation(DeprecatedSerialNoValuation):
|
|
def __init__(self, **kwargs):
|
|
for key, value in kwargs.items():
|
|
setattr(self, key, value)
|
|
|
|
self.calculate_stock_value_change()
|
|
self.calculate_valuation_rate()
|
|
|
|
def calculate_stock_value_change(self):
|
|
if self.sle.actual_qty > 0:
|
|
self.stock_value_change = frappe.get_cached_value(
|
|
"Serial and Batch Bundle", self.sle.serial_and_batch_bundle, "total_amount"
|
|
)
|
|
|
|
else:
|
|
ledgers = self.get_serial_no_ledgers()
|
|
|
|
self.serial_no_incoming_rate = defaultdict(float)
|
|
self.stock_value_change = 0.0
|
|
|
|
for ledger in ledgers:
|
|
self.stock_value_change += ledger.incoming_rate * -1
|
|
self.serial_no_incoming_rate[ledger.serial_no] = ledger.incoming_rate
|
|
|
|
self.calculate_stock_value_from_deprecarated_ledgers()
|
|
|
|
def get_serial_no_ledgers(self):
|
|
serial_nos = self.get_serial_nos()
|
|
|
|
subquery = f"""
|
|
SELECT
|
|
MAX(
|
|
TIMESTAMP(
|
|
parent.posting_date, parent.posting_time
|
|
)
|
|
), child.name, child.serial_no, child.warehouse
|
|
FROM
|
|
`tabSerial and Batch Bundle` as parent,
|
|
`tabSerial and Batch Ledger` as child
|
|
WHERE
|
|
parent.name = child.parent
|
|
AND child.serial_no IN ({', '.join([frappe.db.escape(s) for s in serial_nos])})
|
|
AND child.is_outward = 0
|
|
AND parent.docstatus < 2
|
|
AND parent.is_cancelled = 0
|
|
AND child.warehouse = {frappe.db.escape(self.sle.warehouse)}
|
|
AND parent.item_code = {frappe.db.escape(self.sle.item_code)}
|
|
AND (
|
|
parent.posting_date < '{self.sle.posting_date}'
|
|
OR (
|
|
parent.posting_date = '{self.sle.posting_date}'
|
|
AND parent.posting_time <= '{self.sle.posting_time}'
|
|
)
|
|
)
|
|
GROUP BY
|
|
child.serial_no
|
|
"""
|
|
|
|
return frappe.db.sql(
|
|
f"""
|
|
SELECT
|
|
ledger.serial_no, ledger.incoming_rate, ledger.warehouse
|
|
FROM
|
|
`tabSerial and Batch Ledger` AS ledger,
|
|
({subquery}) AS SubQuery
|
|
WHERE
|
|
ledger.name = SubQuery.name
|
|
AND ledger.serial_no = SubQuery.serial_no
|
|
AND ledger.warehouse = SubQuery.warehouse
|
|
GROUP BY
|
|
ledger.serial_no
|
|
Order By
|
|
ledger.creation
|
|
""",
|
|
as_dict=1,
|
|
)
|
|
|
|
def get_serial_nos(self):
|
|
if self.sle.get("serial_nos"):
|
|
return self.sle.serial_nos
|
|
|
|
return get_serial_nos(self.sle.serial_and_batch_bundle)
|
|
|
|
def calculate_valuation_rate(self):
|
|
if not hasattr(self, "wh_data"):
|
|
return
|
|
|
|
new_stock_qty = self.wh_data.qty_after_transaction + self.sle.actual_qty
|
|
|
|
if new_stock_qty > 0:
|
|
new_stock_value = (
|
|
self.wh_data.qty_after_transaction * self.wh_data.valuation_rate
|
|
) + self.stock_value_change
|
|
if new_stock_value >= 0:
|
|
# calculate new valuation rate only if stock value is positive
|
|
# else it remains the same as that of previous entry
|
|
self.wh_data.valuation_rate = new_stock_value / new_stock_qty
|
|
|
|
if (
|
|
not self.wh_data.valuation_rate and self.sle.voucher_detail_no and not self.is_rejected_entry()
|
|
):
|
|
allow_zero_rate = self.sle_self.check_if_allow_zero_valuation_rate(
|
|
self.sle.voucher_type, self.sle.voucher_detail_no
|
|
)
|
|
if not allow_zero_rate:
|
|
self.wh_data.valuation_rate = self.sle_self.get_fallback_rate(self.sle)
|
|
|
|
self.wh_data.qty_after_transaction += self.sle.actual_qty
|
|
self.wh_data.stock_value = flt(self.wh_data.qty_after_transaction) * flt(
|
|
self.wh_data.valuation_rate
|
|
)
|
|
|
|
def is_rejected_entry(self):
|
|
return is_rejected(self.sle.voucher_type, self.sle.voucher_detail_no, self.sle.warehouse)
|
|
|
|
def get_incoming_rate(self):
|
|
return abs(flt(self.stock_value_change) / flt(self.sle.actual_qty))
|
|
|
|
|
|
def is_rejected(voucher_type, voucher_detail_no, warehouse):
|
|
if voucher_type in ["Purchase Receipt", "Purchase Invoice"]:
|
|
return warehouse == frappe.get_cached_value(
|
|
voucher_type + " Item", voucher_detail_no, "rejected_warehouse"
|
|
)
|
|
|
|
return False
|
|
|
|
|
|
class BatchNoBundleValuation(DeprecatedBatchNoValuation):
|
|
def __init__(self, **kwargs):
|
|
for key, value in kwargs.items():
|
|
setattr(self, key, value)
|
|
|
|
self.batch_nos = self.get_batch_nos()
|
|
self.calculate_avg_rate()
|
|
self.calculate_valuation_rate()
|
|
|
|
def calculate_avg_rate(self):
|
|
if self.sle.actual_qty > 0:
|
|
self.stock_value_change = frappe.get_cached_value(
|
|
"Serial and Batch Bundle", self.sle.serial_and_batch_bundle, "total_amount"
|
|
)
|
|
else:
|
|
ledgers = self.get_batch_no_ledgers()
|
|
|
|
self.batch_avg_rate = defaultdict(float)
|
|
for ledger in ledgers:
|
|
self.batch_avg_rate[ledger.batch_no] += flt(ledger.incoming_rate) / flt(ledger.qty)
|
|
|
|
self.calculate_avg_rate_from_deprecarated_ledgers()
|
|
self.set_stock_value_difference()
|
|
|
|
def get_batch_no_ledgers(self) -> List[dict]:
|
|
parent = frappe.qb.DocType("Serial and Batch Bundle")
|
|
child = frappe.qb.DocType("Serial and Batch Ledger")
|
|
|
|
batch_nos = list(self.batch_nos.keys())
|
|
|
|
return (
|
|
frappe.qb.from_(parent)
|
|
.inner_join(child)
|
|
.on(parent.name == child.parent)
|
|
.select(
|
|
child.batch_no,
|
|
Sum(child.stock_value_difference).as_("incoming_rate"),
|
|
Sum(child.qty).as_("qty"),
|
|
)
|
|
.where(
|
|
(child.batch_no.isin(batch_nos))
|
|
& (child.parent != self.sle.serial_and_batch_bundle)
|
|
& (parent.warehouse == self.sle.warehouse)
|
|
& (parent.item_code == self.sle.item_code)
|
|
& (parent.is_cancelled == 0)
|
|
)
|
|
.groupby(child.batch_no)
|
|
).run(as_dict=True)
|
|
|
|
def get_batch_nos(self) -> list:
|
|
if self.sle.get("batch_nos"):
|
|
return self.sle.batch_nos
|
|
|
|
ledgers = frappe.get_all(
|
|
"Serial and Batch Ledger",
|
|
fields=["batch_no", "qty", "name"],
|
|
filters={"parent": self.sle.serial_and_batch_bundle, "is_outward": 1},
|
|
)
|
|
|
|
return {d.batch_no: d for d in ledgers}
|
|
|
|
def set_stock_value_difference(self):
|
|
self.stock_value_change = 0
|
|
for batch_no, ledger in self.batch_nos.items():
|
|
stock_value_change = self.batch_avg_rate[batch_no] * ledger.qty
|
|
self.stock_value_change += stock_value_change
|
|
frappe.db.set_value(
|
|
"Serial and Batch Ledger", ledger.name, "stock_value_difference", stock_value_change
|
|
)
|
|
|
|
def calculate_valuation_rate(self):
|
|
if not hasattr(self, "wh_data"):
|
|
return
|
|
|
|
self.wh_data.stock_value = round_off_if_near_zero(
|
|
self.wh_data.stock_value + self.stock_value_change
|
|
)
|
|
|
|
if self.wh_data.qty_after_transaction:
|
|
self.wh_data.valuation_rate = self.wh_data.stock_value / self.wh_data.qty_after_transaction
|
|
|
|
self.wh_data.qty_after_transaction += self.sle.actual_qty
|
|
|
|
def get_incoming_rate(self):
|
|
return abs(flt(self.stock_value_change) / flt(self.sle.actual_qty))
|