from typing import Callable, List, NewType, Optional, Tuple from frappe.utils import flt FifoBin = NewType("FifoBin", List[float]) # Indexes of values inside FIFO bin 2-tuple QTY = 0 RATE = 1 class FifoValuation: """Valuation method where a queue of all the incoming stock is maintained. New stock is added at end of the queue. Qty consumption happens on First In First Out basis. Queue is implemented using "bins" of [qty, rate]. ref: https://en.wikipedia.org/wiki/FIFO_and_LIFO_accounting """ def __init__(self, state: Optional[List[FifoBin]]): self.queue: List[FifoBin] = state if state is not None else [] def __repr__(self): return str(self.queue) def __iter__(self): return iter(self.queue) def __eq__(self, other): if isinstance(other, list): return self.queue == other return self.queue == other.queue def get_state(self) -> List[FifoBin]: """Get current state of queue.""" return self.queue def get_total_stock_and_value(self) -> Tuple[float, float]: total_qty = 0.0 total_value = 0.0 for qty, rate in self.queue: total_qty += flt(qty) total_value += flt(qty) * flt(rate) return _round_off_if_near_zero(total_qty), _round_off_if_near_zero(total_value) def add_stock(self, qty: float, rate: float) -> None: """Update fifo queue with new stock. args: qty: new quantity to add rate: incoming rate of new quantity""" if not len(self.queue): self.queue.append([0, 0]) # last row has the same rate, merge new bin. if self.queue[-1][RATE] == rate: self.queue[-1][QTY] += qty else: # Item has a positive balance qty, add new entry if self.queue[-1][QTY] > 0: self.queue.append([qty, rate]) else: # negative balance qty qty = self.queue[-1][QTY] + qty if qty > 0: # new balance qty is positive self.queue[-1] = [qty, rate] else: # new balance qty is still negative, maintain same rate self.queue[-1][QTY] = qty def remove_stock( self, qty: float, rate: float, rate_generator: Callable[[], float] ) -> List[FifoBin]: """Remove stock from the queue and return popped bins. args: qty: quantity to remove rate: outgoing rate rate_generator: function to be called if queue is not found and rate is required. """ consumed_bins = [] while qty: if not len(self.queue): # rely on rate generator. self.queue.append([0, rate_generator()]) index = None if rate > 0: # Find the entry where rate matched with outgoing rate for idx, fifo_bin in enumerate(self.queue): if fifo_bin[RATE] == rate: index = idx break # If no entry found with outgoing rate, collapse stack if index is None: # nosemgrep new_stock_value = sum(d[QTY] * d[RATE] for d in self.queue) - qty * rate new_stock_qty = sum(d[QTY] for d in self.queue) - qty self.queue = [[new_stock_qty, new_stock_value / new_stock_qty if new_stock_qty > 0 else rate]] break else: index = 0 # select first bin or the bin with same rate fifo_bin = self.queue[index] if qty >= fifo_bin[QTY]: # consume current bin qty = _round_off_if_near_zero(qty - fifo_bin[QTY]) to_consume = self.queue.pop(index) consumed_bins.append(list(to_consume)) if not self.queue and qty: # stock finished, qty still remains to be withdrawn # negative stock, keep in as a negative bin self.queue.append([-qty, rate or fifo_bin[RATE]]) consumed_bins.append([qty, rate or fifo_bin[RATE]]) break else: # qty found in current bin consume it and exit fifo_bin[QTY] = _round_off_if_near_zero(fifo_bin[QTY] - qty) consumed_bins.append([qty, fifo_bin[RATE]]) qty = 0 return consumed_bins def _round_off_if_near_zero(number: float, precision: int = 7) -> float: """Rounds off the number to zero only if number is close to zero for decimal specified in precision. Precision defaults to 7. """ if abs(0.0 - flt(number)) < (1.0 / (10 ** precision)): return 0.0 return flt(number)