265 lines
7.6 KiB
Python
265 lines
7.6 KiB
Python
from abc import ABC, abstractmethod, abstractproperty
|
|
from typing import Callable, List, NewType, Optional, Tuple
|
|
|
|
from frappe.utils import flt
|
|
|
|
StockBin = NewType("StockBin", List[float]) # [[qty, rate], ...]
|
|
|
|
# Indexes of values inside FIFO bin 2-tuple
|
|
QTY = 0
|
|
RATE = 1
|
|
|
|
|
|
class BinWiseValuation(ABC):
|
|
@abstractmethod
|
|
def add_stock(self, qty: float, rate: float) -> None:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def remove_stock(
|
|
self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None
|
|
) -> List[StockBin]:
|
|
pass
|
|
|
|
@abstractproperty
|
|
def state(self) -> List[StockBin]:
|
|
pass
|
|
|
|
def get_total_stock_and_value(self) -> Tuple[float, float]:
|
|
total_qty = 0.0
|
|
total_value = 0.0
|
|
|
|
for qty, rate in self.state:
|
|
total_qty += flt(qty)
|
|
total_value += flt(qty) * flt(rate)
|
|
|
|
return round_off_if_near_zero(total_qty), round_off_if_near_zero(total_value)
|
|
|
|
def __repr__(self):
|
|
return str(self.state)
|
|
|
|
def __iter__(self):
|
|
return iter(self.state)
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, list):
|
|
return self.state == other
|
|
return type(self) == type(other) and self.state == other.state
|
|
|
|
|
|
class FIFOValuation(BinWiseValuation):
|
|
"""Valuation method where a queue of all the incoming stock is maintained.
|
|
|
|
New stock is added at end of the queue.
|
|
Qty consumption happens on First In First Out basis.
|
|
|
|
Queue is implemented using "bins" of [qty, rate].
|
|
|
|
ref: https://en.wikipedia.org/wiki/FIFO_and_LIFO_accounting
|
|
"""
|
|
|
|
# specifying the attributes to save resources
|
|
# ref: https://docs.python.org/3/reference/datamodel.html#slots
|
|
__slots__ = [
|
|
"queue",
|
|
]
|
|
|
|
def __init__(self, state: Optional[List[StockBin]]):
|
|
self.queue: List[StockBin] = state if state is not None else []
|
|
|
|
@property
|
|
def state(self) -> List[StockBin]:
|
|
"""Get current state of queue."""
|
|
return self.queue
|
|
|
|
def add_stock(self, qty: float, rate: float) -> None:
|
|
"""Update fifo queue with new stock.
|
|
|
|
args:
|
|
qty: new quantity to add
|
|
rate: incoming rate of new quantity"""
|
|
|
|
if not len(self.queue):
|
|
self.queue.append([0, 0])
|
|
|
|
# last row has the same rate, merge new bin.
|
|
if self.queue[-1][RATE] == rate:
|
|
self.queue[-1][QTY] += qty
|
|
else:
|
|
# Item has a positive balance qty, add new entry
|
|
if self.queue[-1][QTY] > 0:
|
|
self.queue.append([qty, rate])
|
|
else: # negative balance qty
|
|
qty = self.queue[-1][QTY] + qty
|
|
if qty > 0: # new balance qty is positive
|
|
self.queue[-1] = [qty, rate]
|
|
else: # new balance qty is still negative, maintain same rate
|
|
self.queue[-1][QTY] = qty
|
|
|
|
def remove_stock(
|
|
self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None
|
|
) -> List[StockBin]:
|
|
"""Remove stock from the queue and return popped bins.
|
|
|
|
args:
|
|
qty: quantity to remove
|
|
rate: outgoing rate
|
|
rate_generator: function to be called if queue is not found and rate is required.
|
|
"""
|
|
if not rate_generator:
|
|
rate_generator = lambda: 0.0 # noqa
|
|
|
|
consumed_bins = []
|
|
while qty:
|
|
if not len(self.queue):
|
|
# rely on rate generator.
|
|
self.queue.append([0, rate_generator()])
|
|
|
|
index = None
|
|
if outgoing_rate > 0:
|
|
# Find the entry where rate matched with outgoing rate
|
|
for idx, fifo_bin in enumerate(self.queue):
|
|
if fifo_bin[RATE] == outgoing_rate:
|
|
index = idx
|
|
break
|
|
|
|
# If no entry found with outgoing rate, collapse queue
|
|
if index is None: # nosemgrep
|
|
new_stock_value = sum(d[QTY] * d[RATE] for d in self.queue) - qty * outgoing_rate
|
|
new_stock_qty = sum(d[QTY] for d in self.queue) - qty
|
|
self.queue = [
|
|
[new_stock_qty, new_stock_value / new_stock_qty if new_stock_qty > 0 else outgoing_rate]
|
|
]
|
|
consumed_bins.append([qty, outgoing_rate])
|
|
break
|
|
else:
|
|
index = 0
|
|
|
|
# select first bin or the bin with same rate
|
|
fifo_bin = self.queue[index]
|
|
if qty >= fifo_bin[QTY]:
|
|
# consume current bin
|
|
qty = round_off_if_near_zero(qty - fifo_bin[QTY])
|
|
to_consume = self.queue.pop(index)
|
|
consumed_bins.append(list(to_consume))
|
|
|
|
if not self.queue and qty:
|
|
# stock finished, qty still remains to be withdrawn
|
|
# negative stock, keep in as a negative bin
|
|
self.queue.append([-qty, outgoing_rate or fifo_bin[RATE]])
|
|
consumed_bins.append([qty, outgoing_rate or fifo_bin[RATE]])
|
|
break
|
|
else:
|
|
# qty found in current bin consume it and exit
|
|
fifo_bin[QTY] = round_off_if_near_zero(fifo_bin[QTY] - qty)
|
|
consumed_bins.append([qty, fifo_bin[RATE]])
|
|
qty = 0
|
|
|
|
return consumed_bins
|
|
|
|
|
|
class LIFOValuation(BinWiseValuation):
|
|
"""Valuation method where a *stack* of all the incoming stock is maintained.
|
|
|
|
New stock is added at top of the stack.
|
|
Qty consumption happens on Last In First Out basis.
|
|
|
|
Stack is implemented using "bins" of [qty, rate].
|
|
|
|
ref: https://en.wikipedia.org/wiki/FIFO_and_LIFO_accounting
|
|
Implementation detail: appends and pops both at end of list.
|
|
"""
|
|
|
|
# specifying the attributes to save resources
|
|
# ref: https://docs.python.org/3/reference/datamodel.html#slots
|
|
__slots__ = [
|
|
"stack",
|
|
]
|
|
|
|
def __init__(self, state: Optional[List[StockBin]]):
|
|
self.stack: List[StockBin] = state if state is not None else []
|
|
|
|
@property
|
|
def state(self) -> List[StockBin]:
|
|
"""Get current state of stack."""
|
|
return self.stack
|
|
|
|
def add_stock(self, qty: float, rate: float) -> None:
|
|
"""Update lifo stack with new stock.
|
|
|
|
args:
|
|
qty: new quantity to add
|
|
rate: incoming rate of new quantity.
|
|
|
|
Behaviour of this is same as FIFO valuation.
|
|
"""
|
|
if not len(self.stack):
|
|
self.stack.append([0, 0])
|
|
|
|
# last row has the same rate, merge new bin.
|
|
if self.stack[-1][RATE] == rate:
|
|
self.stack[-1][QTY] += qty
|
|
else:
|
|
# Item has a positive balance qty, add new entry
|
|
if self.stack[-1][QTY] > 0:
|
|
self.stack.append([qty, rate])
|
|
else: # negative balance qty
|
|
qty = self.stack[-1][QTY] + qty
|
|
if qty > 0: # new balance qty is positive
|
|
self.stack[-1] = [qty, rate]
|
|
else: # new balance qty is still negative, maintain same rate
|
|
self.stack[-1][QTY] = qty
|
|
|
|
def remove_stock(
|
|
self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None
|
|
) -> List[StockBin]:
|
|
"""Remove stock from the stack and return popped bins.
|
|
|
|
args:
|
|
qty: quantity to remove
|
|
rate: outgoing rate - ignored. Kept for backwards compatibility.
|
|
rate_generator: function to be called if stack is not found and rate is required.
|
|
"""
|
|
if not rate_generator:
|
|
rate_generator = lambda: 0.0 # noqa
|
|
|
|
consumed_bins = []
|
|
while qty:
|
|
if not len(self.stack):
|
|
# rely on rate generator.
|
|
self.stack.append([0, rate_generator()])
|
|
|
|
# start at the end.
|
|
index = -1
|
|
|
|
stock_bin = self.stack[index]
|
|
if qty >= stock_bin[QTY]:
|
|
# consume current bin
|
|
qty = round_off_if_near_zero(qty - stock_bin[QTY])
|
|
to_consume = self.stack.pop(index)
|
|
consumed_bins.append(list(to_consume))
|
|
|
|
if not self.stack and qty:
|
|
# stock finished, qty still remains to be withdrawn
|
|
# negative stock, keep in as a negative bin
|
|
self.stack.append([-qty, outgoing_rate or stock_bin[RATE]])
|
|
consumed_bins.append([qty, outgoing_rate or stock_bin[RATE]])
|
|
break
|
|
else:
|
|
# qty found in current bin consume it and exit
|
|
stock_bin[QTY] = round_off_if_near_zero(stock_bin[QTY] - qty)
|
|
consumed_bins.append([qty, stock_bin[RATE]])
|
|
qty = 0
|
|
|
|
return consumed_bins
|
|
|
|
|
|
def round_off_if_near_zero(number: float, precision: int = 7) -> float:
|
|
"""Rounds off the number to zero only if number is close to zero for decimal
|
|
specified in precision. Precision defaults to 7.
|
|
"""
|
|
if abs(0.0 - flt(number)) < (1.0 / (10**precision)):
|
|
return 0.0
|
|
|
|
return flt(number)
|