Merge branch 'develop' into fix/production-plan/test/planned-qty

This commit is contained in:
Sagar Sharma 2022-06-22 15:33:08 +05:30 committed by GitHub
commit 70bc51a8fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 499 additions and 157 deletions

View File

@ -59,6 +59,7 @@ frappe.query_reports["Purchase Order Analysis"] = {
for (let option of status){
options.push({
"value": option,
"label": __(option),
"description": ""
})
}

View File

@ -7,11 +7,11 @@
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"current_bom",
"new_bom",
"column_break_3",
"update_type",
"status",
"column_break_3",
"current_bom",
"new_bom",
"error_log",
"progress_section",
"current_level",
@ -37,6 +37,7 @@
"options": "BOM"
},
{
"depends_on": "eval:doc.update_type === \"Replace BOM\"",
"fieldname": "column_break_3",
"fieldtype": "Column Break"
},
@ -87,6 +88,7 @@
"options": "BOM Update Batch"
},
{
"depends_on": "eval:doc.status !== \"Completed\"",
"fieldname": "current_level",
"fieldtype": "Int",
"label": "Current Level"
@ -96,7 +98,7 @@
"index_web_pages_for_search": 1,
"is_submittable": 1,
"links": [],
"modified": "2022-06-06 15:15:23.883251",
"modified": "2022-06-20 15:43:55.696388",
"modified_by": "Administrator",
"module": "Manufacturing",
"name": "BOM Update Log",

View File

@ -6,6 +6,8 @@ from typing import Any, Dict, List, Optional, Tuple, Union
import frappe
from frappe import _
from frappe.model.document import Document
from frappe.query_builder import DocType, Interval
from frappe.query_builder.functions import Now
from frappe.utils import cint, cstr
from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import (
@ -22,6 +24,17 @@ class BOMMissingError(frappe.ValidationError):
class BOMUpdateLog(Document):
@staticmethod
def clear_old_logs(days=None):
days = days or 90
table = DocType("BOM Update Log")
frappe.db.delete(
table,
filters=(
(table.modified < (Now() - Interval(days=days))) & (table.update_type == "Update Cost")
),
)
def validate(self):
if self.update_type == "Replace BOM":
self.validate_boms_are_specified()
@ -77,7 +90,11 @@ class BOMUpdateLog(Document):
now=frappe.flags.in_test,
)
else:
process_boms_cost_level_wise(self)
frappe.enqueue(
method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.process_boms_cost_level_wise",
update_doc=self,
now=frappe.flags.in_test,
)
def run_replace_bom_job(
@ -112,28 +129,31 @@ def process_boms_cost_level_wise(
current_boms = {}
values = {}
if update_doc.status == "Queued":
# First level yet to process. On Submit.
current_level = 0
current_boms = get_leaf_boms()
values = {
"processed_boms": json.dumps({}),
"status": "In Progress",
"current_level": current_level,
}
else:
# Resume next level. via Cron Job.
if not parent_boms:
return
try:
if update_doc.status == "Queued":
# First level yet to process. On Submit.
current_level = 0
current_boms = get_leaf_boms()
values = {
"processed_boms": json.dumps({}),
"status": "In Progress",
"current_level": current_level,
}
else:
# Resume next level. via Cron Job.
if not parent_boms:
return
current_level = cint(update_doc.current_level) + 1
current_level = cint(update_doc.current_level) + 1
# Process the next level BOMs. Stage parents as current BOMs.
current_boms = parent_boms.copy()
values = {"current_level": current_level}
# Process the next level BOMs. Stage parents as current BOMs.
current_boms = parent_boms.copy()
values = {"current_level": current_level}
set_values_in_log(update_doc.name, values, commit=True)
queue_bom_cost_jobs(current_boms, update_doc, current_level)
set_values_in_log(update_doc.name, values, commit=True)
queue_bom_cost_jobs(current_boms, update_doc, current_level)
except Exception:
handle_exception(update_doc)
def queue_bom_cost_jobs(
@ -199,16 +219,22 @@ def resume_bom_cost_update_jobs():
current_boms, processed_boms = get_processed_current_boms(log, bom_batches)
parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms)
# Unset processed BOMs if log is complete, it is used for next level BOMs
# Unset processed BOMs (it is used for next level BOMs) & change status if log is complete
status = "Completed" if not parent_boms else "In Progress"
processed_boms = json.dumps([] if not parent_boms else processed_boms)
set_values_in_log(
log.name,
values={
"processed_boms": json.dumps([] if not parent_boms else processed_boms),
"status": "Completed" if not parent_boms else "In Progress",
"processed_boms": processed_boms,
"status": status,
},
commit=True,
)
# clear progress section
if status == "Completed":
frappe.db.delete("BOM Update Batch", {"parent": log.name})
if parent_boms: # there is a next level to process
process_boms_cost_level_wise(
update_doc=frappe.get_doc("BOM Update Log", log.name), parent_boms=parent_boms

View File

@ -1,6 +1,6 @@
frappe.listview_settings['BOM Update Log'] = {
add_fields: ["status"],
get_indicator: function(doc) {
get_indicator: (doc) => {
let status_map = {
"Queued": "orange",
"In Progress": "blue",
@ -9,5 +9,22 @@ frappe.listview_settings['BOM Update Log'] = {
};
return [__(doc.status), status_map[doc.status], "status,=," + doc.status];
}
},
onload: () => {
if (!frappe.model.can_write("Log Settings")) {
return;
}
let sidebar_entry = $(
'<ul class="list-unstyled sidebar-menu log-retention-note"></ul>'
).appendTo(cur_list.page.sidebar);
let message = __("Note: Automatic log deletion only applies to logs of type <i>Update Cost</i>");
$(`<hr><div class='text-muted'>${message}</div>`).appendTo(sidebar_entry);
frappe.require("logtypes.bundle.js", () => {
frappe.utils.logtypes.show_log_retention_message(cur_list.doctype);
});
},
};

View File

@ -1,6 +1,8 @@
# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import copy
import frappe
from frappe.tests.utils import FrappeTestCase, change_settings, timeout
from frappe.utils import add_days, add_months, cint, flt, now, today
@ -19,6 +21,7 @@ from erpnext.manufacturing.doctype.work_order.work_order import (
)
from erpnext.selling.doctype.sales_order.test_sales_order import make_sales_order
from erpnext.stock.doctype.item.test_item import create_item, make_item
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
from erpnext.stock.doctype.stock_entry import test_stock_entry
from erpnext.stock.doctype.warehouse.test_warehouse import create_warehouse
from erpnext.stock.utils import get_bin
@ -28,6 +31,7 @@ class TestWorkOrder(FrappeTestCase):
def setUp(self):
self.warehouse = "_Test Warehouse 2 - _TC"
self.item = "_Test Item"
prepare_data_for_backflush_based_on_materials_transferred()
def tearDown(self):
frappe.db.rollback()
@ -527,6 +531,8 @@ class TestWorkOrder(FrappeTestCase):
work_order.cancel()
def test_work_order_with_non_transfer_item(self):
frappe.db.set_value("Manufacturing Settings", None, "backflush_raw_materials_based_on", "BOM")
items = {"Finished Good Transfer Item": 1, "_Test FG Item": 1, "_Test FG Item 1": 0}
for item, allow_transfer in items.items():
make_item(item, {"include_item_in_manufacturing": allow_transfer})
@ -1071,7 +1077,7 @@ class TestWorkOrder(FrappeTestCase):
sm = frappe.get_doc(make_stock_entry(wo_order.name, "Material Transfer for Manufacture", 100))
for row in sm.get("items"):
if row.get("item_code") == "_Test Item":
row.qty = 110
row.qty = 120
sm.submit()
cancel_stock_entry.append(sm.name)
@ -1079,21 +1085,21 @@ class TestWorkOrder(FrappeTestCase):
s = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 90))
for row in s.get("items"):
if row.get("item_code") == "_Test Item":
self.assertEqual(row.get("qty"), 100)
self.assertEqual(row.get("qty"), 108)
s.submit()
cancel_stock_entry.append(s.name)
s1 = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 5))
for row in s1.get("items"):
if row.get("item_code") == "_Test Item":
self.assertEqual(row.get("qty"), 5)
self.assertEqual(row.get("qty"), 6)
s1.submit()
cancel_stock_entry.append(s1.name)
s2 = frappe.get_doc(make_stock_entry(wo_order.name, "Manufacture", 5))
for row in s2.get("items"):
if row.get("item_code") == "_Test Item":
self.assertEqual(row.get("qty"), 5)
self.assertEqual(row.get("qty"), 6)
cancel_stock_entry.reverse()
for ste in cancel_stock_entry:
@ -1203,6 +1209,269 @@ class TestWorkOrder(FrappeTestCase):
self.assertEqual(work_order.required_items[0].transferred_qty, 1)
self.assertEqual(work_order.required_items[1].transferred_qty, 2)
def test_backflushed_batch_raw_materials_based_on_transferred(self):
frappe.db.set_value(
"Manufacturing Settings",
None,
"backflush_raw_materials_based_on",
"Material Transferred for Manufacture",
)
batch_item = "Test Batch MCC Keyboard"
fg_item = "Test FG Item with Batch Raw Materials"
ste_doc = test_stock_entry.make_stock_entry(
item_code=batch_item, target="Stores - _TC", qty=2, basic_rate=100, do_not_save=True
)
ste_doc.append(
"items",
{
"item_code": batch_item,
"item_name": batch_item,
"description": batch_item,
"basic_rate": 100,
"t_warehouse": "Stores - _TC",
"qty": 2,
"uom": "Nos",
"stock_uom": "Nos",
"conversion_factor": 1,
},
)
# Inward raw materials in Stores warehouse
ste_doc.insert()
ste_doc.submit()
batch_list = [row.batch_no for row in ste_doc.items]
wo_doc = make_wo_order_test_record(production_item=fg_item, qty=4)
transferred_ste_doc = frappe.get_doc(
make_stock_entry(wo_doc.name, "Material Transfer for Manufacture", 4)
)
transferred_ste_doc.items[0].qty = 2
transferred_ste_doc.items[0].batch_no = batch_list[0]
new_row = copy.deepcopy(transferred_ste_doc.items[0])
new_row.name = ""
new_row.batch_no = batch_list[1]
# Transferred two batches from Stores to WIP Warehouse
transferred_ste_doc.append("items", new_row)
transferred_ste_doc.submit()
# First Manufacture stock entry
manufacture_ste_doc1 = frappe.get_doc(make_stock_entry(wo_doc.name, "Manufacture", 1))
# Batch no should be same as transferred Batch no
self.assertEqual(manufacture_ste_doc1.items[0].batch_no, batch_list[0])
self.assertEqual(manufacture_ste_doc1.items[0].qty, 1)
manufacture_ste_doc1.submit()
# Second Manufacture stock entry
manufacture_ste_doc2 = frappe.get_doc(make_stock_entry(wo_doc.name, "Manufacture", 2))
# Batch no should be same as transferred Batch no
self.assertEqual(manufacture_ste_doc2.items[0].batch_no, batch_list[0])
self.assertEqual(manufacture_ste_doc2.items[0].qty, 1)
self.assertEqual(manufacture_ste_doc2.items[1].batch_no, batch_list[1])
self.assertEqual(manufacture_ste_doc2.items[1].qty, 1)
def test_backflushed_serial_no_raw_materials_based_on_transferred(self):
frappe.db.set_value(
"Manufacturing Settings",
None,
"backflush_raw_materials_based_on",
"Material Transferred for Manufacture",
)
sn_item = "Test Serial No BTT Headphone"
fg_item = "Test FG Item with Serial No Raw Materials"
ste_doc = test_stock_entry.make_stock_entry(
item_code=sn_item, target="Stores - _TC", qty=4, basic_rate=100, do_not_save=True
)
# Inward raw materials in Stores warehouse
ste_doc.submit()
serial_nos_list = sorted(get_serial_nos(ste_doc.items[0].serial_no))
wo_doc = make_wo_order_test_record(production_item=fg_item, qty=4)
transferred_ste_doc = frappe.get_doc(
make_stock_entry(wo_doc.name, "Material Transfer for Manufacture", 4)
)
transferred_ste_doc.items[0].serial_no = "\n".join(serial_nos_list)
transferred_ste_doc.submit()
# First Manufacture stock entry
manufacture_ste_doc1 = frappe.get_doc(make_stock_entry(wo_doc.name, "Manufacture", 1))
# Serial nos should be same as transferred Serial nos
self.assertEqual(get_serial_nos(manufacture_ste_doc1.items[0].serial_no), serial_nos_list[0:1])
self.assertEqual(manufacture_ste_doc1.items[0].qty, 1)
manufacture_ste_doc1.submit()
# Second Manufacture stock entry
manufacture_ste_doc2 = frappe.get_doc(make_stock_entry(wo_doc.name, "Manufacture", 2))
# Serial nos should be same as transferred Serial nos
self.assertEqual(get_serial_nos(manufacture_ste_doc2.items[0].serial_no), serial_nos_list[1:3])
self.assertEqual(manufacture_ste_doc2.items[0].qty, 2)
def test_backflushed_serial_no_batch_raw_materials_based_on_transferred(self):
frappe.db.set_value(
"Manufacturing Settings",
None,
"backflush_raw_materials_based_on",
"Material Transferred for Manufacture",
)
sn_batch_item = "Test Batch Serial No WebCam"
fg_item = "Test FG Item with Serial & Batch No Raw Materials"
ste_doc = test_stock_entry.make_stock_entry(
item_code=sn_batch_item, target="Stores - _TC", qty=2, basic_rate=100, do_not_save=True
)
ste_doc.append(
"items",
{
"item_code": sn_batch_item,
"item_name": sn_batch_item,
"description": sn_batch_item,
"basic_rate": 100,
"t_warehouse": "Stores - _TC",
"qty": 2,
"uom": "Nos",
"stock_uom": "Nos",
"conversion_factor": 1,
},
)
# Inward raw materials in Stores warehouse
ste_doc.insert()
ste_doc.submit()
batch_dict = {row.batch_no: get_serial_nos(row.serial_no) for row in ste_doc.items}
batches = list(batch_dict.keys())
wo_doc = make_wo_order_test_record(production_item=fg_item, qty=4)
transferred_ste_doc = frappe.get_doc(
make_stock_entry(wo_doc.name, "Material Transfer for Manufacture", 4)
)
transferred_ste_doc.items[0].qty = 2
transferred_ste_doc.items[0].batch_no = batches[0]
transferred_ste_doc.items[0].serial_no = "\n".join(batch_dict.get(batches[0]))
new_row = copy.deepcopy(transferred_ste_doc.items[0])
new_row.name = ""
new_row.batch_no = batches[1]
new_row.serial_no = "\n".join(batch_dict.get(batches[1]))
# Transferred two batches from Stores to WIP Warehouse
transferred_ste_doc.append("items", new_row)
transferred_ste_doc.submit()
# First Manufacture stock entry
manufacture_ste_doc1 = frappe.get_doc(make_stock_entry(wo_doc.name, "Manufacture", 1))
# Batch no & Serial Nos should be same as transferred Batch no & Serial Nos
batch_no = manufacture_ste_doc1.items[0].batch_no
self.assertEqual(
get_serial_nos(manufacture_ste_doc1.items[0].serial_no)[0], batch_dict.get(batch_no)[0]
)
self.assertEqual(manufacture_ste_doc1.items[0].qty, 1)
manufacture_ste_doc1.submit()
# Second Manufacture stock entry
manufacture_ste_doc2 = frappe.get_doc(make_stock_entry(wo_doc.name, "Manufacture", 2))
# Batch no & Serial Nos should be same as transferred Batch no & Serial Nos
batch_no = manufacture_ste_doc2.items[0].batch_no
self.assertEqual(
get_serial_nos(manufacture_ste_doc2.items[0].serial_no)[0], batch_dict.get(batch_no)[1]
)
self.assertEqual(manufacture_ste_doc2.items[0].qty, 1)
batch_no = manufacture_ste_doc2.items[1].batch_no
self.assertEqual(
get_serial_nos(manufacture_ste_doc2.items[1].serial_no)[0], batch_dict.get(batch_no)[0]
)
self.assertEqual(manufacture_ste_doc2.items[1].qty, 1)
def prepare_data_for_backflush_based_on_materials_transferred():
batch_item_doc = make_item(
"Test Batch MCC Keyboard",
{
"is_stock_item": 1,
"has_batch_no": 1,
"create_new_batch": 1,
"batch_number_series": "TBMK.#####",
"valuation_rate": 100,
"stock_uom": "Nos",
},
)
item = make_item(
"Test FG Item with Batch Raw Materials",
{
"is_stock_item": 1,
},
)
make_bom(item=item.name, source_warehouse="Stores - _TC", raw_materials=[batch_item_doc.name])
sn_item_doc = make_item(
"Test Serial No BTT Headphone",
{
"is_stock_item": 1,
"has_serial_no": 1,
"serial_no_series": "TSBH.#####",
"valuation_rate": 100,
"stock_uom": "Nos",
},
)
item = make_item(
"Test FG Item with Serial No Raw Materials",
{
"is_stock_item": 1,
},
)
make_bom(item=item.name, source_warehouse="Stores - _TC", raw_materials=[sn_item_doc.name])
sn_batch_item_doc = make_item(
"Test Batch Serial No WebCam",
{
"is_stock_item": 1,
"has_batch_no": 1,
"create_new_batch": 1,
"batch_number_series": "TBSW.#####",
"has_serial_no": 1,
"serial_no_series": "TBSWC.#####",
"valuation_rate": 100,
"stock_uom": "Nos",
},
)
item = make_item(
"Test FG Item with Serial & Batch No Raw Materials",
{
"is_stock_item": 1,
},
)
make_bom(item=item.name, source_warehouse="Stores - _TC", raw_materials=[sn_batch_item_doc.name])
def update_job_card(job_card, jc_qty=None):
employee = frappe.db.get_value("Employee", {"status": "Active"}, "name")

View File

@ -1,6 +1,6 @@
import frappe
from frappe import qb
from frappe.query_builder import Case
from frappe.query_builder import Case, CustomFunction
from frappe.query_builder.custom import ConstantColumn
from frappe.query_builder.functions import IfNull
@ -87,6 +87,7 @@ def execute():
gl = qb.DocType("GL Entry")
account = qb.DocType("Account")
ifelse = CustomFunction("IF", ["condition", "then", "else"])
gl_entries = (
qb.from_(gl)
@ -96,8 +97,12 @@ def execute():
gl.star,
ConstantColumn(1).as_("docstatus"),
account.account_type.as_("account_type"),
IfNull(gl.against_voucher_type, gl.voucher_type).as_("against_voucher_type"),
IfNull(gl.against_voucher, gl.voucher_no).as_("against_voucher_no"),
IfNull(
ifelse(gl.against_voucher_type == "", None, gl.against_voucher_type), gl.voucher_type
).as_("against_voucher_type"),
IfNull(ifelse(gl.against_voucher == "", None, gl.against_voucher), gl.voucher_no).as_(
"against_voucher_no"
),
# convert debit/credit to amount
Case()
.when(account.account_type == "Receivable", gl.debit - gl.credit)

View File

@ -55,6 +55,7 @@ frappe.query_reports["Sales Order Analysis"] = {
for (let option of status){
options.push({
"value": option,
"label": __(option),
"description": ""
})
}

View File

@ -596,21 +596,6 @@ class StockEntry(StockController):
title=_("Insufficient Stock"),
)
def set_serial_nos(self, work_order):
previous_se = frappe.db.get_value(
"Stock Entry",
{"work_order": work_order, "purpose": "Material Transfer for Manufacture"},
"name",
)
for d in self.get("items"):
transferred_serial_no = frappe.db.get_value(
"Stock Entry Detail", {"parent": previous_se, "item_code": d.item_code}, "serial_no"
)
if transferred_serial_no:
d.serial_no = transferred_serial_no
@frappe.whitelist()
def get_stock_and_rate(self):
"""
@ -1321,7 +1306,7 @@ class StockEntry(StockController):
and not self.pro_doc.skip_transfer
and self.flags.backflush_based_on == "Material Transferred for Manufacture"
):
self.get_transfered_raw_materials()
self.add_transfered_raw_materials_in_items()
elif (
self.work_order
@ -1365,7 +1350,6 @@ class StockEntry(StockController):
# fetch the serial_no of the first stock entry for the second stock entry
if self.work_order and self.purpose == "Manufacture":
self.set_serial_nos(self.work_order)
work_order = frappe.get_doc("Work Order", self.work_order)
add_additional_cost(self, work_order)
@ -1655,119 +1639,78 @@ class StockEntry(StockController):
}
)
def get_transfered_raw_materials(self):
transferred_materials = frappe.db.sql(
"""
select
item_name, original_item, item_code, sum(qty) as qty, sed.t_warehouse as warehouse,
description, stock_uom, expense_account, cost_center
from `tabStock Entry` se,`tabStock Entry Detail` sed
where
se.name = sed.parent and se.docstatus=1 and se.purpose='Material Transfer for Manufacture'
and se.work_order= %s and ifnull(sed.t_warehouse, '') != ''
group by sed.item_code, sed.t_warehouse
""",
def add_transfered_raw_materials_in_items(self) -> None:
available_materials = get_available_materials(self.work_order)
wo_data = frappe.db.get_value(
"Work Order",
self.work_order,
["qty", "produced_qty", "material_transferred_for_manufacturing as trans_qty"],
as_dict=1,
)
materials_already_backflushed = frappe.db.sql(
"""
select
item_code, sed.s_warehouse as warehouse, sum(qty) as qty
from
`tabStock Entry` se, `tabStock Entry Detail` sed
where
se.name = sed.parent and se.docstatus=1
and (se.purpose='Manufacture' or se.purpose='Material Consumption for Manufacture')
and se.work_order= %s and ifnull(sed.s_warehouse, '') != ''
group by sed.item_code, sed.s_warehouse
""",
self.work_order,
as_dict=1,
)
for key, row in available_materials.items():
remaining_qty_to_produce = flt(wo_data.trans_qty) - flt(wo_data.produced_qty)
if remaining_qty_to_produce <= 0:
continue
backflushed_materials = {}
for d in materials_already_backflushed:
backflushed_materials.setdefault(d.item_code, []).append({d.warehouse: d.qty})
po_qty = frappe.db.sql(
"""select qty, produced_qty, material_transferred_for_manufacturing from
`tabWork Order` where name=%s""",
self.work_order,
as_dict=1,
)[0]
manufacturing_qty = flt(po_qty.qty) or 1
produced_qty = flt(po_qty.produced_qty)
trans_qty = flt(po_qty.material_transferred_for_manufacturing) or 1
for item in transferred_materials:
qty = item.qty
item_code = item.original_item or item.item_code
req_items = frappe.get_all(
"Work Order Item",
filters={"parent": self.work_order, "item_code": item_code},
fields=["required_qty", "consumed_qty"],
)
req_qty = flt(req_items[0].required_qty) if req_items else flt(4)
req_qty_each = flt(req_qty / manufacturing_qty)
consumed_qty = flt(req_items[0].consumed_qty) if req_items else 0
if trans_qty and manufacturing_qty > (produced_qty + flt(self.fg_completed_qty)):
if qty >= req_qty:
qty = (req_qty / trans_qty) * flt(self.fg_completed_qty)
else:
qty = qty - consumed_qty
if self.purpose == "Manufacture":
# If Material Consumption is booked, must pull only remaining components to finish product
if consumed_qty != 0:
remaining_qty = consumed_qty - (produced_qty * req_qty_each)
exhaust_qty = req_qty_each * produced_qty
if remaining_qty > exhaust_qty:
if (remaining_qty / (req_qty_each * flt(self.fg_completed_qty))) >= 1:
qty = 0
else:
qty = (req_qty_each * flt(self.fg_completed_qty)) - remaining_qty
else:
if self.flags.backflush_based_on == "Material Transferred for Manufacture":
qty = (item.qty / trans_qty) * flt(self.fg_completed_qty)
else:
qty = req_qty_each * flt(self.fg_completed_qty)
elif backflushed_materials.get(item.item_code):
precision = frappe.get_precision("Stock Entry Detail", "qty")
for d in backflushed_materials.get(item.item_code):
if d.get(item.warehouse) > 0:
if qty > req_qty:
qty = (
(flt(qty, precision) - flt(d.get(item.warehouse), precision))
/ (flt(trans_qty, precision) - flt(produced_qty, precision))
) * flt(self.fg_completed_qty)
d[item.warehouse] -= qty
qty = (flt(row.qty) * flt(self.fg_completed_qty)) / remaining_qty_to_produce
item = row.item_details
if cint(frappe.get_cached_value("UOM", item.stock_uom, "must_be_whole_number")):
qty = frappe.utils.ceil(qty)
if qty > 0:
self.add_to_stock_entry_detail(
{
item.item_code: {
"from_warehouse": item.warehouse,
"to_warehouse": "",
"qty": qty,
"item_name": item.item_name,
"description": item.description,
"stock_uom": item.stock_uom,
"expense_account": item.expense_account,
"cost_center": item.buying_cost_center,
"original_item": item.original_item,
}
}
)
if row.batch_details:
for batch_no, batch_qty in row.batch_details.items():
if qty <= 0 or batch_qty <= 0:
continue
if batch_qty > qty:
batch_qty = qty
item.batch_no = batch_no
self.update_item_in_stock_entry_detail(row, item, batch_qty)
row.batch_details[batch_no] -= batch_qty
qty -= batch_qty
else:
self.update_item_in_stock_entry_detail(row, item, qty)
def update_item_in_stock_entry_detail(self, row, item, qty) -> None:
ste_item_details = {
"from_warehouse": item.warehouse,
"to_warehouse": "",
"qty": qty,
"item_name": item.item_name,
"batch_no": item.batch_no,
"description": item.description,
"stock_uom": item.stock_uom,
"expense_account": item.expense_account,
"cost_center": item.buying_cost_center,
"original_item": item.original_item,
}
if row.serial_nos:
serial_nos = row.serial_nos
if item.batch_no:
serial_nos = self.get_serial_nos_based_on_transferred_batch(item.batch_no, row.serial_nos)
serial_nos = serial_nos[0 : cint(qty)]
ste_item_details["serial_no"] = "\n".join(serial_nos)
# remove consumed serial nos from list
for sn in serial_nos:
row.serial_nos.remove(sn)
self.add_to_stock_entry_detail({item.item_code: ste_item_details})
@staticmethod
def get_serial_nos_based_on_transferred_batch(batch_no, serial_nos) -> list:
serial_nos = frappe.get_all(
"Serial No", filters={"batch_no": batch_no, "name": ("in", serial_nos)}, order_by="creation"
)
return [d.name for d in serial_nos]
def get_pending_raw_materials(self, backflush_based_on=None):
"""
@ -2528,3 +2471,81 @@ def get_supplied_items(purchase_order):
)
return supplied_item_details
def get_available_materials(work_order) -> dict:
data = get_stock_entry_data(work_order)
available_materials = {}
for row in data:
key = (row.item_code, row.warehouse)
if row.purpose != "Material Transfer for Manufacture":
key = (row.item_code, row.s_warehouse)
if key not in available_materials:
available_materials.setdefault(
key,
frappe._dict(
{"item_details": row, "batch_details": defaultdict(float), "qty": 0, "serial_nos": []}
),
)
item_data = available_materials[key]
if row.purpose == "Material Transfer for Manufacture":
item_data.qty += row.qty
if row.batch_no:
item_data.batch_details[row.batch_no] += row.qty
if row.serial_no:
item_data.serial_nos.extend(get_serial_nos(row.serial_no))
item_data.serial_nos.sort()
else:
# Consume raw material qty in case of 'Manufacture' or 'Material Consumption for Manufacture'
item_data.qty -= row.qty
if row.batch_no:
item_data.batch_details[row.batch_no] -= row.qty
if row.serial_no:
for serial_no in get_serial_nos(row.serial_no):
item_data.serial_nos.remove(serial_no)
return available_materials
def get_stock_entry_data(work_order):
stock_entry = frappe.qb.DocType("Stock Entry")
stock_entry_detail = frappe.qb.DocType("Stock Entry Detail")
return (
frappe.qb.from_(stock_entry)
.from_(stock_entry_detail)
.select(
stock_entry_detail.item_name,
stock_entry_detail.original_item,
stock_entry_detail.item_code,
stock_entry_detail.qty,
(stock_entry_detail.t_warehouse).as_("warehouse"),
(stock_entry_detail.s_warehouse).as_("s_warehouse"),
stock_entry_detail.description,
stock_entry_detail.stock_uom,
stock_entry_detail.expense_account,
stock_entry_detail.cost_center,
stock_entry_detail.batch_no,
stock_entry_detail.serial_no,
stock_entry.purpose,
)
.where(
(stock_entry.name == stock_entry_detail.parent)
& (stock_entry.work_order == work_order)
& (stock_entry.docstatus == 1)
& (stock_entry_detail.s_warehouse.isnotnull())
& (
stock_entry.purpose.isin(
["Manufacture", "Material Consumption for Manufacture", "Material Transfer for Manufacture"]
)
)
)
.orderby(stock_entry.creation, stock_entry_detail.item_code, stock_entry_detail.idx)
).run(as_dict=1)