brotherton-erpnext/erpnext/stock/utils.py

325 lines
11 KiB
Python
Raw Normal View History

# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
from __future__ import unicode_literals
2017-12-15 06:43:50 +00:00
import frappe, erpnext
2014-04-14 13:50:45 +00:00
from frappe import _
import json
from frappe.utils import flt, cstr, nowdate, nowtime
from six import string_types
2014-02-14 10:17:51 +00:00
class InvalidWarehouseCompany(frappe.ValidationError):
    """Validation error used when a warehouse does not belong to the given company."""
def get_stock_value_from_bin(warehouse=None, item_code=None):
    """Sum `stock_value` over the Bin rows of items that are not disabled.

    :param warehouse: optional; limits the sum to bins whose warehouse has an
        `lft` between the `lft` and `rgt` of the given warehouse.
    :param item_code: optional; limits the sum to bins of this item.
    :return: the raw result of `frappe.db.sql` (not unwrapped to a scalar).
    """
    filters = []
    params = {}

    if warehouse:
        filters.append(""" and `tabBin`.warehouse in (
            select w2.name from `tabWarehouse` w1
            join `tabWarehouse` w2 on
                w1.name = %(warehouse)s
                and w2.lft between w1.lft and w1.rgt
            ) """)
        params['warehouse'] = warehouse

    if item_code:
        filters.append(" and `tabBin`.item_code = %(item_code)s")
        params['item_code'] = item_code

    # Only the trailing %s is substituted here; the named placeholders inside
    # the filters are left for the database layer to bind.
    query = """select sum(stock_value) from `tabBin`, `tabItem` where 1 = 1
        and `tabItem`.name = `tabBin`.item_code and ifnull(`tabItem`.disabled, 0) = 0 %s""" % "".join(filters)

    return frappe.db.sql(query, params)
def get_stock_value_on(warehouse=None, posting_date=None, item_code=None):
    """Return the total stock value as of `posting_date` (today when omitted).

    Stock Ledger Entries posted on or before the date are read newest first,
    and the `stock_value` of the first entry seen for every
    (item_code, warehouse) pair is summed.

    :param warehouse: optional; for a group warehouse the entries of every
        warehouse with `lft >= group.lft` and `rgt <= group.rgt` are included,
        otherwise only entries of that exact warehouse.
    :param posting_date: optional cut-off date.
    :param item_code: optional item filter.
    """
    params = [posting_date or nowdate()]
    clauses = []

    if warehouse:
        lft, rgt, is_group = frappe.db.get_value("Warehouse", warehouse, ["lft", "rgt", "is_group"])
        if is_group:
            params += [lft, rgt]
            clauses.append(
                "and exists ("
                " select name from `tabWarehouse` wh where wh.name = sle.warehouse"
                " and wh.lft >= %s and wh.rgt <= %s)"
            )
        else:
            params.append(warehouse)
            clauses.append(" AND warehouse = %s")

    if item_code:
        params.append(item_code)
        clauses.append(" AND item_code = %s")

    entries = frappe.db.sql("""
        SELECT item_code, stock_value, name, warehouse
        FROM `tabStock Ledger Entry` sle
        WHERE posting_date <= %s {0}
        ORDER BY timestamp(posting_date, posting_time) DESC, creation DESC
    """.format("".join(clauses)), params, as_dict=1)

    # Entries arrive newest first, so the first hit per key is the latest one.
    latest_value = {}
    for entry in entries:
        key = (entry.item_code, entry.warehouse)
        if key not in latest_value:
            latest_value[key] = flt(entry.stock_value)

    return sum(latest_value.values())
@frappe.whitelist()
def get_stock_balance(item_code, warehouse, posting_date=None, posting_time=None, with_valuation_rate=False):
    """Return the stock balance quantity of an item in a warehouse.

    The balance is read from the previous Stock Ledger Entry relative to the
    given posting date/time (current date/time when omitted).

    :return: `qty_after_transaction` of that entry, or 0.0 when there is none.
        When `with_valuation_rate` is truthy a tuple (qty, valuation_rate) is
        returned instead, (0.0, 0.0) when there is no entry.
    """
    from erpnext.stock.stock_ledger import get_previous_sle

    previous = get_previous_sle({
        "item_code": item_code,
        "warehouse": warehouse,
        "posting_date": posting_date or nowdate(),
        "posting_time": posting_time or nowtime(),
    })

    if not previous:
        return (0.0, 0.0) if with_valuation_rate else 0.0

    if with_valuation_rate:
        return (previous.qty_after_transaction, previous.valuation_rate)

    return previous.qty_after_transaction
@frappe.whitelist()
def get_latest_stock_qty(item_code, warehouse=None):
    """Return the summed `actual_qty` of an item from its Bin rows.

    :param item_code: item whose bins are summed.
    :param warehouse: optional; for a group warehouse the bins of every
        warehouse with `lft >= group.lft` and `rgt <= group.rgt` are included,
        otherwise only the bin of that exact warehouse.
    :return: first column of the first result row (whatever the database
        returns for `sum(actual_qty)`).
    """
    params = [item_code]
    extra_filter = ""

    if warehouse:
        lft, rgt, is_group = frappe.db.get_value("Warehouse", warehouse, ["lft", "rgt", "is_group"])
        if is_group:
            params.extend((lft, rgt))
            extra_filter = (
                "and exists ("
                " select name from `tabWarehouse` wh where wh.name = tabBin.warehouse"
                " and wh.lft >= %s and wh.rgt <= %s)"
            )
        else:
            params.append(warehouse)
            extra_filter = " AND warehouse = %s"

    rows = frappe.db.sql("""select sum(actual_qty) from tabBin
        where item_code=%s {0}""".format(extra_filter), params)

    return rows[0][0]
def get_latest_stock_balance():
    """Build a nested map {warehouse: {item_code: stock_value}} from all Bin rows.

    If the same (warehouse, item_code) pair shows up more than once, the first
    row read wins.
    """
    balances = {}
    rows = frappe.db.sql("""SELECT item_code, warehouse, stock_value as stock_value
        FROM tabBin""", as_dict=1)

    for row in rows:
        per_item = balances.setdefault(row.warehouse, {})
        if row.item_code not in per_item:
            per_item[row.item_code] = flt(row.stock_value)

    return balances
def get_bin(item_code, warehouse):
    """Return the Bin document for an item/warehouse pair, inserting it if missing.

    The returned document always has `flags.ignore_permissions` set to True.
    """
    bin_name = frappe.db.get_value("Bin", {"item_code": item_code, "warehouse": warehouse})

    if bin_name:
        bin_doc = frappe.get_doc('Bin', bin_name)
    else:
        bin_doc = frappe.get_doc({
            "doctype": "Bin",
            "item_code": item_code,
            "warehouse": warehouse,
        })
        # The flag is set before insert() so the insert runs with it enabled.
        bin_doc.flags.ignore_permissions = 1
        bin_doc.insert()

    bin_doc.flags.ignore_permissions = True
    return bin_doc
def update_bin(args, allow_negative_stock=False, via_landed_cost_voucher=False):
    """Apply `args` to the Bin of the item/warehouse named in `args`.

    :param args: mapping read for "item_code" and "warehouse"; passed on
        unchanged to the Bin's `update_stock`.
    :return: the updated Bin document, or None (after showing a message) when
        the item is not a stock item.
    """
    item_code = args.get("item_code")

    if not frappe.db.get_value('Item', item_code, 'is_stock_item'):
        frappe.msgprint(_("Item {0} ignored since it is not a stock item").format(item_code))
        return None

    bin_doc = get_bin(item_code, args.get("warehouse"))
    bin_doc.update_stock(args, allow_negative_stock, via_landed_cost_voucher)
    return bin_doc
@frappe.whitelist()
def get_incoming_rate(args, raise_error_if_no_rate=True):
    """Get the incoming rate for an item based on its valuation method.

    :param args: mapping (or its JSON string) read for "serial_no",
        "item_code", "qty", "warehouse", "voucher_type", "voucher_no"/"name",
        "allow_zero_valuation" and "company"; also passed whole to
        `get_previous_sle`.
    :param raise_error_if_no_rate: forwarded to `get_valuation_rate`.
    :return: the average purchase rate of the serial numbers when given,
        otherwise the FIFO or moving-average rate of the previous ledger
        entry; when that is falsy, the result of `get_valuation_rate`.
    """
    from erpnext.stock.stock_ledger import get_previous_sle, get_valuation_rate

    if isinstance(args, string_types):
        args = json.loads(args)

    rate = 0
    serial_no = args.get("serial_no")

    if (serial_no or "").strip():
        rate = get_avg_purchase_rate(serial_no)
    else:
        method = get_valuation_method(args.get("item_code"))
        last_sle = get_previous_sle(args)

        if method == 'FIFO':
            if last_sle:
                queue = json.loads(last_sle.get('stock_queue', '[]') or '[]')
                if queue:
                    rate = get_fifo_rate(queue, args.get("qty") or 0)
                else:
                    rate = 0
        elif method == 'Moving Average':
            rate = last_sle.get('valuation_rate') or 0

    if not rate:
        # No usable rate so far: defer to the stock ledger's valuation lookup.
        voucher_no = args.get('voucher_no') or args.get('name')
        company = args.get('company')
        rate = get_valuation_rate(
            args.get('item_code'),
            args.get('warehouse'),
            args.get('voucher_type'),
            voucher_no,
            args.get('allow_zero_valuation'),
            currency=erpnext.get_company_currency(company),
            company=company,
            raise_error_if_no_rate=raise_error_if_no_rate,
        )

    return rate
def get_avg_purchase_rate(serial_nos):
    """Get the average `purchase_rate` of the given serial numbers.

    :param serial_nos: serial numbers as accepted by `get_valid_serial_nos`
        (comma and/or newline separated text).
    :return: the average as a float; 0.0 when no valid serial number is left
        after parsing.
    """
    serial_nos = get_valid_serial_nos(serial_nos)
    if not serial_nos:
        # An empty list would render as `name in ()`, which is not valid SQL.
        return 0.0

    placeholders = ", ".join(["%s"] * len(serial_nos))
    return flt(frappe.db.sql("""select avg(purchase_rate) from `tabSerial No`
        where name in (%s)""" % placeholders, tuple(serial_nos))[0][0])
def get_valuation_method(item_code):
    """Return the item's valuation method.

    Falls back to the "Stock Settings" value, and to "FIFO" when neither the
    item nor the settings define one.
    """
    item_method = frappe.db.get_value('Item', item_code, 'valuation_method')
    if item_method:
        return item_method

    return frappe.db.get_value("Stock Settings", None, "valuation_method") or "FIFO"
def get_fifo_rate(previous_stock_queue, qty):
    """Get the FIFO (average) rate from a stock queue.

    :param previous_stock_queue: list of `[qty, rate]` batches, oldest first.
        NOTE: for a negative `qty` the list is consumed in place (batches are
        popped or reduced), exactly as before.
    :param qty: transaction quantity. For `qty >= 0` the weighted average rate
        of the whole queue is returned; for `qty < 0` the average rate of the
        quantity taken from the front of the queue.
    :return: the rate, or 0.0 when the queue holds no quantity to average over.
    """
    if qty >= 0:
        total = sum(f[0] for f in previous_stock_queue)
        return sum(flt(f[0]) * flt(f[1]) for f in previous_stock_queue) / flt(total) if total else 0.0

    available_qty_for_outgoing, outgoing_cost = 0, 0
    qty_to_pop = abs(qty)

    while qty_to_pop and previous_stock_queue:
        batch = previous_stock_queue[0]
        if 0 < batch[0] <= qty_to_pop:
            # batch qty is positive but not more than what is still needed:
            # take the whole batch and drop it from the queue
            available_qty_for_outgoing += flt(batch[0])
            outgoing_cost += flt(batch[0]) * flt(batch[1])
            qty_to_pop -= batch[0]
            previous_stock_queue.pop(0)
        else:
            # take everything that is still needed from the current batch
            available_qty_for_outgoing += flt(qty_to_pop)
            outgoing_cost += flt(qty_to_pop) * flt(batch[1])
            batch[0] -= qty_to_pop
            qty_to_pop = 0

    if not available_qty_for_outgoing:
        # Empty queue: nothing was taken, so avoid dividing by zero.
        return 0.0

    return outgoing_cost / available_qty_for_outgoing
def get_valid_serial_nos(sr_nos, qty=0, item_code=''):
    """Split serial nos, validate and return the list of valid serial nos.

    :param sr_nos: serial numbers separated by commas and/or newlines.
    :param qty: when non-zero, the number of serial numbers must equal
        `abs(qty)`.
    :param item_code: only used in the error message of the qty check.
    :return: list of stripped, non-empty serial numbers in input order.
    :raises: via `frappe.throw` when a serial number is repeated or the count
        does not match `qty`.
    """
    # TODO: remove duplicates in client side
    valid_serial_nos = []
    seen = set()

    for raw in cstr(sr_nos).strip().replace(',', '\n').split('\n'):
        # Strip before the emptiness check so that whitespace-only entries
        # (e.g. "A, ,B") are skipped instead of being kept as an empty serial no.
        val = raw.strip()
        if not val:
            continue

        if val in seen:
            frappe.throw(_("Serial number {0} entered more than once").format(val))
        else:
            seen.add(val)
            valid_serial_nos.append(val)

    if qty and len(valid_serial_nos) != abs(qty):
        frappe.throw(_("{0} valid serial nos for Item {1}").format(abs(qty), item_code))

    return valid_serial_nos
2013-03-06 13:20:53 +00:00
2013-10-10 10:34:40 +00:00
def validate_warehouse_company(warehouse, company):
    """Throw `InvalidWarehouseCompany` if the warehouse has a company set and
    it differs from `company`."""
    owning_company = frappe.db.get_value("Warehouse", warehouse, "company")

    if not owning_company or owning_company == company:
        return

    message = _("Warehouse {0} does not belong to company {1}").format(warehouse, company)
    frappe.throw(message, InvalidWarehouseCompany)
def is_group_warehouse(warehouse):
    """Throw a validation message if `warehouse` is flagged as a group warehouse.

    Despite the name, this returns nothing; it only throws or passes.
    """
    is_group = frappe.db.get_value("Warehouse", warehouse, "is_group")
    if is_group:
        frappe.throw(_("Group node warehouse is not allowed to select for transactions"))
def update_included_uom_in_report(columns, result, include_uom, conversion_factors):
    """Add "per included UOM" columns and values to a report, in place.

    For every column dict flagged with a truthy "convertible" value, a new
    column is inserted right after it, and every row of `result` gets the
    converted value: multiplied by the row's conversion factor for 'rate'
    columns, divided by it otherwise.

    :param columns: list of column dicts; modified in place.
    :param result: list of rows, either all dicts keyed by fieldname or all
        lists ordered like `columns`; modified in place.
    :param include_uom: UOM name used in the new labels and fieldnames.
    :param conversion_factors: sequence indexed by row position; rows with a
        falsy factor are left unconverted.
    :return: None. Nothing is done when `include_uom` or `conversion_factors`
        is falsy.
    """
    if not include_uom or not conversion_factors:
        return

    # Guarded so that an empty result no longer raises IndexError.
    is_dict_obj = bool(result) and isinstance(result[0], dict)

    convertible_columns = {}
    # The list is deliberately extended while being enumerated: list rows are
    # keyed by the column's index in the already-extended list, which matches
    # the positions the rows have once their own values are inserted below.
    for idx, d in enumerate(columns):
        key = d.get("fieldname") if is_dict_obj else idx
        if d.get("convertible"):
            convertible_columns.setdefault(key, d.get("convertible"))

            # Add new column to show qty/rate as per the selected UOM
            columns.insert(idx+1, {
                'label': "{0} (per {1})".format(d.get("label"), include_uom),
                'fieldname': "{0}_{1}".format(d.get("fieldname"), frappe.scrub(include_uom)),
                'fieldtype': 'Currency' if d.get("convertible") == 'rate' else 'Float'
            })

    for row_idx, row in enumerate(result):
        # Dict rows are iterated over a snapshot: adding keys while iterating
        # the live view raises RuntimeError on Python 3. List rows keep the
        # lazy enumerate so indices track the values inserted along the way.
        data = list(row.items()) if is_dict_obj else enumerate(row)
        for key, value in data:
            if key not in convertible_columns or not conversion_factors[row_idx]:
                continue

            if convertible_columns.get(key) == 'rate':
                new_value = flt(value) * conversion_factors[row_idx]
            else:
                new_value = flt(value) / conversion_factors[row_idx]

            if not is_dict_obj:
                row.insert(key+1, new_value)
            else:
                new_key = "{0}_{1}".format(key, frappe.scrub(include_uom))
                row[new_key] = new_value
2019-04-28 13:09:18 +00:00
def get_available_serial_nos(item_code, warehouse):
    """Return the "Serial No" records of an item in a warehouse whose
    `delivery_document_no` is empty; an empty list when there are none."""
    filters = {
        'item_code': item_code,
        'warehouse': warehouse,
        'delivery_document_no': '',
    }
    serial_nos = frappe.get_all("Serial No", filters=filters)
    return serial_nos or []
def add_additional_uom_columns(columns, result, include_uom, conversion_factors):
    """Add "<fieldname>_alt" columns and values for the included UOM, in place.

    Every column dict whose "convertible" is 'rate' or 'qty' is copied and
    inserted right after the original, with "_alt" appended to the fieldname
    and the UOM appended to the label. Each row then gets the converted value
    under that fieldname: multiplied by the conversion factor for 'rate'
    columns, divided by it for 'qty' columns.

    :param columns: list of columns; dict entries are inspected, others are
        skipped. Modified in place.
    :param result: list of dict rows carrying an "item_code"; modified in place.
    :param include_uom: UOM name appended to the new labels.
    :param conversion_factors: mapping of item_code to conversion factor. A
        falsy or missing factor is treated as 1.
    :return: None. Nothing is done when `include_uom` or `conversion_factors`
        is falsy.
    """
    if not include_uom or not conversion_factors:
        return

    convertible_column_map = {}
    # Walk right to left so an insertion never shifts an index still to visit.
    for col_idx in reversed(range(len(columns))):
        col = columns[col_idx]
        if not (isinstance(col, dict) and col.get('convertible') in ['rate', 'qty']):
            continue

        alt_col = col.copy()
        alt_col['fieldname'] += '_alt'
        if col.get('convertible') == 'rate':
            alt_col['label'] += ' (per {})'.format(include_uom)
        else:
            alt_col['label'] += ' ({})'.format(include_uom)
        columns.insert(col_idx + 1, alt_col)

        convertible_column_map[col.get('fieldname')] = frappe._dict({
            'converted_col': alt_col['fieldname'],
            'for_type': col.get('convertible')
        })

    if not convertible_column_map:
        return

    for row in result:
        # The factor depends only on the row, so it is looked up once per row.
        # A missing item falls back to 1, like a falsy factor does.
        # NOTE(review): confirm that "no conversion" is wanted for unknown items.
        try:
            conversion_factor = conversion_factors[row.get('item_code')] or 1
        except KeyError:
            conversion_factor = 1

        for convertible_col, data in convertible_column_map.items():
            value_before_conversion = flt(row.get(convertible_col))
            if data.for_type == 'rate':
                row[data.converted_col] = value_before_conversion * conversion_factor
            else:
                row[data.converted_col] = value_before_conversion / conversion_factor