933 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			933 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
 | |
| # License: GNU General Public License v3. See license.txt
 | |
| 
 | |
| 
 | |
| import json
 | |
| from collections import OrderedDict, defaultdict
 | |
| 
 | |
| import frappe
 | |
| from frappe import qb, scrub
 | |
| from frappe.desk.reportview import get_filters_cond, get_match_cond
 | |
| from frappe.query_builder import Criterion, CustomFunction
 | |
| from frappe.query_builder.functions import Concat, Locate, Sum
 | |
| from frappe.utils import nowdate, today, unique
 | |
| from pypika import Order
 | |
| 
 | |
| import erpnext
 | |
| from erpnext.stock.get_item_details import _get_item_tax_template
 | |
| 
 | |
| 
 | |
| # searches for active employees
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def employee_query(doctype, txt, searchfield, start, page_len, filters):
	"""Search employees whose status is 'Active' or 'Suspended'.

	``txt`` is matched (SQL ``like``) against ``searchfield`` and ``employee_name``.
	Rows where ``txt`` occurs earlier in ``name`` / ``employee_name`` sort first,
	then by ``idx`` descending. ``start`` / ``page_len`` paginate the result.

	Returns whatever ``frappe.db.sql`` returns for the selected fields.
	"""
	doctype = "Employee"
	conditions = []
	fields = get_fields(doctype, ["name", "employee_name"])

	# The filter condition is spliced into a query that is %-interpolated with the
	# bound values below, so literal percent signs (e.g. from a "like" filter) must
	# be doubled. customer_query, item_query and bom do the same.
	filter_cond = get_filters_cond(doctype, filters, conditions).replace("%", "%%")

	query = """select {fields} from `tabEmployee`
		where status in ('Active', 'Suspended')
			and docstatus < 2
			and ({key} like %(txt)s
				or employee_name like %(txt)s)
			{fcond} {mcond}
		order by
			(case when locate(%(_txt)s, name) > 0 then locate(%(_txt)s, name) else 99999 end),
			(case when locate(%(_txt)s, employee_name) > 0 then locate(%(_txt)s, employee_name) else 99999 end),
			idx desc,
			name, employee_name
		limit %(page_len)s offset %(start)s""".format(
		fields=", ".join(fields),
		key=searchfield,
		fcond=filter_cond,
		mcond=get_match_cond(doctype),
	)

	values = {
		"txt": "%%%s%%" % txt,
		# wildcard-free copy of the text, used only for relevance ordering
		"_txt": txt.replace("%", ""),
		"start": start,
		"page_len": page_len,
	}
	return frappe.db.sql(query, values)
 | |
| 
 | |
| 
 | |
| # searches for leads which are not converted
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def lead_query(doctype, txt, searchfield, start, page_len, filters):
	"""Search leads whose status is not 'Converted'.

	``txt`` is matched against ``searchfield``, ``lead_name``, ``company_name`` and
	every search field declared on the Lead meta. ``filters`` is accepted but not
	used by this query.
	"""
	doctype = "Lead"
	selected = get_fields(doctype, ["name", "lead_name", "company_name"])

	meta_search_fields = frappe.get_meta(doctype).get_search_fields()
	search_cond = " or ".join(f"{fieldname} like %(txt)s" for fieldname in meta_search_fields)

	query = """select {fields} from `tabLead`
		where docstatus < 2
			and ifnull(status, '') != 'Converted'
			and ({key} like %(txt)s
				or lead_name like %(txt)s
				or company_name like %(txt)s
				or {scond})
			{mcond}
		order by
			(case when locate(%(_txt)s, name) > 0 then locate(%(_txt)s, name) else 99999 end),
			(case when locate(%(_txt)s, lead_name) > 0 then locate(%(_txt)s, lead_name) else 99999 end),
			(case when locate(%(_txt)s, company_name) > 0 then locate(%(_txt)s, company_name) else 99999 end),
			idx desc,
			name, lead_name
		limit %(page_len)s offset %(start)s""".format(
		fields=", ".join(selected),
		key=searchfield,
		scond=search_cond,
		mcond=get_match_cond(doctype),
	)

	bound = {
		"txt": f"%{txt}%",
		"_txt": txt.replace("%", ""),
		"start": start,
		"page_len": page_len,
	}
	return frappe.db.sql(query, bound)
 | |
| 
 | |
| 	# searches for customer
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def customer_query(doctype, txt, searchfield, start, page_len, filters, as_dict=False):
	"""Search enabled customers.

	``customer_name`` is added to the selected columns unless the user default
	``cust_master_name`` is "Customer Name". ``txt`` is matched against the search
	fields declared on the Customer meta; ``filters`` is turned into extra
	conditions via ``get_filters_cond``.
	"""
	doctype = "Customer"
	conditions = []

	naming_default = frappe.defaults.get_user_default("cust_master_name")
	base_fields = ["name"] if naming_default == "Customer Name" else ["name", "customer_name"]
	selected = get_fields(doctype, base_fields)

	meta_search_fields = frappe.get_meta(doctype).get_search_fields()
	search_cond = " or ".join(f"{fieldname} like %(txt)s" for fieldname in meta_search_fields)

	match_cond = get_match_cond(doctype)
	# percent signs in the filter condition are doubled so they survive value interpolation
	filter_cond = get_filters_cond(doctype, filters, conditions).replace("%", "%%")

	query = """select {fields} from `tabCustomer`
		where docstatus < 2
			and ({scond}) and disabled=0
			{fcond} {mcond}
		order by
			(case when locate(%(_txt)s, name) > 0 then locate(%(_txt)s, name) else 99999 end),
			(case when locate(%(_txt)s, customer_name) > 0 then locate(%(_txt)s, customer_name) else 99999 end),
			idx desc,
			name, customer_name
		limit %(page_len)s offset %(start)s""".format(
		fields=", ".join(selected),
		scond=search_cond,
		mcond=match_cond,
		fcond=filter_cond,
	)

	bound = {
		"txt": f"%{txt}%",
		"_txt": txt.replace("%", ""),
		"start": start,
		"page_len": page_len,
	}
	return frappe.db.sql(query, bound, as_dict=as_dict)
 | |
| 
 | |
| 
 | |
| # searches for supplier
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def supplier_query(doctype, txt, searchfield, start, page_len, filters, as_dict=False):
	"""Search enabled suppliers that are not currently on hold.

	A supplier with ``on_hold = 1`` is still returned once ``CURRENT_DATE`` is past
	its ``release_date``. ``supplier_name`` is added to the selected columns unless
	the user default ``supp_master_name`` is "Supplier Name".
	"""
	doctype = "Supplier"

	naming_default = frappe.defaults.get_user_default("supp_master_name")
	base_fields = ["name"] if naming_default == "Supplier Name" else ["name", "supplier_name"]
	selected = get_fields(doctype, base_fields)

	query = """select {field} from `tabSupplier`
		where docstatus < 2
			and ({key} like %(txt)s
			or supplier_name like %(txt)s) and disabled=0
			and (on_hold = 0 or (on_hold = 1 and CURRENT_DATE > release_date))
			{mcond}
		order by
			(case when locate(%(_txt)s, name) > 0 then locate(%(_txt)s, name) else 99999 end),
			(case when locate(%(_txt)s, supplier_name) > 0 then locate(%(_txt)s, supplier_name) else 99999 end),
			idx desc,
			name, supplier_name
		limit %(page_len)s offset %(start)s""".format(
		field=", ".join(selected),
		key=searchfield,
		mcond=get_match_cond(doctype),
	)

	bound = {
		"txt": f"%{txt}%",
		"_txt": txt.replace("%", ""),
		"start": start,
		"page_len": page_len,
	}
	return frappe.db.sql(query, bound, as_dict=as_dict)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def tax_account_query(doctype, txt, searchfield, start, page_len, filters):
	"""Search non-group accounts of a company, preferring the requested account types.

	The query is first run restricted to ``filters["account_type"]``; if that yields
	nothing it is run again without the account-type restriction. Only accounts in
	the company currency (or with no currency set) are returned.
	"""
	doctype = "Account"
	company_currency = erpnext.get_company_currency(filters.get("company"))

	query_template = """
			SELECT name, parent_account
			FROM `tabAccount`
			WHERE `tabAccount`.docstatus!=2
				{account_type_condition}
				AND is_group = 0
				AND company = %(company)s
				AND disabled = %(disabled)s
				AND (account_currency = %(currency)s or ifnull(account_currency, '') = '')
				AND `{searchfield}` LIKE %(txt)s
				{mcond}
			ORDER BY idx DESC, name
			LIMIT %(limit)s offset %(offset)s
		"""

	def fetch_accounts(account_type_condition):
		# Run the search with the given (possibly empty) account-type clause.
		sql = query_template.format(
			account_type_condition=account_type_condition,
			searchfield=searchfield,
			mcond=get_match_cond(doctype),
		)
		bound = {
			"account_types": filters.get("account_type"),
			"company": filters.get("company"),
			"disabled": filters.get("disabled", 0),
			"currency": company_currency,
			"txt": f"%{txt}%",
			"offset": start,
			"limit": page_len,
		}
		return frappe.db.sql(sql, bound)

	# fall back to an unrestricted search when the typed search finds nothing
	return fetch_accounts("AND account_type in %(account_types)s") or fetch_accounts("")
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def item_query(doctype, txt, searchfield, start, page_len, filters, as_dict=False):
	"""Search enabled, non-template items that have not reached end of life.

	``txt`` is matched against the Item meta search fields, ``searchfield``,
	``item_code``, ``item_group``, ``item_name`` and item barcodes; the description
	is also scanned when there are fewer than 50000 items.

	``filters`` may be a JSON string or a dict. When it carries a ``customer`` or
	``supplier``, the matching "Party Specific Item" rules are converted into
	``in`` filters, and the party keys themselves are removed before the remaining
	filters are turned into SQL conditions.
	"""
	doctype = "Item"
	conditions = []

	if isinstance(filters, str):
		filters = json.loads(filters)

	# Get searchfields from meta and use in Item Link field query
	meta = frappe.get_meta(doctype, cached=True)
	searchfields = meta.get_search_fields()

	columns = ""
	extra_searchfields = [field for field in searchfields if field not in ["name", "description"]]

	if extra_searchfields:
		columns += ", " + ", ".join(extra_searchfields)

	if "description" in searchfields:
		# long descriptions are truncated to 40 characters plus an ellipsis
		columns += """, if(length(tabItem.description) > 40, \
			concat(substr(tabItem.description, 1, 40), "..."), description) as description"""

	searchfields = searchfields + [
		field
		for field in [
			searchfield or "name",
			"item_code",
			"item_group",
			"item_name",
		]
		if field not in searchfields
	]
	searchfields = " or ".join([field + " like %(txt)s" for field in searchfields])

	if filters and isinstance(filters, dict):
		party = filters.get("customer") or filters.get("supplier")
		if party:
			item_rules_list = frappe.get_all(
				"Party Specific Item", filters={"party": party}, fields=["restrict_based_on", "based_on_value"]
			)

			# Group the allowed values per restricted field in a single pass,
			# without modifying the fetched rule rows.
			allowed_values_by_field = {}
			for rule in item_rules_list:
				restrict_on = rule.get("restrict_based_on")
				if restrict_on == "Item":
					restrict_on = "name"
				allowed_values_by_field.setdefault(restrict_on, []).append(rule.get("based_on_value"))

			for restrict_on, allowed_values in allowed_values_by_field.items():
				filters[scrub(restrict_on)] = ["in", allowed_values]

		# Neither party key may reach get_filters_cond: they would be turned into
		# conditions on tabItem. Always drop both.
		filters.pop("customer", None)
		filters.pop("supplier", None)

	description_cond = ""
	if frappe.db.count(doctype, cache=True) < 50000:
		# scan description only if items are less than 50000
		description_cond = "or tabItem.description LIKE %(txt)s"

	return frappe.db.sql(
		"""select
			tabItem.name {columns}
		from tabItem
		where tabItem.docstatus < 2
			and tabItem.disabled=0
			and tabItem.has_variants=0
			and (tabItem.end_of_life > %(today)s or ifnull(tabItem.end_of_life, '0000-00-00')='0000-00-00')
			and ({scond} or tabItem.item_code IN (select parent from `tabItem Barcode` where barcode LIKE %(txt)s)
				{description_cond})
			{fcond} {mcond}
		order by
			if(locate(%(_txt)s, name), locate(%(_txt)s, name), 99999),
			if(locate(%(_txt)s, item_name), locate(%(_txt)s, item_name), 99999),
			idx desc,
			name, item_name
		limit %(start)s, %(page_len)s """.format(
			columns=columns,
			scond=searchfields,
			fcond=get_filters_cond(doctype, filters, conditions).replace("%", "%%"),
			mcond=get_match_cond(doctype).replace("%", "%%"),
			description_cond=description_cond,
		),
		{
			"today": nowdate(),
			"txt": "%%%s%%" % txt,
			"_txt": txt.replace("%", ""),
			"start": start,
			"page_len": page_len,
		},
		as_dict=as_dict,
	)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def bom(doctype, txt, searchfield, start, page_len, filters):
	"""Search submitted (docstatus 1), active BOMs whose ``searchfield`` contains ``txt``.

	``start`` falls back to 0 and ``page_len`` to 20 when they are falsy.
	"""
	doctype = "BOM"
	conditions = []
	selected = get_fields(doctype, ["name", "item"])

	# percent signs in the spliced-in conditions are doubled so they survive value interpolation
	filter_cond = get_filters_cond(doctype, filters, conditions).replace("%", "%%")
	match_cond = get_match_cond(doctype).replace("%", "%%")

	query = """select {fields}
		from `tabBOM`
		where `tabBOM`.docstatus=1
			and `tabBOM`.is_active=1
			and `tabBOM`.`{key}` like %(txt)s
			{fcond} {mcond}
		order by
			(case when locate(%(_txt)s, name) > 0 then locate(%(_txt)s, name) else 99999 end),
			idx desc, name
		limit %(page_len)s offset %(start)s""".format(
		fields=", ".join(selected),
		fcond=filter_cond,
		mcond=match_cond,
		key=searchfield,
	)

	bound = {
		"txt": "%" + txt + "%",
		"_txt": txt.replace("%", ""),
		"start": start or 0,
		"page_len": page_len or 20,
	}
	return frappe.db.sql(query, bound)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_project_name(doctype, txt, searchfield, start, page_len, filters):
	"""Search projects that are neither 'Completed' nor 'Cancelled'.

	An optional ``filters["customer"]`` must match exactly. ``txt`` is pattern-matched
	against the meta search fields other than ``customer`` and ``status``.
	NOTE(review): the selected and search fields are read from the meta of the
	``doctype`` argument, while the table is always "Project" - confirm callers only
	pass "Project".
	"""
	project = qb.DocType("Project")
	if_else = CustomFunction("IF", ["condition", "then", "else"])

	# conditions that must all hold
	required = []
	if filters and filters.get("customer"):
		required.append(project.customer == filters.get("customer"))
	required.append(project.status.notin(["Completed", "Cancelled"]))

	query = qb.from_(project)
	for fieldname in get_fields(doctype, ["name", "project_name"]):
		query = query.select(project[fieldname])

	# don't consider 'customer' and 'status' fields for pattern search, as they must be exactly matched
	pattern_fields = [
		fieldname
		for fieldname in frappe.get_meta(doctype).get_search_fields()
		if fieldname not in ["customer", "status"]
	]

	# pattern search: any one of these may hold
	optional = [project[fieldname].like(f"%{txt}%") for fieldname in pattern_fields] if txt else []

	query = query.where(Criterion.all(required)).where(Criterion.any(optional))

	if txt:
		# project_name containing search string 'txt' will be given higher precedence
		position = Locate(txt, project.project_name)
		query = query.orderby(if_else(position > 0, Locate(txt, project.project_name), 99999))
	query = query.orderby(project.idx, order=Order.desc).orderby(project.name)

	if page_len:
		query = query.limit(page_len)
	if start:
		query = query.offset(start)

	return query.run()
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_delivery_notes_to_be_billed(doctype, txt, searchfield, start, page_len, filters, as_dict=False):
	"""Search submitted Delivery Notes that still have something to bill.

	A note qualifies when it is not 'Stopped'/'Closed' and either
	- it is not a return and ``per_billed < 100``, or
	- its ``grand_total`` is 0 and ``per_billed < 100``, or
	- it is a return against a note whose ``per_billed < 100``.

	``as_dict`` now defaults to ``False`` (as in the sibling query functions in this
	module), so it may be omitted by callers.
	"""
	doctype = "Delivery Note"
	fields = get_fields(doctype, ["name", "customer", "posting_date"])

	# The assembled query is %-interpolated with the bound ``txt`` value, so percent
	# signs coming from the filter condition must be doubled to stay literal.
	filter_cond = get_filters_cond(doctype, filters, []).replace("%", "%%")

	query = """
		select {fields}
		from `tabDelivery Note`
		where `tabDelivery Note`.`{key}` like %(txt)s and
			`tabDelivery Note`.docstatus = 1
			and status not in ('Stopped', 'Closed') {fcond}
			and (
				(`tabDelivery Note`.is_return = 0 and `tabDelivery Note`.per_billed < 100)
				or (`tabDelivery Note`.grand_total = 0 and `tabDelivery Note`.per_billed < 100)
				or (
					`tabDelivery Note`.is_return = 1
					and return_against in (select name from `tabDelivery Note` where per_billed < 100)
				)
			)
			{mcond} order by `tabDelivery Note`.`{key}` asc limit {page_len} offset {start}
	""".format(
		fields=", ".join("`tabDelivery Note`.{0}".format(f) for f in fields),
		key=searchfield,
		fcond=filter_cond,
		mcond=get_match_cond(doctype),
		start=start,
		page_len=page_len,
	)

	return frappe.db.sql(query, {"txt": "%%%s%%" % txt}, as_dict=as_dict)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_batch_no(doctype, txt, searchfield, start, page_len, filters):
	"""Return batches with a positive total quantity for the filtered item.

	Rows from the stock ledger and from serial-and-batch bundles are concatenated and
	then merged per batch by ``get_filterd_batches``. ``start`` / ``page_len`` are
	passed to each of the two source queries separately.
	"""
	doctype = "Batch"
	search_fields = frappe.get_meta(doctype, cached=True).get_search_fields()

	rows = get_batches_from_stock_ledger_entries(search_fields, txt, filters, start, page_len)
	bundle_rows = get_batches_from_serial_and_batch_bundle(search_fields, txt, filters, start, page_len)
	rows.extend(bundle_rows)

	return get_filterd_batches(rows)
 | |
| 
 | |
| 
 | |
def get_filterd_batches(data):
	"""Merge rows that share a batch and keep only batches with a positive quantity.

	Each row is a sequence whose item 0 is the batch and item 1 its quantity. Rows
	for the same batch are collapsed into the first row seen for it, with the
	quantities of later rows added to item 1; the other items come from that first
	row. Batches whose summed quantity is not greater than 0 are dropped.

	Returns a list of tuples in order of first appearance.
	"""
	batches = OrderedDict()

	for batch_data in data:
		batch_no = batch_data[0]
		if batch_no in batches:
			batches[batch_no][1] += batch_data[1]
		else:
			# copy into a list so the quantity can be accumulated in place
			batches[batch_no] = list(batch_data)

	return [tuple(row) for row in batches.values() if row[1] > 0]
 | |
| 
 | |
| 
 | |
def get_batches_from_stock_ledger_entries(searchfields, txt, filters, start=0, page_len=100):
	"""Query per-batch, per-warehouse quantities from non-cancelled Stock Ledger Entries.

	Only enabled batches of ``filters["item_code"]`` that expire on or after
	``filters["posting_date"]`` (today when absent), or never expire, are included.
	Each row holds: batch no, summed ``actual_qty``, "MFG-<date>", "EXP-<date>",
	followed by one column per entry of ``searchfields``.

	Returns the rows as a list (an empty list when nothing matches).
	"""
	sle = frappe.qb.DocType("Stock Ledger Entry")
	batch = frappe.qb.DocType("Batch")

	cutoff_date = filters.get("posting_date") or today()

	not_expired = (batch.expiry_date >= cutoff_date) | (batch.expiry_date.isnull())
	item_batch_match = (
		(sle.item_code == filters.get("item_code"))
		& (batch.disabled == 0)
		& (sle.batch_no.isnotnull())
	)

	query = (
		frappe.qb.from_(sle)
		.inner_join(batch)
		.on(batch.name == sle.batch_no)
		.select(sle.batch_no, Sum(sle.actual_qty).as_("qty"))
		.where(not_expired)
		.where(sle.is_cancelled == 0)
		.where(item_batch_match)
		.groupby(sle.batch_no, sle.warehouse)
		.offset(start)
		.limit(page_len)
	)

	manufactured_on = Concat("MFG-", batch.manufacturing_date).as_("manufacturing_date")
	expires_on = Concat("EXP-", batch.expiry_date).as_("expiry_date")
	query = query.select(manufactured_on, expires_on)

	warehouse = filters.get("warehouse")
	if warehouse:
		query = query.where(sle.warehouse == filters.get("warehouse"))

	for fieldname in searchfields:
		query = query.select(batch[fieldname])

	if txt:
		pattern = f"%{txt}%"
		text_match = batch.name.like(pattern)
		for fieldname in searchfields + ["name"]:
			text_match = text_match | batch[fieldname].like(pattern)

		query = query.where(text_match)

	rows = query.run(as_list=1)
	return rows or []
 | |
| 
 | |
| 
 | |
def get_batches_from_serial_and_batch_bundle(searchfields, txt, filters, start=0, page_len=100):
	"""Query per-batch, per-warehouse quantities from Serial and Batch Entry rows.

	Entries are joined to non-cancelled Stock Ledger Entries through the bundle
	reference and to their Batch. Only enabled batches of ``filters["item_code"]``
	that expire on or after ``filters["posting_date"]`` (today when absent), or
	never expire, are included. Each row holds: batch no, summed ``qty``,
	"MFG-<date>", "EXP-<date>", followed by one column per entry of ``searchfields``.
	"""
	entry = frappe.qb.DocType("Serial and Batch Entry")
	sle = frappe.qb.DocType("Stock Ledger Entry")
	batch = frappe.qb.DocType("Batch")

	cutoff_date = filters.get("posting_date") or today()

	not_expired = (batch.expiry_date >= cutoff_date) | (batch.expiry_date.isnull())
	item_bundle_match = (
		(sle.item_code == filters.get("item_code"))
		& (batch.disabled == 0)
		& (sle.serial_and_batch_bundle.isnotnull())
	)

	query = (
		frappe.qb.from_(entry)
		.inner_join(sle)
		.on(entry.parent == sle.serial_and_batch_bundle)
		.inner_join(batch)
		.on(batch.name == entry.batch_no)
		.select(entry.batch_no, Sum(entry.qty).as_("qty"))
		.where(not_expired)
		.where(sle.is_cancelled == 0)
		.where(item_bundle_match)
		.groupby(entry.batch_no, entry.warehouse)
		.offset(start)
		.limit(page_len)
	)

	query = query.select(
		Concat("MFG-", batch.manufacturing_date),
		Concat("EXP-", batch.expiry_date),
	)

	warehouse = filters.get("warehouse")
	if warehouse:
		query = query.where(sle.warehouse == filters.get("warehouse"))

	for fieldname in searchfields:
		query = query.select(batch[fieldname])

	if txt:
		pattern = f"%{txt}%"
		text_match = batch.name.like(pattern)
		for fieldname in searchfields + ["name"]:
			text_match = text_match | batch[fieldname].like(pattern)

		query = query.where(text_match)

	return query.run(as_list=1)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_account_list(doctype, txt, searchfield, start, page_len, filters):
	"""Search accounts through the report-view executor.

	Dict filters are expanded to ``[doctype, field, operator, value]`` rows (a
	list/tuple value supplies ``operator, value``; anything else means "=");
	list filters are used as given. ``is_group = 0`` is added unless one of the
	rows already names ``is_group`` in its second position.
	"""
	doctype = "Account"
	filter_list = []

	if isinstance(filters, dict):
		for fieldname, value in filters.items():
			if isinstance(value, (list, tuple)):
				row = [doctype, fieldname, value[0], value[1]]
			else:
				row = [doctype, fieldname, "=", value]
			filter_list.append(row)
	elif isinstance(filters, list):
		filter_list.extend(filters)

	second_items = [row[1] for row in filter_list]
	if "is_group" not in second_items:
		filter_list.append(["Account", "is_group", "=", "0"])

	if searchfield and txt:
		filter_list.append([doctype, searchfield, "like", f"%{txt}%"])

	return frappe.desk.reportview.execute(
		doctype,
		filters=filter_list,
		fields=["name", "parent_account"],
		limit_start=start,
		limit_page_length=page_len,
		as_list=True,
	)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_blanket_orders(doctype, txt, searchfield, start, page_len, filters):
	"""Return submitted Blanket Orders of a given type and company that contain an item.

	Reads ``item``, ``blanket_order_type`` and ``company`` from ``filters``. Each row
	is ``(name, blanket_order_type, to_date)``. ``txt``, ``searchfield``, ``start``
	and ``page_len`` are not used by this query.
	"""
	# All filter values are passed as bound parameters. Previously
	# blanket_order_type was formatted straight into the SQL text, which allowed
	# SQL injection through this whitelisted endpoint.
	return frappe.db.sql(
		"""select distinct bo.name, bo.blanket_order_type, bo.to_date
		from `tabBlanket Order` bo, `tabBlanket Order Item` boi
		where
			boi.parent = bo.name
			and boi.item_code = %(item_code)s
			and bo.blanket_order_type = %(blanket_order_type)s
			and bo.company = %(company)s
			and bo.docstatus = 1""",
		{
			"item_code": filters.get("item"),
			"blanket_order_type": filters.get("blanket_order_type"),
			"company": filters.get("company"),
		},
	)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_income_account(doctype, txt, searchfield, start, page_len, filters):
	"""Search non-group accounts usable as income accounts.

	Optional ``filters`` keys: ``company`` (exact match, applied only when set) and
	``disabled`` (defaults to 0). ``start`` and ``page_len`` are not applied.
	"""
	from erpnext.controllers.queries import get_match_cond

	# income account can be any Credit account,
	# but can also be a Asset account with account_type='Income Account' in special circumstances.
	# Hence the first condition is an "OR"
	if not filters:
		filters = {}

	doctype = "Account"
	conditions = []
	if filters.get("company"):
		conditions.append("and tabAccount.company = %(company)s")

	# "disabled" comes from the request, so it is bound as a parameter instead of
	# being formatted into the SQL text (tax_account_query binds it the same way).
	conditions.append("and tabAccount.disabled = %(disabled)s")

	return frappe.db.sql(
		"""select tabAccount.name from `tabAccount`
			where (tabAccount.report_type = "Profit and Loss"
					or tabAccount.account_type in ("Income Account", "Temporary"))
				and tabAccount.is_group=0
				and tabAccount.`{key}` LIKE %(txt)s
				{condition} {match_condition}
			order by idx desc, name""".format(
			# joined with a space so consecutive clauses never run together
			condition=" ".join(conditions),
			match_condition=get_match_cond(doctype),
			key=searchfield,
		),
		{
			"txt": "%" + txt + "%",
			"company": filters.get("company", ""),
			"disabled": filters.get("disabled", 0),
		},
	)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_filtered_dimensions(
	doctype, txt, searchfield, start, page_len, filters, reference_doctype=None
):
	"""List records of ``doctype`` allowed by the accounting dimension filter.

	The dimension filter map is looked up with the key
	``(filters["dimension"], filters["account"])``. When an entry exists, record
	names are restricted with ``in`` ("Allow") or ``not in`` (anything else) against
	its ``allowed_dimensions``. Group nodes, disabled records and other companies
	are excluded when the doctype has the corresponding fields.

	Returns de-duplicated rows as a list of tuples (order not guaranteed).
	"""
	from erpnext.accounts.doctype.accounting_dimension_filter.accounting_dimension_filter import (
		get_dimension_filter_map,
	)

	filter_key = (filters.get("dimension"), filters.get("account"))
	dimension_filters = get_dimension_filter_map().get(filter_key)

	meta = frappe.get_meta(doctype)
	search_fields = meta.get_search_fields()

	query_filters = []
	if meta.is_tree:
		query_filters.append(["is_group", "=", 0])
	if meta.has_field("disabled"):
		query_filters.append(["disabled", "!=", 1])
	if meta.has_field("company"):
		query_filters.append(["company", "=", filters.get("company")])

	pattern = f"%{txt}%"
	or_filters = [[fieldname, "LIKE", pattern] for fieldname in search_fields]
	fields = ["name", *search_fields]

	if dimension_filters:
		query_selector = "in" if dimension_filters["allow_or_restrict"] == "Allow" else "not in"

		allowed = dimension_filters["allowed_dimensions"]
		# a single allowed value is repeated so the tuple always has two or more items
		dimensions = tuple(allowed * 2) if len(allowed) == 1 else tuple(allowed)

		query_filters.append(["name", query_selector, dimensions])

	output = frappe.get_list(
		doctype,
		fields=fields,
		filters=query_filters,
		or_filters=or_filters,
		as_list=1,
		reference_doctype=reference_doctype,
	)

	return [tuple(row) for row in set(output)]
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_expense_account(doctype, txt, searchfield, start, page_len, filters):
	"""Search non-group accounts usable as expense accounts.

	An account qualifies when its report type is "Profit and Loss" or its account
	type is one of the types listed in the query. ``filters["company"]``, when
	set, must match exactly. ``start`` and ``page_len`` are not applied.
	"""
	from erpnext.controllers.queries import get_match_cond

	filters = filters or {}

	doctype = "Account"
	company_cond = "and tabAccount.company = %(company)s" if filters.get("company") else ""

	query = """select tabAccount.name from `tabAccount`
		where (tabAccount.report_type = "Profit and Loss"
				or tabAccount.account_type in ("Expense Account", "Fixed Asset", "Temporary", "Asset Received But Not Billed", "Capital Work in Progress"))
			and tabAccount.is_group=0
			and tabAccount.docstatus!=2
			and tabAccount.{key} LIKE %(txt)s
			{condition} {match_condition}""".format(
		condition=company_cond,
		key=searchfield,
		match_condition=get_match_cond(doctype),
	)

	bound = {"company": filters.get("company", ""), "txt": f"%{txt}%"}
	return frappe.db.sql(query, bound)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def warehouse_query(doctype, txt, searchfield, start, page_len, filters):
	"""Search warehouses and show each one's actual quantity from its Bin.

	``filters`` is a list of filter rows whose first item is a doctype name; rows
	for "Bin" are added to the join condition, rows for "Warehouse" to the where
	clause. Results are ordered by actual quantity, highest first.
	"""
	# Should be used when item code is passed in filters.
	doctype = "Warehouse"
	filters_by_doctype = get_doctype_wise_filters(filters)

	bin_cond = get_filters_cond(doctype, filters_by_doctype.get("Bin"), [], ignore_permissions=True)
	warehouse_cond = get_filters_cond(doctype, filters_by_doctype.get("Warehouse"), [])
	match_cond = get_match_cond(doctype)
	like_pattern = frappe.db.escape(f"%{txt}%")

	query = """select `tabWarehouse`.name,
		CONCAT_WS(' : ', 'Actual Qty', ifnull(round(`tabBin`.actual_qty, 2), 0 )) actual_qty
		from `tabWarehouse` left join `tabBin`
		on `tabBin`.warehouse = `tabWarehouse`.name {bin_conditions}
		where
			`tabWarehouse`.`{key}` like {txt}
			{fcond} {mcond}
		order by ifnull(`tabBin`.actual_qty, 0) desc
		limit
			{page_len} offset {start}
		""".format(
		bin_conditions=bin_cond,
		key=searchfield,
		fcond=warehouse_cond,
		mcond=match_cond,
		start=start,
		page_len=page_len,
		txt=like_pattern,
	)

	return frappe.db.sql(query)
 | |
| 
 | |
| 
 | |
def get_doctype_wise_filters(filters):
	"""Group filter rows by the doctype named in their first item.

	``filters`` is an iterable of rows such as ``[doctype, field, operator, value]``;
	``None`` or an empty iterable yields an empty mapping instead of raising.

	Returns a ``defaultdict(list)`` mapping doctype -> list of its rows, in input order.
	"""
	# Helper function to separate filters doctype-wise
	filter_dict = defaultdict(list)
	for row in filters or ():
		filter_dict[row[0]].append(row)
	return filter_dict
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_batch_numbers(doctype, txt, searchfield, start, page_len, filters):
	"""Return ``batch_id`` of enabled, unexpired batches whose name contains ``txt``.

	When ``filters["item"]`` is set the batches are limited to that item.
	``start`` and ``page_len`` are not applied.
	"""
	name_pattern = frappe.db.escape(f"%{txt}%")
	clauses = [
		"""select batch_id from `tabBatch`
			where disabled = 0
			and (expiry_date >= CURRENT_DATE or expiry_date IS NULL)
			and name like {txt}""".format(txt=name_pattern)
	]

	if filters and filters.get("item"):
		item = frappe.db.escape(filters.get("item"))
		clauses.append(f" and item = {item}")

	return frappe.db.sql("".join(clauses), filters)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def item_manufacturer_query(doctype, txt, searchfield, start, page_len, filters):
	"""List ``(manufacturer, manufacturer_part_no)`` rows of "Item Manufacturer".

	Rows are limited to ``filters["item_code"]`` and to manufacturers containing ``txt``.
	"""
	manufacturer_pattern = "%" + txt + "%"
	return frappe.get_all(
		"Item Manufacturer",
		fields=["manufacturer", "manufacturer_part_no"],
		filters=[
			["manufacturer", "like", manufacturer_pattern],
			["item_code", "=", filters.get("item_code")],
		],
		limit_start=start,
		limit_page_length=page_len,
		as_list=1,
	)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_purchase_receipts(doctype, txt, searchfield, start, page_len, filters):
	"""Return names of submitted Purchase Receipts whose name contains ``txt``.

	One row is produced per matching receipt item; when ``filters["item_code"]``
	is set only items with that code are considered. ``start`` and ``page_len``
	are not applied.
	"""
	name_pattern = frappe.db.escape(f"%{txt}%")
	clauses = [
		"""
		select pr.name
		from `tabPurchase Receipt` pr, `tabPurchase Receipt Item` pritem
		where pr.docstatus = 1 and pritem.parent = pr.name
		and pr.name like {txt}""".format(txt=name_pattern)
	]

	if filters and filters.get("item_code"):
		item_code = frappe.db.escape(filters.get("item_code"))
		clauses.append(f" and pritem.item_code = {item_code}")

	return frappe.db.sql("".join(clauses), filters)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_purchase_invoices(doctype, txt, searchfield, start, page_len, filters):
	"""Return names of submitted Purchase Invoices whose name contains ``txt``.

	One row is produced per matching invoice item; when ``filters["item_code"]``
	is set only items with that code are considered. ``start`` and ``page_len``
	are not applied.
	"""
	name_pattern = frappe.db.escape(f"%{txt}%")
	clauses = [
		"""
		select pi.name
		from `tabPurchase Invoice` pi, `tabPurchase Invoice Item` piitem
		where pi.docstatus = 1 and piitem.parent = pi.name
		and pi.name like {txt}""".format(txt=name_pattern)
	]

	if filters and filters.get("item_code"):
		item_code = frappe.db.escape(filters.get("item_code"))
		clauses.append(f" and piitem.item_code = {item_code}")

	return frappe.db.sql("".join(clauses), filters)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_doctypes_for_closing(doctype, txt, searchfield, start, page_len, filters):
	"""List the doctypes registered under the ``period_closing_doctypes`` hook.

	When ``txt`` is given, only doctypes containing it (case-insensitively) are kept.
	Returns one-element tuples, de-duplicated, in the order the hook lists them.
	"""
	doctypes = frappe.get_hooks("period_closing_doctypes")
	if txt:
		needle = txt.lower()
		doctypes = [d for d in doctypes if needle in d.lower()]

	# dict.fromkeys drops duplicates but, unlike set(), keeps a stable order so the
	# result does not change from one call to the next.
	return [(d,) for d in dict.fromkeys(doctypes)]
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_tax_template(doctype, txt, searchfield, start, page_len, filters):
	"""Return the Item Tax Templates applicable to an item.

	Tax rows are collected from the item and then from ``filters["item_group"]`` and
	each of its ancestors. If none exist, every enabled Item Tax Template of the
	company is returned; otherwise the rows are narrowed by
	``_get_item_tax_template`` using the item code, ``valid_from`` (as posting
	date), tax category and company, and returned as de-duplicated one-element tuples.
	"""
	item_doc = frappe.get_cached_doc("Item", filters.get("item_code"))
	item_group = filters.get("item_group")
	company = filters.get("company")

	# Work on a copy: extending item_doc.taxes in place would append the item group
	# rows to the document object handed back by get_cached_doc.
	taxes = list(item_doc.taxes or [])

	# walk up the item group tree, collecting each level's tax rows
	while item_group:
		item_group_doc = frappe.get_cached_doc("Item Group", item_group)
		taxes.extend(item_group_doc.taxes or [])
		item_group = item_group_doc.parent_item_group

	if not taxes:
		return frappe.get_all(
			"Item Tax Template", filters={"disabled": 0, "company": company}, as_list=True
		)
	else:
		valid_from = filters.get("valid_from")
		# a list-valued filter carries the date in its second item
		valid_from = valid_from[1] if isinstance(valid_from, list) else valid_from

		args = {
			"item_code": filters.get("item_code"),
			"posting_date": valid_from,
			"tax_category": filters.get("tax_category"),
			"company": company,
		}

		taxes = _get_item_tax_template(args, taxes, for_validate=True)
		return [(d,) for d in set(taxes)]
 | |
| 
 | |
| 
 | |
def get_fields(doctype, fields=None):
	"""Return the columns to select for a link search on ``doctype``.

	Starts from ``fields`` (left unmodified), appends the doctype's meta search
	fields, and inserts the stripped ``title_field`` at position 1 when it is
	non-empty and not already present. Duplicates are removed via ``unique``.
	"""
	# copy so the caller's list is never extended or reordered
	fields = list(fields or [])

	meta = frappe.get_meta(doctype)
	fields.extend(meta.get_search_fields())

	# a blank or whitespace-only title_field must not add an empty column name
	title_field = (meta.title_field or "").strip()
	if title_field and title_field not in fields:
		fields.insert(1, title_field)

	return unique(fields)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_payment_terms_for_references(doctype, txt, searchfield, start, page_len, filters) -> list:
	"""List ``payment_term`` of Payment Schedule rows whose parent is ``filters["reference"]``.

	Returns an empty list when ``filters`` is empty; at most ``page_len`` rows otherwise.
	"""
	if not filters:
		return []

	return frappe.db.get_all(
		"Payment Schedule",
		filters={"parent": filters.get("reference")},
		fields=["payment_term"],
		limit=page_len,
		as_list=1,
	)
 | |
| 
 | |
| 
 | |
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_filtered_child_rows(doctype, txt, searchfield, start, page_len, filters) -> list:
	"""List rows of ``doctype`` as ``(name, "#<idx>, <item_code>")``, ordered by ``idx``.

	Every ``filters`` entry is applied as an equality condition. ``txt`` is used as
	a prefix match on ``idx`` (with any "#" removed), ``item_code`` or ``name``.
	"""
	child = frappe.qb.DocType(doctype)
	label = Concat("#", child.idx, ", ", child.item_code)

	query = (
		frappe.qb.from_(child)
		.select(child.name, label)
		.orderby(child.idx)
		.offset(start)
		.limit(page_len)
	)

	for fieldname, value in (filters or {}).items():
		query = query.where(child[fieldname] == value)

	if txt:
		prefix = txt + "%"
		# the label shows the row number as "#<idx>", so "#" is dropped for the idx match
		idx_match = child.idx.like(prefix.replace("#", ""))
		query = query.where(idx_match | child.item_code.like(prefix) | child.name.like(prefix))

	return query.run(as_dict=False)
 |