Merge branch 'develop' into fix-ignore-pricing-rule

This commit is contained in:
Saqib Ansari 2022-02-09 10:31:22 +05:30 committed by GitHub
commit 8ff7587aaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 542 additions and 81 deletions

View File

@ -14,6 +14,10 @@ frappe.ui.form.on("Bank Reconciliation Tool", {
}); });
}, },
onload: function (frm) {
frm.trigger('bank_account');
},
refresh: function (frm) { refresh: function (frm) {
frappe.require("bank-reconciliation-tool.bundle.js", () => frappe.require("bank-reconciliation-tool.bundle.js", () =>
frm.trigger("make_reconciliation_tool") frm.trigger("make_reconciliation_tool")
@ -51,7 +55,7 @@ frappe.ui.form.on("Bank Reconciliation Tool", {
bank_account: function (frm) { bank_account: function (frm) {
frappe.db.get_value( frappe.db.get_value(
"Bank Account", "Bank Account",
frm.bank_account, frm.doc.bank_account,
"account", "account",
(r) => { (r) => {
frappe.db.get_value( frappe.db.get_value(

View File

@ -546,7 +546,7 @@ class TestLeaveApplication(unittest.TestCase):
from erpnext.hr.utils import allocate_earned_leaves from erpnext.hr.utils import allocate_earned_leaves
i = 0 i = 0
while(i<14): while(i<14):
allocate_earned_leaves() allocate_earned_leaves(ignore_duplicates=True)
i += 1 i += 1
self.assertEqual(get_leave_balance_on(employee.name, leave_type, nowdate()), 6) self.assertEqual(get_leave_balance_on(employee.name, leave_type, nowdate()), 6)
@ -554,7 +554,7 @@ class TestLeaveApplication(unittest.TestCase):
frappe.db.set_value('Leave Type', leave_type, 'max_leaves_allowed', 0) frappe.db.set_value('Leave Type', leave_type, 'max_leaves_allowed', 0)
i = 0 i = 0
while(i<6): while(i<6):
allocate_earned_leaves() allocate_earned_leaves(ignore_duplicates=True)
i += 1 i += 1
self.assertEqual(get_leave_balance_on(employee.name, leave_type, nowdate()), 9) self.assertEqual(get_leave_balance_on(employee.name, leave_type, nowdate()), 9)

View File

@ -8,7 +8,7 @@ from math import ceil
import frappe import frappe
from frappe import _, bold from frappe import _, bold
from frappe.model.document import Document from frappe.model.document import Document
from frappe.utils import date_diff, flt, formatdate, get_datetime, getdate from frappe.utils import date_diff, flt, formatdate, get_datetime, get_last_day, getdate
class LeavePolicyAssignment(Document): class LeavePolicyAssignment(Document):
@ -108,8 +108,8 @@ class LeavePolicyAssignment(Document):
def get_leaves_for_passed_months(self, leave_type, new_leaves_allocated, leave_type_details, date_of_joining): def get_leaves_for_passed_months(self, leave_type, new_leaves_allocated, leave_type_details, date_of_joining):
from erpnext.hr.utils import get_monthly_earned_leave from erpnext.hr.utils import get_monthly_earned_leave
current_month = get_datetime().month current_month = get_datetime(frappe.flags.current_date).month or get_datetime().month
current_year = get_datetime().year current_year = get_datetime(frappe.flags.current_date).year or get_datetime().year
from_date = frappe.db.get_value("Leave Period", self.leave_period, "from_date") from_date = frappe.db.get_value("Leave Period", self.leave_period, "from_date")
if getdate(date_of_joining) > getdate(from_date): if getdate(date_of_joining) > getdate(from_date):
@ -119,10 +119,14 @@ class LeavePolicyAssignment(Document):
from_date_year = get_datetime(from_date).year from_date_year = get_datetime(from_date).year
months_passed = 0 months_passed = 0
if current_year == from_date_year and current_month > from_date_month: if current_year == from_date_year and current_month > from_date_month:
months_passed = current_month - from_date_month months_passed = current_month - from_date_month
months_passed = add_current_month_if_applicable(months_passed)
elif current_year > from_date_year: elif current_year > from_date_year:
months_passed = (12 - from_date_month) + current_month months_passed = (12 - from_date_month) + current_month
months_passed = add_current_month_if_applicable(months_passed)
if months_passed > 0: if months_passed > 0:
monthly_earned_leave = get_monthly_earned_leave(new_leaves_allocated, monthly_earned_leave = get_monthly_earned_leave(new_leaves_allocated,
@ -134,6 +138,17 @@ class LeavePolicyAssignment(Document):
return new_leaves_allocated return new_leaves_allocated
def add_current_month_if_applicable(months_passed):
date = getdate(frappe.flags.current_date) or getdate()
last_day_of_month = get_last_day(date)
# if its the last day of the month, then that month should also be considered
if last_day_of_month == date:
months_passed += 1
return months_passed
@frappe.whitelist() @frappe.whitelist()
def create_assignment_for_multiple_employees(employees, data): def create_assignment_for_multiple_employees(employees, data):

View File

@ -4,7 +4,7 @@
import unittest import unittest
import frappe import frappe
from frappe.utils import add_months, get_first_day, getdate from frappe.utils import add_months, get_first_day, get_last_day, getdate
from erpnext.hr.doctype.leave_application.test_leave_application import ( from erpnext.hr.doctype.leave_application.test_leave_application import (
get_employee, get_employee,
@ -125,6 +125,121 @@ class TestLeavePolicyAssignment(unittest.TestCase):
}, "total_leaves_allocated") }, "total_leaves_allocated")
self.assertEqual(leaves_allocated, 0) self.assertEqual(leaves_allocated, 0)
def test_earned_leave_allocation_for_passed_months(self):
employee = get_employee()
leave_type = create_earned_leave_type("Test Earned Leave")
leave_period = create_leave_period("Test Earned Leave Period",
start_date=get_first_day(add_months(getdate(), -1)))
leave_policy = frappe.get_doc({
"doctype": "Leave Policy",
"title": "Test Leave Policy",
"leave_policy_details": [{"leave_type": leave_type.name, "annual_allocation": 12}]
}).insert()
# Case 1: assignment created one month after the leave period, should allocate 1 leave
frappe.flags.current_date = get_first_day(getdate())
data = {
"assignment_based_on": "Leave Period",
"leave_policy": leave_policy.name,
"leave_period": leave_period.name
}
leave_policy_assignments = create_assignment_for_multiple_employees([employee.name], frappe._dict(data))
leaves_allocated = frappe.db.get_value("Leave Allocation", {
"leave_policy_assignment": leave_policy_assignments[0]
}, "total_leaves_allocated")
self.assertEqual(leaves_allocated, 1)
def test_earned_leave_allocation_for_passed_months_on_month_end(self):
employee = get_employee()
leave_type = create_earned_leave_type("Test Earned Leave")
leave_period = create_leave_period("Test Earned Leave Period",
start_date=get_first_day(add_months(getdate(), -2)))
leave_policy = frappe.get_doc({
"doctype": "Leave Policy",
"title": "Test Leave Policy",
"leave_policy_details": [{"leave_type": leave_type.name, "annual_allocation": 12}]
}).insert()
# Case 2: assignment created on the last day of the leave period's latter month
# should allocate 1 leave for current month even though the month has not ended
# since the daily job might have already executed
frappe.flags.current_date = get_last_day(getdate())
data = {
"assignment_based_on": "Leave Period",
"leave_policy": leave_policy.name,
"leave_period": leave_period.name
}
leave_policy_assignments = create_assignment_for_multiple_employees([employee.name], frappe._dict(data))
leaves_allocated = frappe.db.get_value("Leave Allocation", {
"leave_policy_assignment": leave_policy_assignments[0]
}, "total_leaves_allocated")
self.assertEqual(leaves_allocated, 3)
# if the daily job is not completed yet, there is another check present
# to ensure leave is not already allocated to avoid duplication
from erpnext.hr.utils import allocate_earned_leaves
allocate_earned_leaves()
leaves_allocated = frappe.db.get_value("Leave Allocation", {
"leave_policy_assignment": leave_policy_assignments[0]
}, "total_leaves_allocated")
self.assertEqual(leaves_allocated, 3)
def test_earned_leave_allocation_for_passed_months_with_carry_forwarded_leaves(self):
from erpnext.hr.doctype.leave_allocation.test_leave_allocation import create_leave_allocation
employee = get_employee()
leave_type = create_earned_leave_type("Test Earned Leave")
leave_period = create_leave_period("Test Earned Leave Period",
start_date=get_first_day(add_months(getdate(), -2)))
leave_policy = frappe.get_doc({
"doctype": "Leave Policy",
"title": "Test Leave Policy",
"leave_policy_details": [{"leave_type": leave_type.name, "annual_allocation": 12}]
}).insert()
# initial leave allocation = 5
leave_allocation = create_leave_allocation(
employee=employee.name,
employee_name=employee.employee_name,
leave_type=leave_type.name,
from_date=add_months(getdate(), -12),
to_date=add_months(getdate(), -3),
new_leaves_allocated=5,
carry_forward=0)
leave_allocation.submit()
# Case 3: assignment created on the last day of the leave period's latter month with carry forwarding
frappe.flags.current_date = get_last_day(add_months(getdate(), -1))
data = {
"assignment_based_on": "Leave Period",
"leave_policy": leave_policy.name,
"leave_period": leave_period.name,
"carry_forward": 1
}
# carry forwarded leaves = 5, 3 leaves allocated for passed months
leave_policy_assignments = create_assignment_for_multiple_employees([employee.name], frappe._dict(data))
details = frappe.db.get_value("Leave Allocation", {
"leave_policy_assignment": leave_policy_assignments[0]
}, ["total_leaves_allocated", "new_leaves_allocated", "unused_leaves", "name"], as_dict=True)
self.assertEqual(details.new_leaves_allocated, 2)
self.assertEqual(details.unused_leaves, 5)
self.assertEqual(details.total_leaves_allocated, 7)
# if the daily job is not completed yet, there is another check present
# to ensure leave is not already allocated to avoid duplication
from erpnext.hr.utils import is_earned_leave_already_allocated
frappe.flags.current_date = get_last_day(getdate())
allocation = frappe.get_doc('Leave Allocation', details.name)
# 1 leave is still pending to be allocated, irrespective of carry forwarded leaves
self.assertFalse(is_earned_leave_already_allocated(allocation, leave_policy.leave_policy_details[0].annual_allocation))
def tearDown(self): def tearDown(self):
frappe.db.rollback() frappe.db.rollback()
@ -138,13 +253,14 @@ def create_earned_leave_type(leave_type):
is_earned_leave=1, is_earned_leave=1,
earned_leave_frequency="Monthly", earned_leave_frequency="Monthly",
rounding=0.5, rounding=0.5,
max_leaves_allowed=6 is_carry_forward=1
)).insert() )).insert()
def create_leave_period(name): def create_leave_period(name, start_date=None):
frappe.delete_doc_if_exists("Leave Period", name, force=1) frappe.delete_doc_if_exists("Leave Period", name, force=1)
start_date = get_first_day(getdate()) if not start_date:
start_date = get_first_day(getdate())
return frappe.get_doc(dict( return frappe.get_doc(dict(
name=name, name=name,

View File

@ -237,7 +237,7 @@ def generate_leave_encashment():
create_leave_encashment(leave_allocation=leave_allocation) create_leave_encashment(leave_allocation=leave_allocation)
def allocate_earned_leaves(): def allocate_earned_leaves(ignore_duplicates=False):
'''Allocate earned leaves to Employees''' '''Allocate earned leaves to Employees'''
e_leave_types = get_earned_leaves() e_leave_types = get_earned_leaves()
today = getdate() today = getdate()
@ -265,9 +265,9 @@ def allocate_earned_leaves():
from_date = frappe.db.get_value("Employee", allocation.employee, "date_of_joining") from_date = frappe.db.get_value("Employee", allocation.employee, "date_of_joining")
if check_effective_date(from_date, today, e_leave_type.earned_leave_frequency, e_leave_type.based_on_date_of_joining_date): if check_effective_date(from_date, today, e_leave_type.earned_leave_frequency, e_leave_type.based_on_date_of_joining_date):
update_previous_leave_allocation(allocation, annual_allocation, e_leave_type) update_previous_leave_allocation(allocation, annual_allocation, e_leave_type, ignore_duplicates)
def update_previous_leave_allocation(allocation, annual_allocation, e_leave_type): def update_previous_leave_allocation(allocation, annual_allocation, e_leave_type, ignore_duplicates=False):
earned_leaves = get_monthly_earned_leave(annual_allocation, e_leave_type.earned_leave_frequency, e_leave_type.rounding) earned_leaves = get_monthly_earned_leave(annual_allocation, e_leave_type.earned_leave_frequency, e_leave_type.rounding)
allocation = frappe.get_doc('Leave Allocation', allocation.name) allocation = frappe.get_doc('Leave Allocation', allocation.name)
@ -277,9 +277,12 @@ def update_previous_leave_allocation(allocation, annual_allocation, e_leave_type
new_allocation = e_leave_type.max_leaves_allowed new_allocation = e_leave_type.max_leaves_allowed
if new_allocation != allocation.total_leaves_allocated: if new_allocation != allocation.total_leaves_allocated:
allocation.db_set("total_leaves_allocated", new_allocation, update_modified=False)
today_date = today() today_date = today()
create_additional_leave_ledger_entry(allocation, earned_leaves, today_date)
if ignore_duplicates or not is_earned_leave_already_allocated(allocation, annual_allocation):
allocation.db_set("total_leaves_allocated", new_allocation, update_modified=False)
create_additional_leave_ledger_entry(allocation, earned_leaves, today_date)
def get_monthly_earned_leave(annual_leaves, frequency, rounding): def get_monthly_earned_leave(annual_leaves, frequency, rounding):
earned_leaves = 0.0 earned_leaves = 0.0
@ -297,6 +300,28 @@ def get_monthly_earned_leave(annual_leaves, frequency, rounding):
return earned_leaves return earned_leaves
def is_earned_leave_already_allocated(allocation, annual_allocation):
from erpnext.hr.doctype.leave_policy_assignment.leave_policy_assignment import (
get_leave_type_details,
)
leave_type_details = get_leave_type_details()
date_of_joining = frappe.db.get_value("Employee", allocation.employee, "date_of_joining")
assignment = frappe.get_doc("Leave Policy Assignment", allocation.leave_policy_assignment)
leaves_for_passed_months = assignment.get_leaves_for_passed_months(allocation.leave_type,
annual_allocation, leave_type_details, date_of_joining)
# exclude carry-forwarded leaves while checking for leave allocation for passed months
num_allocations = allocation.total_leaves_allocated
if allocation.unused_leaves:
num_allocations -= allocation.unused_leaves
if num_allocations >= leaves_for_passed_months:
return True
return False
def get_leave_allocations(date, leave_type): def get_leave_allocations(date, leave_type):
return frappe.db.sql("""select name, employee, from_date, to_date, leave_policy_assignment, leave_policy return frappe.db.sql("""select name, employee, from_date, to_date, leave_policy_assignment, leave_policy
from `tabLeave Allocation` from `tabLeave Allocation`

View File

@ -346,7 +346,7 @@
"fieldname": "valuation_method", "fieldname": "valuation_method",
"fieldtype": "Select", "fieldtype": "Select",
"label": "Valuation Method", "label": "Valuation Method",
"options": "\nFIFO\nMoving Average" "options": "\nFIFO\nMoving Average\nLIFO"
}, },
{ {
"depends_on": "is_stock_item", "depends_on": "is_stock_item",
@ -987,4 +987,4 @@
"states": [], "states": [],
"title_field": "item_name", "title_field": "item_name",
"track_changes": 1 "track_changes": 1
} }

View File

@ -99,7 +99,7 @@
"fieldname": "valuation_method", "fieldname": "valuation_method",
"fieldtype": "Select", "fieldtype": "Select",
"label": "Default Valuation Method", "label": "Default Valuation Method",
"options": "FIFO\nMoving Average" "options": "FIFO\nMoving Average\nLIFO"
}, },
{ {
"description": "The percentage you are allowed to receive or deliver more against the quantity ordered. For example, if you have ordered 100 units, and your Allowance is 10%, then you are allowed to receive 110 units.", "description": "The percentage you are allowed to receive or deliver more against the quantity ordered. For example, if you have ordered 100 units, and your Allowance is 10%, then you are allowed to receive 110 units.",
@ -346,7 +346,7 @@
"index_web_pages_for_search": 1, "index_web_pages_for_search": 1,
"issingle": 1, "issingle": 1,
"links": [], "links": [],
"modified": "2022-02-04 15:33:43.692736", "modified": "2022-02-05 15:33:43.692736",
"modified_by": "Administrator", "modified_by": "Administrator",
"module": "Stock", "module": "Stock",
"name": "Stock Settings", "name": "Stock Settings",

View File

@ -167,7 +167,7 @@ def get_columns():
{ {
"fieldname": "stock_queue", "fieldname": "stock_queue",
"fieldtype": "Data", "fieldtype": "Data",
"label": "FIFO Queue", "label": "FIFO/LIFO Queue",
}, },
{ {

View File

@ -16,7 +16,7 @@ from erpnext.stock.utils import (
get_or_make_bin, get_or_make_bin,
get_valuation_method, get_valuation_method,
) )
from erpnext.stock.valuation import FIFOValuation from erpnext.stock.valuation import FIFOValuation, LIFOValuation
class NegativeStockError(frappe.ValidationError): pass class NegativeStockError(frappe.ValidationError): pass
@ -461,7 +461,7 @@ class update_entries_after(object):
self.wh_data.qty_after_transaction += flt(sle.actual_qty) self.wh_data.qty_after_transaction += flt(sle.actual_qty)
self.wh_data.stock_value = flt(self.wh_data.qty_after_transaction) * flt(self.wh_data.valuation_rate) self.wh_data.stock_value = flt(self.wh_data.qty_after_transaction) * flt(self.wh_data.valuation_rate)
else: else:
self.update_fifo_values(sle) self.update_queue_values(sle)
self.wh_data.qty_after_transaction += flt(sle.actual_qty) self.wh_data.qty_after_transaction += flt(sle.actual_qty)
# rounding as per precision # rounding as per precision
@ -701,14 +701,18 @@ class update_entries_after(object):
sle.voucher_type, sle.voucher_no, self.allow_zero_rate, sle.voucher_type, sle.voucher_no, self.allow_zero_rate,
currency=erpnext.get_company_currency(sle.company), company=sle.company) currency=erpnext.get_company_currency(sle.company), company=sle.company)
def update_fifo_values(self, sle): def update_queue_values(self, sle):
incoming_rate = flt(sle.incoming_rate) incoming_rate = flt(sle.incoming_rate)
actual_qty = flt(sle.actual_qty) actual_qty = flt(sle.actual_qty)
outgoing_rate = flt(sle.outgoing_rate) outgoing_rate = flt(sle.outgoing_rate)
fifo_queue = FIFOValuation(self.wh_data.stock_queue) if self.valuation_method == "LIFO":
stock_queue = LIFOValuation(self.wh_data.stock_queue)
else:
stock_queue = FIFOValuation(self.wh_data.stock_queue)
if actual_qty > 0: if actual_qty > 0:
fifo_queue.add_stock(qty=actual_qty, rate=incoming_rate) stock_queue.add_stock(qty=actual_qty, rate=incoming_rate)
else: else:
def rate_generator() -> float: def rate_generator() -> float:
allow_zero_valuation_rate = self.check_if_allow_zero_valuation_rate(sle.voucher_type, sle.voucher_detail_no) allow_zero_valuation_rate = self.check_if_allow_zero_valuation_rate(sle.voucher_type, sle.voucher_detail_no)
@ -719,11 +723,11 @@ class update_entries_after(object):
else: else:
return 0.0 return 0.0
fifo_queue.remove_stock(qty=abs(actual_qty), outgoing_rate=outgoing_rate, rate_generator=rate_generator) stock_queue.remove_stock(qty=abs(actual_qty), outgoing_rate=outgoing_rate, rate_generator=rate_generator)
stock_qty, stock_value = fifo_queue.get_total_stock_and_value() stock_qty, stock_value = stock_queue.get_total_stock_and_value()
self.wh_data.stock_queue = fifo_queue.get_state() self.wh_data.stock_queue = stock_queue.state
self.wh_data.stock_value = stock_value self.wh_data.stock_value = stock_value
if stock_qty: if stock_qty:
self.wh_data.valuation_rate = stock_value / stock_qty self.wh_data.valuation_rate = stock_value / stock_qty

View File

@ -1,16 +1,21 @@
import json
import unittest import unittest
import frappe
from hypothesis import given from hypothesis import given
from hypothesis import strategies as st from hypothesis import strategies as st
from erpnext.stock.valuation import FIFOValuation, _round_off_if_near_zero from erpnext.stock.doctype.item.test_item import make_item
from erpnext.stock.doctype.stock_entry.stock_entry_utils import make_stock_entry
from erpnext.stock.valuation import FIFOValuation, LIFOValuation, _round_off_if_near_zero
from erpnext.tests.utils import ERPNextTestCase
qty_gen = st.floats(min_value=-1e6, max_value=1e6) qty_gen = st.floats(min_value=-1e6, max_value=1e6)
value_gen = st.floats(min_value=1, max_value=1e6) value_gen = st.floats(min_value=1, max_value=1e6)
stock_queue_generator = st.lists(st.tuples(qty_gen, value_gen), min_size=10) stock_queue_generator = st.lists(st.tuples(qty_gen, value_gen), min_size=10)
class TestFifoValuation(unittest.TestCase): class TestFIFOValuation(unittest.TestCase):
def setUp(self): def setUp(self):
self.queue = FIFOValuation([]) self.queue = FIFOValuation([])
@ -164,3 +169,184 @@ class TestFifoValuation(unittest.TestCase):
total_value -= sum(q * r for q, r in consumed) total_value -= sum(q * r for q, r in consumed)
self.assertTotalQty(total_qty) self.assertTotalQty(total_qty)
self.assertTotalValue(total_value) self.assertTotalValue(total_value)
class TestLIFOValuation(unittest.TestCase):
def setUp(self):
self.stack = LIFOValuation([])
def tearDown(self):
qty, value = self.stack.get_total_stock_and_value()
self.assertTotalQty(qty)
self.assertTotalValue(value)
def assertTotalQty(self, qty):
self.assertAlmostEqual(sum(q for q, _ in self.stack), qty, msg=f"stack: {self.stack}", places=4)
def assertTotalValue(self, value):
self.assertAlmostEqual(sum(q * r for q, r in self.stack), value, msg=f"stack: {self.stack}", places=2)
def test_simple_addition(self):
self.stack.add_stock(1, 10)
self.assertTotalQty(1)
def test_merge_new_stock(self):
self.stack.add_stock(1, 10)
self.stack.add_stock(1, 10)
self.assertEqual(self.stack, [[2, 10]])
def test_simple_removal(self):
self.stack.add_stock(1, 10)
self.stack.remove_stock(1)
self.assertTotalQty(0)
def test_adding_negative_stock_keeps_rate(self):
self.stack = LIFOValuation([[-5.0, 100]])
self.stack.add_stock(1, 10)
self.assertEqual(self.stack, [[-4, 100]])
def test_adding_negative_stock_updates_rate(self):
self.stack = LIFOValuation([[-5.0, 100]])
self.stack.add_stock(6, 10)
self.assertEqual(self.stack, [[1, 10]])
def test_rounding_off(self):
self.stack.add_stock(1.0, 1.0)
self.stack.remove_stock(1.0 - 1e-9)
self.assertTotalQty(0)
def test_lifo_consumption(self):
self.stack.add_stock(10, 10)
self.stack.add_stock(10, 20)
consumed = self.stack.remove_stock(15)
self.assertEqual(consumed, [[10, 20], [5, 10]])
self.assertTotalQty(5)
def test_lifo_consumption_going_negative(self):
self.stack.add_stock(10, 10)
self.stack.add_stock(10, 20)
consumed = self.stack.remove_stock(25)
self.assertEqual(consumed, [[10, 20], [10, 10], [5, 10]])
self.assertTotalQty(-5)
def test_lifo_consumption_multiple(self):
self.stack.add_stock(1, 1)
self.stack.add_stock(2, 2)
consumed = self.stack.remove_stock(1)
self.assertEqual(consumed, [[1, 2]])
self.stack.add_stock(3, 3)
consumed = self.stack.remove_stock(4)
self.assertEqual(consumed, [[3, 3], [1, 2]])
self.stack.add_stock(4, 4)
consumed = self.stack.remove_stock(5)
self.assertEqual(consumed, [[4, 4], [1, 1]])
self.stack.add_stock(5, 5)
consumed = self.stack.remove_stock(5)
self.assertEqual(consumed, [[5, 5]])
@given(stock_queue_generator)
def test_lifo_qty_hypothesis(self, stock_stack):
self.stack = LIFOValuation([])
total_qty = 0
for qty, rate in stock_stack:
if qty == 0:
continue
if qty > 0:
self.stack.add_stock(qty, rate)
total_qty += qty
else:
qty = abs(qty)
consumed = self.stack.remove_stock(qty)
self.assertAlmostEqual(qty, sum(q for q, _ in consumed), msg=f"incorrect consumption {consumed}")
total_qty -= qty
self.assertTotalQty(total_qty)
@given(stock_queue_generator)
def test_lifo_qty_value_nonneg_hypothesis(self, stock_stack):
self.stack = LIFOValuation([])
total_qty = 0.0
total_value = 0.0
for qty, rate in stock_stack:
# don't allow negative stock
if qty == 0 or total_qty + qty < 0 or abs(qty) < 0.1:
continue
if qty > 0:
self.stack.add_stock(qty, rate)
total_qty += qty
total_value += qty * rate
else:
qty = abs(qty)
consumed = self.stack.remove_stock(qty)
self.assertAlmostEqual(qty, sum(q for q, _ in consumed), msg=f"incorrect consumption {consumed}")
total_qty -= qty
total_value -= sum(q * r for q, r in consumed)
self.assertTotalQty(total_qty)
self.assertTotalValue(total_value)
class TestLIFOValuationSLE(ERPNextTestCase):
ITEM_CODE = "_Test LIFO item"
WAREHOUSE = "_Test Warehouse - _TC"
@classmethod
def setUpClass(cls) -> None:
super().setUpClass()
make_item(cls.ITEM_CODE, {"valuation_method": "LIFO"})
def _make_stock_entry(self, qty, rate=None):
kwargs = {
"item_code": self.ITEM_CODE,
"from_warehouse" if qty < 0 else "to_warehouse": self.WAREHOUSE,
"rate": rate,
"qty": abs(qty),
}
return make_stock_entry(**kwargs)
def assertStockQueue(self, se, expected_queue):
sle_name = frappe.db.get_value("Stock Ledger Entry", {"voucher_no": se.name, "is_cancelled": 0, "voucher_type": "Stock Entry"})
sle = frappe.get_doc("Stock Ledger Entry", sle_name)
stock_queue = json.loads(sle.stock_queue)
total_qty, total_value = LIFOValuation(stock_queue).get_total_stock_and_value()
self.assertEqual(sle.qty_after_transaction, total_qty)
self.assertEqual(sle.stock_value, total_value)
if total_qty > 0:
self.assertEqual(stock_queue, expected_queue)
def test_lifo_values(self):
in1 = self._make_stock_entry(1, 1)
self.assertStockQueue(in1, [[1, 1]])
in2 = self._make_stock_entry(2, 2)
self.assertStockQueue(in2, [[1, 1], [2, 2]])
out1 = self._make_stock_entry(-1)
self.assertStockQueue(out1, [[1, 1], [1, 2]])
in3 = self._make_stock_entry(3, 3)
self.assertStockQueue(in3, [[1, 1], [1, 2], [3, 3]])
out2 = self._make_stock_entry(-4)
self.assertStockQueue(out2, [[1, 1]])
in4 = self._make_stock_entry(4, 4)
self.assertStockQueue(in4, [[1, 1], [4,4]])
out3 = self._make_stock_entry(-5)
self.assertStockQueue(out3, [])
in5 = self._make_stock_entry(5, 5)
self.assertStockQueue(in5, [[5, 5]])
out5 = self._make_stock_entry(-5)
self.assertStockQueue(out5, [])

View File

@ -9,6 +9,7 @@ from frappe import _
from frappe.utils import cstr, flt, get_link_to_form, nowdate, nowtime from frappe.utils import cstr, flt, get_link_to_form, nowdate, nowtime
import erpnext import erpnext
from erpnext.stock.valuation import FIFOValuation, LIFOValuation
class InvalidWarehouseCompany(frappe.ValidationError): pass class InvalidWarehouseCompany(frappe.ValidationError): pass
@ -228,10 +229,10 @@ def get_incoming_rate(args, raise_error_if_no_rate=True):
else: else:
valuation_method = get_valuation_method(args.get("item_code")) valuation_method = get_valuation_method(args.get("item_code"))
previous_sle = get_previous_sle(args) previous_sle = get_previous_sle(args)
if valuation_method == 'FIFO': if valuation_method in ('FIFO', 'LIFO'):
if previous_sle: if previous_sle:
previous_stock_queue = json.loads(previous_sle.get('stock_queue', '[]') or '[]') previous_stock_queue = json.loads(previous_sle.get('stock_queue', '[]') or '[]')
in_rate = get_fifo_rate(previous_stock_queue, args.get("qty") or 0) if previous_stock_queue else 0 in_rate = _get_fifo_lifo_rate(previous_stock_queue, args.get("qty") or 0, valuation_method) if previous_stock_queue else 0
elif valuation_method == 'Moving Average': elif valuation_method == 'Moving Average':
in_rate = previous_sle.get('valuation_rate') or 0 in_rate = previous_sle.get('valuation_rate') or 0
@ -261,29 +262,25 @@ def get_valuation_method(item_code):
def get_fifo_rate(previous_stock_queue, qty): def get_fifo_rate(previous_stock_queue, qty):
"""get FIFO (average) Rate from Queue""" """get FIFO (average) Rate from Queue"""
if flt(qty) >= 0: return _get_fifo_lifo_rate(previous_stock_queue, qty, "FIFO")
total = sum(f[0] for f in previous_stock_queue)
return sum(flt(f[0]) * flt(f[1]) for f in previous_stock_queue) / flt(total) if total else 0.0
else:
available_qty_for_outgoing, outgoing_cost = 0, 0
qty_to_pop = abs(flt(qty))
while qty_to_pop and previous_stock_queue:
batch = previous_stock_queue[0]
if 0 < batch[0] <= qty_to_pop:
# if batch qty > 0
# not enough or exactly same qty in current batch, clear batch
available_qty_for_outgoing += flt(batch[0])
outgoing_cost += flt(batch[0]) * flt(batch[1])
qty_to_pop -= batch[0]
previous_stock_queue.pop(0)
else:
# all from current batch
available_qty_for_outgoing += flt(qty_to_pop)
outgoing_cost += flt(qty_to_pop) * flt(batch[1])
batch[0] -= qty_to_pop
qty_to_pop = 0
return outgoing_cost / available_qty_for_outgoing def get_lifo_rate(previous_stock_queue, qty):
"""get LIFO (average) Rate from Queue"""
return _get_fifo_lifo_rate(previous_stock_queue, qty, "LIFO")
def _get_fifo_lifo_rate(previous_stock_queue, qty, method):
ValuationKlass = LIFOValuation if method == "LIFO" else FIFOValuation
stock_queue = ValuationKlass(previous_stock_queue)
if flt(qty) >= 0:
total_qty, total_value = stock_queue.get_total_stock_and_value()
return total_value / total_qty if total_qty else 0.0
else:
popped_bins = stock_queue.remove_stock(abs(flt(qty)))
total_qty, total_value = ValuationKlass(popped_bins).get_total_stock_and_value()
return total_value / total_qty if total_qty else 0.0
def get_valid_serial_nos(sr_nos, qty=0, item_code=''): def get_valid_serial_nos(sr_nos, qty=0, item_code=''):
"""split serial nos, validate and return list of valid serial nos""" """split serial nos, validate and return list of valid serial nos"""

View File

@ -1,15 +1,54 @@
from abc import ABC, abstractmethod, abstractproperty
from typing import Callable, List, NewType, Optional, Tuple from typing import Callable, List, NewType, Optional, Tuple
from frappe.utils import flt from frappe.utils import flt
FifoBin = NewType("FifoBin", List[float]) StockBin = NewType("StockBin", List[float]) # [[qty, rate], ...]
# Indexes of values inside FIFO bin 2-tuple # Indexes of values inside FIFO bin 2-tuple
QTY = 0 QTY = 0
RATE = 1 RATE = 1
class FIFOValuation: class BinWiseValuation(ABC):
@abstractmethod
def add_stock(self, qty: float, rate: float) -> None:
pass
@abstractmethod
def remove_stock(
self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None
) -> List[StockBin]:
pass
@abstractproperty
def state(self) -> List[StockBin]:
pass
def get_total_stock_and_value(self) -> Tuple[float, float]:
total_qty = 0.0
total_value = 0.0
for qty, rate in self.state:
total_qty += flt(qty)
total_value += flt(qty) * flt(rate)
return _round_off_if_near_zero(total_qty), _round_off_if_near_zero(total_value)
def __repr__(self):
return str(self.state)
def __iter__(self):
return iter(self.state)
def __eq__(self, other):
if isinstance(other, list):
return self.state == other
return type(self) == type(other) and self.state == other.state
class FIFOValuation(BinWiseValuation):
"""Valuation method where a queue of all the incoming stock is maintained. """Valuation method where a queue of all the incoming stock is maintained.
New stock is added at end of the queue. New stock is added at end of the queue.
@ -24,34 +63,14 @@ class FIFOValuation:
# ref: https://docs.python.org/3/reference/datamodel.html#slots # ref: https://docs.python.org/3/reference/datamodel.html#slots
__slots__ = ["queue",] __slots__ = ["queue",]
def __init__(self, state: Optional[List[FifoBin]]): def __init__(self, state: Optional[List[StockBin]]):
self.queue: List[FifoBin] = state if state is not None else [] self.queue: List[StockBin] = state if state is not None else []
def __repr__(self): @property
return str(self.queue) def state(self) -> List[StockBin]:
def __iter__(self):
return iter(self.queue)
def __eq__(self, other):
if isinstance(other, list):
return self.queue == other
return self.queue == other.queue
def get_state(self) -> List[FifoBin]:
"""Get current state of queue.""" """Get current state of queue."""
return self.queue return self.queue
def get_total_stock_and_value(self) -> Tuple[float, float]:
total_qty = 0.0
total_value = 0.0
for qty, rate in self.queue:
total_qty += flt(qty)
total_value += flt(qty) * flt(rate)
return _round_off_if_near_zero(total_qty), _round_off_if_near_zero(total_value)
def add_stock(self, qty: float, rate: float) -> None: def add_stock(self, qty: float, rate: float) -> None:
"""Update fifo queue with new stock. """Update fifo queue with new stock.
@ -78,7 +97,7 @@ class FIFOValuation:
def remove_stock( def remove_stock(
self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None
) -> List[FifoBin]: ) -> List[StockBin]:
"""Remove stock from the queue and return popped bins. """Remove stock from the queue and return popped bins.
args: args:
@ -136,6 +155,101 @@ class FIFOValuation:
return consumed_bins return consumed_bins
class LIFOValuation(BinWiseValuation):
"""Valuation method where a *stack* of all the incoming stock is maintained.
New stock is added at top of the stack.
Qty consumption happens on Last In First Out basis.
Stack is implemented using "bins" of [qty, rate].
ref: https://en.wikipedia.org/wiki/FIFO_and_LIFO_accounting
Implementation detail: appends and pops both at end of list.
"""
# specifying the attributes to save resources
# ref: https://docs.python.org/3/reference/datamodel.html#slots
__slots__ = ["stack",]
def __init__(self, state: Optional[List[StockBin]]):
self.stack: List[StockBin] = state if state is not None else []
@property
def state(self) -> List[StockBin]:
"""Get current state of stack."""
return self.stack
def add_stock(self, qty: float, rate: float) -> None:
"""Update lifo stack with new stock.
args:
qty: new quantity to add
rate: incoming rate of new quantity.
Behaviour of this is same as FIFO valuation.
"""
if not len(self.stack):
self.stack.append([0, 0])
# last row has the same rate, merge new bin.
if self.stack[-1][RATE] == rate:
self.stack[-1][QTY] += qty
else:
# Item has a positive balance qty, add new entry
if self.stack[-1][QTY] > 0:
self.stack.append([qty, rate])
else: # negative balance qty
qty = self.stack[-1][QTY] + qty
if qty > 0: # new balance qty is positive
self.stack[-1] = [qty, rate]
else: # new balance qty is still negative, maintain same rate
self.stack[-1][QTY] = qty
def remove_stock(
self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None
) -> List[StockBin]:
"""Remove stock from the stack and return popped bins.
args:
qty: quantity to remove
rate: outgoing rate - ignored. Kept for backwards compatibility.
rate_generator: function to be called if stack is not found and rate is required.
"""
if not rate_generator:
rate_generator = lambda : 0.0 # noqa
consumed_bins = []
while qty:
if not len(self.stack):
# rely on rate generator.
self.stack.append([0, rate_generator()])
# start at the end.
index = -1
stock_bin = self.stack[index]
if qty >= stock_bin[QTY]:
# consume current bin
qty = _round_off_if_near_zero(qty - stock_bin[QTY])
to_consume = self.stack.pop(index)
consumed_bins.append(list(to_consume))
if not self.stack and qty:
# stock finished, qty still remains to be withdrawn
# negative stock, keep in as a negative bin
self.stack.append([-qty, outgoing_rate or stock_bin[RATE]])
consumed_bins.append([qty, outgoing_rate or stock_bin[RATE]])
break
else:
# qty found in current bin consume it and exit
stock_bin[QTY] = _round_off_if_near_zero(stock_bin[QTY] - qty)
consumed_bins.append([qty, stock_bin[RATE]])
qty = 0
return consumed_bins
def _round_off_if_near_zero(number: float, precision: int = 7) -> float: def _round_off_if_near_zero(number: float, precision: int = 7) -> float:
"""Rounds off the number to zero only if number is close to zero for decimal """Rounds off the number to zero only if number is close to zero for decimal
specified in precision. Precision defaults to 7. specified in precision. Precision defaults to 7.