470 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			470 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import copy
 | |
| from collections import defaultdict
 | |
| 
 | |
| import frappe
 | |
| from frappe import _
 | |
| from frappe.utils import cint, flt, get_link_to_form
 | |
| 
 | |
| from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
 | |
| 
 | |
| 
 | |
| class Subcontracting:
 | |
| 	def set_materials_for_subcontracted_items(self, raw_material_table):
 | |
| 		if self.doctype == "Purchase Invoice" and not self.update_stock:
 | |
| 			return
 | |
| 
 | |
| 		self.raw_material_table = raw_material_table
 | |
| 		self.__identify_change_in_item_table()
 | |
| 		self.__prepare_supplied_items()
 | |
| 		self.__validate_supplied_items()
 | |
| 
 | |
| 	def __prepare_supplied_items(self):
 | |
| 		self.initialized_fields()
 | |
| 		self.__get_purchase_orders()
 | |
| 		self.__get_pending_qty_to_receive()
 | |
| 		self.get_available_materials()
 | |
| 		self.__remove_changed_rows()
 | |
| 		self.__set_supplied_items()
 | |
| 
 | |
| 	def initialized_fields(self):
 | |
| 		self.available_materials = frappe._dict()
 | |
| 		self.__transferred_items = frappe._dict()
 | |
| 		self.alternative_item_details = frappe._dict()
 | |
| 		self.__get_backflush_based_on()
 | |
| 
 | |
| 	def __get_backflush_based_on(self):
 | |
| 		self.backflush_based_on = frappe.db.get_single_value(
 | |
| 			"Buying Settings", "backflush_raw_materials_of_subcontract_based_on"
 | |
| 		)
 | |
| 
 | |
| 	def __get_purchase_orders(self):
 | |
| 		self.purchase_orders = []
 | |
| 
 | |
| 		if self.doctype == "Purchase Order":
 | |
| 			return
 | |
| 
 | |
| 		self.purchase_orders = [d.purchase_order for d in self.items if d.purchase_order]
 | |
| 
 | |
| 	def __identify_change_in_item_table(self):
 | |
| 		self.__changed_name = []
 | |
| 		self.__reference_name = []
 | |
| 
 | |
| 		if self.doctype == "Purchase Order" or self.is_new():
 | |
| 			self.set(self.raw_material_table, [])
 | |
| 			return
 | |
| 
 | |
| 		item_dict = self.__get_data_before_save()
 | |
| 		if not item_dict:
 | |
| 			return True
 | |
| 
 | |
| 		for n_row in self.items:
 | |
| 			self.__reference_name.append(n_row.name)
 | |
| 			if (n_row.name not in item_dict) or (n_row.item_code, n_row.qty) != item_dict[n_row.name]:
 | |
| 				self.__changed_name.append(n_row.name)
 | |
| 
 | |
| 			if item_dict.get(n_row.name):
 | |
| 				del item_dict[n_row.name]
 | |
| 
 | |
| 		self.__changed_name.extend(item_dict.keys())
 | |
| 
 | |
| 	def __get_data_before_save(self):
 | |
| 		item_dict = {}
 | |
| 		if self.doctype in ["Purchase Receipt", "Purchase Invoice"] and self._doc_before_save:
 | |
| 			for row in self._doc_before_save.get("items"):
 | |
| 				item_dict[row.name] = (row.item_code, row.qty)
 | |
| 
 | |
| 		return item_dict
 | |
| 
 | |
| 	def get_available_materials(self):
 | |
| 		"""Get the available raw materials which has been transferred to the supplier.
 | |
| 		available_materials = {
 | |
| 		        (item_code, subcontracted_item, purchase_order): {
 | |
| 		                'qty': 1, 'serial_no': [ABC], 'batch_no': {'batch1': 1}, 'data': item_details
 | |
| 		        }
 | |
| 		}
 | |
| 		"""
 | |
| 		if not self.purchase_orders:
 | |
| 			return
 | |
| 
 | |
| 		for row in self.__get_transferred_items():
 | |
| 			key = (row.rm_item_code, row.main_item_code, row.purchase_order)
 | |
| 
 | |
| 			if key not in self.available_materials:
 | |
| 				self.available_materials.setdefault(
 | |
| 					key,
 | |
| 					frappe._dict(
 | |
| 						{
 | |
| 							"qty": 0,
 | |
| 							"serial_no": [],
 | |
| 							"batch_no": defaultdict(float),
 | |
| 							"item_details": row,
 | |
| 							"po_details": [],
 | |
| 						}
 | |
| 					),
 | |
| 				)
 | |
| 
 | |
| 			details = self.available_materials[key]
 | |
| 			details.qty += row.qty
 | |
| 			details.po_details.append(row.po_detail)
 | |
| 
 | |
| 			if row.serial_no:
 | |
| 				details.serial_no.extend(get_serial_nos(row.serial_no))
 | |
| 
 | |
| 			if row.batch_no:
 | |
| 				details.batch_no[row.batch_no] += row.qty
 | |
| 
 | |
| 			self.__set_alternative_item_details(row)
 | |
| 
 | |
| 		self.__transferred_items = copy.deepcopy(self.available_materials)
 | |
| 		for doctype in ["Purchase Receipt", "Purchase Invoice"]:
 | |
| 			self.__update_consumed_materials(doctype)
 | |
| 
 | |
| 	def __update_consumed_materials(self, doctype, return_consumed_items=False):
 | |
| 		"""Deduct the consumed materials from the available materials."""
 | |
| 
 | |
| 		pr_items = self.__get_received_items(doctype)
 | |
| 		if not pr_items:
 | |
| 			return ([], {}) if return_consumed_items else None
 | |
| 
 | |
| 		pr_items = {d.name: d.get(self.get("po_field") or "purchase_order") for d in pr_items}
 | |
| 		consumed_materials = self.__get_consumed_items(doctype, pr_items.keys())
 | |
| 
 | |
| 		if return_consumed_items:
 | |
| 			return (consumed_materials, pr_items)
 | |
| 
 | |
| 		for row in consumed_materials:
 | |
| 			key = (row.rm_item_code, row.main_item_code, pr_items.get(row.reference_name))
 | |
| 			if not self.available_materials.get(key):
 | |
| 				continue
 | |
| 
 | |
| 			self.available_materials[key]["qty"] -= row.consumed_qty
 | |
| 			if row.serial_no:
 | |
| 				self.available_materials[key]["serial_no"] = list(
 | |
| 					set(self.available_materials[key]["serial_no"]) - set(get_serial_nos(row.serial_no))
 | |
| 				)
 | |
| 
 | |
| 			if row.batch_no:
 | |
| 				self.available_materials[key]["batch_no"][row.batch_no] -= row.consumed_qty
 | |
| 
 | |
| 	def __get_transferred_items(self):
 | |
| 		fields = ["`tabStock Entry`.`purchase_order`"]
 | |
| 		alias_dict = {
 | |
| 			"item_code": "rm_item_code",
 | |
| 			"subcontracted_item": "main_item_code",
 | |
| 			"basic_rate": "rate",
 | |
| 		}
 | |
| 
 | |
| 		child_table_fields = [
 | |
| 			"item_code",
 | |
| 			"item_name",
 | |
| 			"description",
 | |
| 			"qty",
 | |
| 			"basic_rate",
 | |
| 			"amount",
 | |
| 			"serial_no",
 | |
| 			"uom",
 | |
| 			"subcontracted_item",
 | |
| 			"stock_uom",
 | |
| 			"batch_no",
 | |
| 			"conversion_factor",
 | |
| 			"s_warehouse",
 | |
| 			"t_warehouse",
 | |
| 			"item_group",
 | |
| 			"po_detail",
 | |
| 		]
 | |
| 
 | |
| 		if self.backflush_based_on == "BOM":
 | |
| 			child_table_fields.append("original_item")
 | |
| 
 | |
| 		for field in child_table_fields:
 | |
| 			fields.append(f"`tabStock Entry Detail`.`{field}` As {alias_dict.get(field, field)}")
 | |
| 
 | |
| 		filters = [
 | |
| 			["Stock Entry", "docstatus", "=", 1],
 | |
| 			["Stock Entry", "purpose", "=", "Send to Subcontractor"],
 | |
| 			["Stock Entry", "purchase_order", "in", self.purchase_orders],
 | |
| 		]
 | |
| 
 | |
| 		return frappe.get_all("Stock Entry", fields=fields, filters=filters)
 | |
| 
 | |
| 	def __get_received_items(self, doctype):
 | |
| 		fields = []
 | |
| 		self.po_field = "purchase_order"
 | |
| 
 | |
| 		for field in ["name", self.po_field, "parent"]:
 | |
| 			fields.append(f"`tab{doctype} Item`.`{field}`")
 | |
| 
 | |
| 		filters = [
 | |
| 			[doctype, "docstatus", "=", 1],
 | |
| 			[f"{doctype} Item", self.po_field, "in", self.purchase_orders],
 | |
| 		]
 | |
| 		if doctype == "Purchase Invoice":
 | |
| 			filters.append(["Purchase Invoice", "update_stock", "=", 1])
 | |
| 
 | |
| 		return frappe.get_all(f"{doctype}", fields=fields, filters=filters)
 | |
| 
 | |
| 	def __get_consumed_items(self, doctype, pr_items):
 | |
| 		return frappe.get_all(
 | |
| 			"Purchase Receipt Item Supplied",
 | |
| 			fields=[
 | |
| 				"serial_no",
 | |
| 				"rm_item_code",
 | |
| 				"reference_name",
 | |
| 				"batch_no",
 | |
| 				"consumed_qty",
 | |
| 				"main_item_code",
 | |
| 			],
 | |
| 			filters={"docstatus": 1, "reference_name": ("in", list(pr_items)), "parenttype": doctype},
 | |
| 		)
 | |
| 
 | |
| 	def __set_alternative_item_details(self, row):
 | |
| 		if row.get("original_item"):
 | |
| 			self.alternative_item_details[row.get("original_item")] = row
 | |
| 
 | |
| 	def __get_pending_qty_to_receive(self):
 | |
| 		"""Get qty to be received against the purchase order."""
 | |
| 
 | |
| 		self.qty_to_be_received = defaultdict(float)
 | |
| 
 | |
| 		if (
 | |
| 			self.doctype != "Purchase Order" and self.backflush_based_on != "BOM" and self.purchase_orders
 | |
| 		):
 | |
| 			for row in frappe.get_all(
 | |
| 				"Purchase Order Item",
 | |
| 				fields=["item_code", "(qty - received_qty) as qty", "parent", "name"],
 | |
| 				filters={"docstatus": 1, "parent": ("in", self.purchase_orders)},
 | |
| 			):
 | |
| 
 | |
| 				self.qty_to_be_received[(row.item_code, row.parent)] += row.qty
 | |
| 
 | |
| 	def __get_materials_from_bom(self, item_code, bom_no, exploded_item=0):
 | |
| 		doctype = "BOM Item" if not exploded_item else "BOM Explosion Item"
 | |
| 		fields = [f"`tab{doctype}`.`stock_qty` / `tabBOM`.`quantity` as qty_consumed_per_unit"]
 | |
| 
 | |
| 		alias_dict = {
 | |
| 			"item_code": "rm_item_code",
 | |
| 			"name": "bom_detail_no",
 | |
| 			"source_warehouse": "reserve_warehouse",
 | |
| 		}
 | |
| 		for field in [
 | |
| 			"item_code",
 | |
| 			"name",
 | |
| 			"rate",
 | |
| 			"stock_uom",
 | |
| 			"source_warehouse",
 | |
| 			"description",
 | |
| 			"item_name",
 | |
| 			"stock_uom",
 | |
| 		]:
 | |
| 			fields.append(f"`tab{doctype}`.`{field}` As {alias_dict.get(field, field)}")
 | |
| 
 | |
| 		filters = [
 | |
| 			[doctype, "parent", "=", bom_no],
 | |
| 			[doctype, "docstatus", "=", 1],
 | |
| 			["BOM", "item", "=", item_code],
 | |
| 			[doctype, "sourced_by_supplier", "=", 0],
 | |
| 		]
 | |
| 
 | |
| 		return (
 | |
| 			frappe.get_all("BOM", fields=fields, filters=filters, order_by=f"`tab{doctype}`.`idx`") or []
 | |
| 		)
 | |
| 
 | |
| 	def __remove_changed_rows(self):
 | |
| 		if not self.__changed_name:
 | |
| 			return
 | |
| 
 | |
| 		i = 1
 | |
| 		self.set(self.raw_material_table, [])
 | |
| 		for d in self._doc_before_save.supplied_items:
 | |
| 			if d.reference_name in self.__changed_name:
 | |
| 				continue
 | |
| 
 | |
| 			if d.reference_name not in self.__reference_name:
 | |
| 				continue
 | |
| 
 | |
| 			d.idx = i
 | |
| 			self.append("supplied_items", d)
 | |
| 
 | |
| 			i += 1
 | |
| 
 | |
| 	def __set_supplied_items(self):
 | |
| 		self.bom_items = {}
 | |
| 
 | |
| 		has_supplied_items = True if self.get(self.raw_material_table) else False
 | |
| 		for row in self.items:
 | |
| 			if self.doctype != "Purchase Order" and (
 | |
| 				(self.__changed_name and row.name not in self.__changed_name)
 | |
| 				or (has_supplied_items and not self.__changed_name)
 | |
| 			):
 | |
| 				continue
 | |
| 
 | |
| 			if self.doctype == "Purchase Order" or self.backflush_based_on == "BOM":
 | |
| 				for bom_item in self.__get_materials_from_bom(
 | |
| 					row.item_code, row.bom, row.get("include_exploded_items")
 | |
| 				):
 | |
| 					qty = flt(bom_item.qty_consumed_per_unit) * flt(row.qty) * row.conversion_factor
 | |
| 					bom_item.main_item_code = row.item_code
 | |
| 					self.__update_reserve_warehouse(bom_item, row)
 | |
| 					self.__set_alternative_item(bom_item)
 | |
| 					self.__add_supplied_item(row, bom_item, qty)
 | |
| 
 | |
| 			elif self.backflush_based_on != "BOM":
 | |
| 				for key, transfer_item in self.available_materials.items():
 | |
| 					if (key[1], key[2]) == (row.item_code, row.purchase_order) and transfer_item.qty > 0:
 | |
| 						qty = self.__get_qty_based_on_material_transfer(row, transfer_item) or 0
 | |
| 						transfer_item.qty -= qty
 | |
| 						self.__add_supplied_item(row, transfer_item.get("item_details"), qty)
 | |
| 
 | |
| 				if self.qty_to_be_received:
 | |
| 					self.qty_to_be_received[(row.item_code, row.purchase_order)] -= row.qty
 | |
| 
 | |
| 	def __update_reserve_warehouse(self, row, item):
 | |
| 		if self.doctype == "Purchase Order":
 | |
| 			row.reserve_warehouse = self.set_reserve_warehouse or item.warehouse
 | |
| 
 | |
| 	def __get_qty_based_on_material_transfer(self, item_row, transfer_item):
 | |
| 		key = (item_row.item_code, item_row.purchase_order)
 | |
| 
 | |
| 		if self.qty_to_be_received == item_row.qty:
 | |
| 			return transfer_item.qty
 | |
| 
 | |
| 		if self.qty_to_be_received:
 | |
| 			qty = (flt(item_row.qty) * flt(transfer_item.qty)) / flt(self.qty_to_be_received.get(key, 0))
 | |
| 			transfer_item.item_details.required_qty = transfer_item.qty
 | |
| 
 | |
| 			if transfer_item.serial_no or frappe.get_cached_value(
 | |
| 				"UOM", transfer_item.item_details.stock_uom, "must_be_whole_number"
 | |
| 			):
 | |
| 				return frappe.utils.ceil(qty)
 | |
| 
 | |
| 			return qty
 | |
| 
 | |
| 	def __set_alternative_item(self, bom_item):
 | |
| 		if self.alternative_item_details.get(bom_item.rm_item_code):
 | |
| 			bom_item.update(self.alternative_item_details[bom_item.rm_item_code])
 | |
| 
 | |
| 	def __add_supplied_item(self, item_row, bom_item, qty):
 | |
| 		bom_item.conversion_factor = item_row.conversion_factor
 | |
| 		rm_obj = self.append(self.raw_material_table, bom_item)
 | |
| 		rm_obj.reference_name = item_row.name
 | |
| 
 | |
| 		if self.doctype == "Purchase Order":
 | |
| 			rm_obj.required_qty = qty
 | |
| 		else:
 | |
| 			rm_obj.consumed_qty = 0
 | |
| 			rm_obj.purchase_order = item_row.purchase_order
 | |
| 			self.__set_batch_nos(bom_item, item_row, rm_obj, qty)
 | |
| 
 | |
| 	def __set_batch_nos(self, bom_item, item_row, rm_obj, qty):
 | |
| 		key = (rm_obj.rm_item_code, item_row.item_code, item_row.purchase_order)
 | |
| 
 | |
| 		if self.available_materials.get(key) and self.available_materials[key]["batch_no"]:
 | |
| 			new_rm_obj = None
 | |
| 			for batch_no, batch_qty in self.available_materials[key]["batch_no"].items():
 | |
| 				if batch_qty >= qty:
 | |
| 					self.__set_batch_no_as_per_qty(item_row, rm_obj, batch_no, qty)
 | |
| 					self.available_materials[key]["batch_no"][batch_no] -= qty
 | |
| 					return
 | |
| 
 | |
| 				elif qty > 0 and batch_qty > 0:
 | |
| 					qty -= batch_qty
 | |
| 					new_rm_obj = self.append(self.raw_material_table, bom_item)
 | |
| 					new_rm_obj.reference_name = item_row.name
 | |
| 					self.__set_batch_no_as_per_qty(item_row, new_rm_obj, batch_no, batch_qty)
 | |
| 					self.available_materials[key]["batch_no"][batch_no] = 0
 | |
| 
 | |
| 			if abs(qty) > 0 and not new_rm_obj:
 | |
| 				self.__set_consumed_qty(rm_obj, qty)
 | |
| 		else:
 | |
| 			self.__set_consumed_qty(rm_obj, qty, bom_item.required_qty or qty)
 | |
| 			self.__set_serial_nos(item_row, rm_obj)
 | |
| 
 | |
| 	def __set_consumed_qty(self, rm_obj, consumed_qty, required_qty=0):
 | |
| 		rm_obj.required_qty = required_qty
 | |
| 		rm_obj.consumed_qty = consumed_qty
 | |
| 
 | |
| 	def __set_batch_no_as_per_qty(self, item_row, rm_obj, batch_no, qty):
 | |
| 		rm_obj.update(
 | |
| 			{
 | |
| 				"consumed_qty": qty,
 | |
| 				"batch_no": batch_no,
 | |
| 				"required_qty": qty,
 | |
| 				"purchase_order": item_row.purchase_order,
 | |
| 			}
 | |
| 		)
 | |
| 
 | |
| 		self.__set_serial_nos(item_row, rm_obj)
 | |
| 
 | |
| 	def __set_serial_nos(self, item_row, rm_obj):
 | |
| 		key = (rm_obj.rm_item_code, item_row.item_code, item_row.purchase_order)
 | |
| 		if self.available_materials.get(key) and self.available_materials[key]["serial_no"]:
 | |
| 			used_serial_nos = self.available_materials[key]["serial_no"][0 : cint(rm_obj.consumed_qty)]
 | |
| 			rm_obj.serial_no = "\n".join(used_serial_nos)
 | |
| 
 | |
| 			# Removed the used serial nos from the list
 | |
| 			for sn in used_serial_nos:
 | |
| 				self.available_materials[key]["serial_no"].remove(sn)
 | |
| 
 | |
| 	def set_consumed_qty_in_po(self):
 | |
| 		# Update consumed qty back in the purchase order
 | |
| 		if not self.is_subcontracted:
 | |
| 			return
 | |
| 
 | |
| 		self.__get_purchase_orders()
 | |
| 		itemwise_consumed_qty = defaultdict(float)
 | |
| 		for doctype in ["Purchase Receipt", "Purchase Invoice"]:
 | |
| 			consumed_items, pr_items = self.__update_consumed_materials(doctype, return_consumed_items=True)
 | |
| 
 | |
| 			for row in consumed_items:
 | |
| 				key = (row.rm_item_code, row.main_item_code, pr_items.get(row.reference_name))
 | |
| 				itemwise_consumed_qty[key] += row.consumed_qty
 | |
| 
 | |
| 		self.__update_consumed_qty_in_po(itemwise_consumed_qty)
 | |
| 
 | |
| 	def __update_consumed_qty_in_po(self, itemwise_consumed_qty):
 | |
| 		fields = ["main_item_code", "rm_item_code", "parent", "supplied_qty", "name"]
 | |
| 		filters = {"docstatus": 1, "parent": ("in", self.purchase_orders)}
 | |
| 
 | |
| 		for row in frappe.get_all(
 | |
| 			"Purchase Order Item Supplied", fields=fields, filters=filters, order_by="idx"
 | |
| 		):
 | |
| 			key = (row.rm_item_code, row.main_item_code, row.parent)
 | |
| 			consumed_qty = itemwise_consumed_qty.get(key, 0)
 | |
| 
 | |
| 			if row.supplied_qty < consumed_qty:
 | |
| 				consumed_qty = row.supplied_qty
 | |
| 
 | |
| 			itemwise_consumed_qty[key] -= consumed_qty
 | |
| 			frappe.db.set_value("Purchase Order Item Supplied", row.name, "consumed_qty", consumed_qty)
 | |
| 
 | |
| 	def __validate_supplied_items(self):
 | |
| 		if self.doctype not in ["Purchase Invoice", "Purchase Receipt"]:
 | |
| 			return
 | |
| 
 | |
| 		for row in self.get(self.raw_material_table):
 | |
| 			key = (row.rm_item_code, row.main_item_code, row.purchase_order)
 | |
| 			if not self.__transferred_items or not self.__transferred_items.get(key):
 | |
| 				return
 | |
| 
 | |
| 			self.__validate_batch_no(row, key)
 | |
| 			self.__validate_serial_no(row, key)
 | |
| 
 | |
| 	def __validate_batch_no(self, row, key):
 | |
| 		if row.get("batch_no") and row.get("batch_no") not in self.__transferred_items.get(key).get(
 | |
| 			"batch_no"
 | |
| 		):
 | |
| 			link = get_link_to_form("Purchase Order", row.purchase_order)
 | |
| 			msg = f'The Batch No {frappe.bold(row.get("batch_no"))} has not supplied against the Purchase Order {link}'
 | |
| 			frappe.throw(_(msg), title=_("Incorrect Batch Consumed"))
 | |
| 
 | |
| 	def __validate_serial_no(self, row, key):
 | |
| 		if row.get("serial_no"):
 | |
| 			serial_nos = get_serial_nos(row.get("serial_no"))
 | |
| 			incorrect_sn = set(serial_nos).difference(self.__transferred_items.get(key).get("serial_no"))
 | |
| 
 | |
| 			if incorrect_sn:
 | |
| 				incorrect_sn = "\n".join(incorrect_sn)
 | |
| 				link = get_link_to_form("Purchase Order", row.purchase_order)
 | |
| 				msg = f"The Serial Nos {incorrect_sn} has not supplied against the Purchase Order {link}"
 | |
| 				frappe.throw(_(msg), title=_("Incorrect Serial Number Consumed"))
 |