Merge branch 'develop' into crm-carry-forward-communication-comments

This commit is contained in:
Anupam Kumar 2021-12-20 13:37:00 +05:30 committed by GitHub
commit ecafea9064
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 333 additions and 78 deletions

1
dev-requirements.txt Normal file
View File

@ -0,0 +1 @@
hypothesis~=6.31.0

View File

@ -16,6 +16,7 @@ from erpnext.stock.utils import (
get_or_make_bin,
get_valuation_method,
)
from erpnext.stock.valuation import FIFOValuation
class NegativeStockError(frappe.ValidationError): pass
@ -456,9 +457,8 @@ class update_entries_after(object):
self.wh_data.qty_after_transaction += flt(sle.actual_qty)
self.wh_data.stock_value = flt(self.wh_data.qty_after_transaction) * flt(self.wh_data.valuation_rate)
else:
self.get_fifo_values(sle)
self.update_fifo_values(sle)
self.wh_data.qty_after_transaction += flt(sle.actual_qty)
self.wh_data.stock_value = sum(flt(batch[0]) * flt(batch[1]) for batch in self.wh_data.stock_queue)
# rounding as per precision
self.wh_data.stock_value = flt(self.wh_data.stock_value, self.precision)
@ -696,87 +696,39 @@ class update_entries_after(object):
sle.voucher_type, sle.voucher_no, self.allow_zero_rate,
currency=erpnext.get_company_currency(sle.company), company=sle.company)
def get_fifo_values(self, sle):
def update_fifo_values(self, sle):
incoming_rate = flt(sle.incoming_rate)
actual_qty = flt(sle.actual_qty)
outgoing_rate = flt(sle.outgoing_rate)
fifo_queue = FIFOValuation(self.wh_data.stock_queue)
if actual_qty > 0:
if not self.wh_data.stock_queue:
self.wh_data.stock_queue.append([0, 0])
# last row has the same rate, just updated the qty
if self.wh_data.stock_queue[-1][1]==incoming_rate:
self.wh_data.stock_queue[-1][0] += actual_qty
else:
# Item has a positive balance qty, add new entry
if self.wh_data.stock_queue[-1][0] > 0:
self.wh_data.stock_queue.append([actual_qty, incoming_rate])
else: # negative balance qty
qty = self.wh_data.stock_queue[-1][0] + actual_qty
if qty > 0: # new balance qty is positive
self.wh_data.stock_queue[-1] = [qty, incoming_rate]
else: # new balance qty is still negative, maintain same rate
self.wh_data.stock_queue[-1][0] = qty
fifo_queue.add_stock(qty=actual_qty, rate=incoming_rate)
else:
qty_to_pop = abs(actual_qty)
while qty_to_pop:
if not self.wh_data.stock_queue:
# Get valuation rate from last sle if exists or from valuation rate field in item master
allow_zero_valuation_rate = self.check_if_allow_zero_valuation_rate(sle.voucher_type, sle.voucher_detail_no)
if not allow_zero_valuation_rate:
_rate = get_valuation_rate(sle.item_code, sle.warehouse,
sle.voucher_type, sle.voucher_no, self.allow_zero_rate,
currency=erpnext.get_company_currency(sle.company), company=sle.company)
else:
_rate = 0
self.wh_data.stock_queue.append([0, _rate])
index = None
if outgoing_rate > 0:
# Find the entry where rate matched with outgoing rate
for i, v in enumerate(self.wh_data.stock_queue):
if v[1] == outgoing_rate:
index = i
break
# If no entry found with outgoing rate, collapse stack
if index is None: # nosemgrep
new_stock_value = sum(d[0]*d[1] for d in self.wh_data.stock_queue) - qty_to_pop*outgoing_rate
new_stock_qty = sum(d[0] for d in self.wh_data.stock_queue) - qty_to_pop
self.wh_data.stock_queue = [[new_stock_qty, new_stock_value/new_stock_qty if new_stock_qty > 0 else outgoing_rate]]
break
def rate_generator() -> float:
allow_zero_valuation_rate = self.check_if_allow_zero_valuation_rate(sle.voucher_type, sle.voucher_detail_no)
if not allow_zero_valuation_rate:
return get_valuation_rate(sle.item_code, sle.warehouse,
sle.voucher_type, sle.voucher_no, self.allow_zero_rate,
currency=erpnext.get_company_currency(sle.company), company=sle.company)
else:
index = 0
return 0.0
# select first batch or the batch with same rate
batch = self.wh_data.stock_queue[index]
if qty_to_pop >= batch[0]:
# consume current batch
qty_to_pop = _round_off_if_near_zero(qty_to_pop - batch[0])
self.wh_data.stock_queue.pop(index)
if not self.wh_data.stock_queue and qty_to_pop:
# stock finished, qty still remains to be withdrawn
# negative stock, keep in as a negative batch
self.wh_data.stock_queue.append([-qty_to_pop, outgoing_rate or batch[1]])
break
fifo_queue.remove_stock(qty=abs(actual_qty), outgoing_rate=outgoing_rate, rate_generator=rate_generator)
else:
# qty found in current batch
# consume it and exit
batch[0] = batch[0] - qty_to_pop
qty_to_pop = 0
stock_value = _round_off_if_near_zero(sum(flt(batch[0]) * flt(batch[1]) for batch in self.wh_data.stock_queue))
stock_qty = _round_off_if_near_zero(sum(flt(batch[0]) for batch in self.wh_data.stock_queue))
stock_qty, stock_value = fifo_queue.get_total_stock_and_value()
self.wh_data.stock_queue = fifo_queue.get_state()
self.wh_data.stock_value = stock_value
if stock_qty:
self.wh_data.valuation_rate = stock_value / flt(stock_qty)
self.wh_data.valuation_rate = stock_value / stock_qty
if not self.wh_data.stock_queue:
self.wh_data.stock_queue.append([0, sle.incoming_rate or sle.outgoing_rate or self.wh_data.valuation_rate])
def check_if_allow_zero_valuation_rate(self, voucher_type, voucher_detail_no):
ref_item_dt = ""
@ -1158,13 +1110,3 @@ def get_future_sle_with_negative_batch_qty(args):
and timestamp(posting_date, posting_time) >= timestamp(%(posting_date)s, %(posting_time)s)
limit 1
""", args, as_dict=1)
def _round_off_if_near_zero(number: float, precision: int = 6) -> float:
""" Rounds off the number to zero only if number is close to zero for decimal
specified in precision. Precision defaults to 6.
"""
if flt(number) < (1.0 / (10**precision)):
return 0
return flt(number)

View File

@ -0,0 +1,166 @@
import unittest
from hypothesis import given
from hypothesis import strategies as st
from erpnext.stock.valuation import FIFOValuation, _round_off_if_near_zero
qty_gen = st.floats(min_value=-1e6, max_value=1e6)
value_gen = st.floats(min_value=1, max_value=1e6)
stock_queue_generator = st.lists(st.tuples(qty_gen, value_gen), min_size=10)
class TestFifoValuation(unittest.TestCase):
def setUp(self):
self.queue = FIFOValuation([])
def tearDown(self):
qty, value = self.queue.get_total_stock_and_value()
self.assertTotalQty(qty)
self.assertTotalValue(value)
def assertTotalQty(self, qty):
self.assertAlmostEqual(sum(q for q, _ in self.queue), qty, msg=f"queue: {self.queue}", places=4)
def assertTotalValue(self, value):
self.assertAlmostEqual(sum(q * r for q, r in self.queue), value, msg=f"queue: {self.queue}", places=2)
def test_simple_addition(self):
self.queue.add_stock(1, 10)
self.assertTotalQty(1)
def test_simple_removal(self):
self.queue.add_stock(1, 10)
self.queue.remove_stock(1)
self.assertTotalQty(0)
def test_merge_new_stock(self):
self.queue.add_stock(1, 10)
self.queue.add_stock(1, 10)
self.assertEqual(self.queue, [[2, 10]])
def test_adding_negative_stock_keeps_rate(self):
self.queue = FIFOValuation([[-5.0, 100]])
self.queue.add_stock(1, 10)
self.assertEqual(self.queue, [[-4, 100]])
def test_adding_negative_stock_updates_rate(self):
self.queue = FIFOValuation([[-5.0, 100]])
self.queue.add_stock(6, 10)
self.assertEqual(self.queue, [[1, 10]])
def test_negative_stock(self):
self.queue.remove_stock(1, 5)
self.assertEqual(self.queue, [[-1, 5]])
# XXX
self.queue.remove_stock(1, 10)
self.assertTotalQty(-2)
self.queue.add_stock(2, 10)
self.assertTotalQty(0)
self.assertTotalValue(0)
def test_removing_specified_rate(self):
self.queue.add_stock(1, 10)
self.queue.add_stock(1, 20)
self.queue.remove_stock(1, 20)
self.assertEqual(self.queue, [[1, 10]])
def test_remove_multiple_bins(self):
self.queue.add_stock(1, 10)
self.queue.add_stock(2, 20)
self.queue.add_stock(1, 20)
self.queue.add_stock(5, 20)
self.queue.remove_stock(4)
self.assertEqual(self.queue, [[5, 20]])
def test_remove_multiple_bins_with_rate(self):
self.queue.add_stock(1, 10)
self.queue.add_stock(2, 20)
self.queue.add_stock(1, 20)
self.queue.add_stock(5, 20)
self.queue.remove_stock(3, 20)
self.assertEqual(self.queue, [[1, 10], [5, 20]])
def test_collapsing_of_queue(self):
self.queue.add_stock(1, 1)
self.queue.add_stock(1, 2)
self.queue.add_stock(1, 3)
self.queue.add_stock(1, 4)
self.assertTotalValue(10)
self.queue.remove_stock(3, 1)
# XXX
self.assertEqual(self.queue, [[1, 7]])
def test_rounding_off(self):
self.queue.add_stock(1.0, 1.0)
self.queue.remove_stock(1.0 - 1e-9)
self.assertTotalQty(0)
def test_rounding_off_near_zero(self):
self.assertEqual(_round_off_if_near_zero(0), 0)
self.assertEqual(_round_off_if_near_zero(1), 1)
self.assertEqual(_round_off_if_near_zero(-1), -1)
self.assertEqual(_round_off_if_near_zero(-1e-8), 0)
self.assertEqual(_round_off_if_near_zero(1e-8), 0)
def test_totals(self):
self.queue.add_stock(1, 10)
self.queue.add_stock(2, 13)
self.queue.add_stock(1, 17)
self.queue.remove_stock(1)
self.queue.remove_stock(1)
self.queue.remove_stock(1)
self.queue.add_stock(5, 17)
self.queue.add_stock(8, 11)
@given(stock_queue_generator)
def test_fifo_qty_hypothesis(self, stock_queue):
self.queue = FIFOValuation([])
total_qty = 0
for qty, rate in stock_queue:
if qty == 0:
continue
if qty > 0:
self.queue.add_stock(qty, rate)
total_qty += qty
else:
qty = abs(qty)
consumed = self.queue.remove_stock(qty)
self.assertAlmostEqual(qty, sum(q for q, _ in consumed), msg=f"incorrect consumption {consumed}")
total_qty -= qty
self.assertTotalQty(total_qty)
@given(stock_queue_generator)
def test_fifo_qty_value_nonneg_hypothesis(self, stock_queue):
self.queue = FIFOValuation([])
total_qty = 0.0
total_value = 0.0
for qty, rate in stock_queue:
# don't allow negative stock
if qty == 0 or total_qty + qty < 0 or abs(qty) < 0.1:
continue
if qty > 0:
self.queue.add_stock(qty, rate)
total_qty += qty
total_value += qty * rate
else:
qty = abs(qty)
consumed = self.queue.remove_stock(qty)
self.assertAlmostEqual(qty, sum(q for q, _ in consumed), msg=f"incorrect consumption {consumed}")
total_qty -= qty
total_value -= sum(q * r for q, r in consumed)
self.assertTotalQty(total_qty)
self.assertTotalValue(total_value)

146
erpnext/stock/valuation.py Normal file
View File

@ -0,0 +1,146 @@
from typing import Callable, List, NewType, Optional, Tuple
from frappe.utils import flt
FifoBin = NewType("FifoBin", List[float])
# Indexes of values inside FIFO bin 2-tuple
QTY = 0
RATE = 1
class FIFOValuation:
"""Valuation method where a queue of all the incoming stock is maintained.
New stock is added at end of the queue.
Qty consumption happens on First In First Out basis.
Queue is implemented using "bins" of [qty, rate].
ref: https://en.wikipedia.org/wiki/FIFO_and_LIFO_accounting
"""
# specifying the attributes to save resources
# ref: https://docs.python.org/3/reference/datamodel.html#slots
__slots__ = ["queue",]
def __init__(self, state: Optional[List[FifoBin]]):
self.queue: List[FifoBin] = state if state is not None else []
def __repr__(self):
return str(self.queue)
def __iter__(self):
return iter(self.queue)
def __eq__(self, other):
if isinstance(other, list):
return self.queue == other
return self.queue == other.queue
def get_state(self) -> List[FifoBin]:
"""Get current state of queue."""
return self.queue
def get_total_stock_and_value(self) -> Tuple[float, float]:
total_qty = 0.0
total_value = 0.0
for qty, rate in self.queue:
total_qty += flt(qty)
total_value += flt(qty) * flt(rate)
return _round_off_if_near_zero(total_qty), _round_off_if_near_zero(total_value)
def add_stock(self, qty: float, rate: float) -> None:
"""Update fifo queue with new stock.
args:
qty: new quantity to add
rate: incoming rate of new quantity"""
if not len(self.queue):
self.queue.append([0, 0])
# last row has the same rate, merge new bin.
if self.queue[-1][RATE] == rate:
self.queue[-1][QTY] += qty
else:
# Item has a positive balance qty, add new entry
if self.queue[-1][QTY] > 0:
self.queue.append([qty, rate])
else: # negative balance qty
qty = self.queue[-1][QTY] + qty
if qty > 0: # new balance qty is positive
self.queue[-1] = [qty, rate]
else: # new balance qty is still negative, maintain same rate
self.queue[-1][QTY] = qty
def remove_stock(
self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None
) -> List[FifoBin]:
"""Remove stock from the queue and return popped bins.
args:
qty: quantity to remove
rate: outgoing rate
rate_generator: function to be called if queue is not found and rate is required.
"""
if not rate_generator:
rate_generator = lambda : 0.0 # noqa
consumed_bins = []
while qty:
if not len(self.queue):
# rely on rate generator.
self.queue.append([0, rate_generator()])
index = None
if outgoing_rate > 0:
# Find the entry where rate matched with outgoing rate
for idx, fifo_bin in enumerate(self.queue):
if fifo_bin[RATE] == outgoing_rate:
index = idx
break
# If no entry found with outgoing rate, collapse queue
if index is None: # nosemgrep
new_stock_value = sum(d[QTY] * d[RATE] for d in self.queue) - qty * outgoing_rate
new_stock_qty = sum(d[QTY] for d in self.queue) - qty
self.queue = [[new_stock_qty, new_stock_value / new_stock_qty if new_stock_qty > 0 else outgoing_rate]]
consumed_bins.append([qty, outgoing_rate])
break
else:
index = 0
# select first bin or the bin with same rate
fifo_bin = self.queue[index]
if qty >= fifo_bin[QTY]:
# consume current bin
qty = _round_off_if_near_zero(qty - fifo_bin[QTY])
to_consume = self.queue.pop(index)
consumed_bins.append(list(to_consume))
if not self.queue and qty:
# stock finished, qty still remains to be withdrawn
# negative stock, keep in as a negative bin
self.queue.append([-qty, outgoing_rate or fifo_bin[RATE]])
consumed_bins.append([qty, outgoing_rate or fifo_bin[RATE]])
break
else:
# qty found in current bin consume it and exit
fifo_bin[QTY] = _round_off_if_near_zero(fifo_bin[QTY] - qty)
consumed_bins.append([qty, fifo_bin[RATE]])
qty = 0
return consumed_bins
def _round_off_if_near_zero(number: float, precision: int = 7) -> float:
"""Rounds off the number to zero only if number is close to zero for decimal
specified in precision. Precision defaults to 7.
"""
if abs(0.0 - flt(number)) < (1.0 / (10 ** precision)):
return 0.0
return flt(number)