custom_ui/custom_ui/db_utils.py

225 lines
9.2 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import frappe
import json
def map_field_name(frontend_field):
    """Translate a frontend field name into its backend field name.

    Names without an entry in the alias table are returned unchanged.
    """
    backend_aliases = {
        "customer_name": "custom_customer_to_bill",
        "address": "address_line1",
        "appointment_scheduled_status": "custom_onsite_meeting_scheduled",
        "estimate_sent_status": "custom_estimate_sent_status",
        "payment_received_status": "custom_payment_received_status",
        "job_status": "custom_job_status",
        "installation_address": "custom_installation_address",
        "warranty_id": "name",
        "customer": "customer_name",
        "fromCompany": "from_company",
        "warranty_status": "warranty_amc_status",
    }
    if frontend_field in backend_aliases:
        return backend_aliases[frontend_field]
    # No alias registered: the frontend name is used as-is.
    return frontend_field
def _match_condition(match_mode, value):
    """Translate a (lower-cased) match mode into an ``(operator, operand)`` pair.

    Unknown or missing modes fall back to a "contains" search.
    """
    if match_mode in ("startswith", "starts_with"):
        return "like", f"{value}%"
    if match_mode in ("endswith", "ends_with"):
        return "like", f"%{value}"
    if match_mode == "equals":
        return "=", value
    return "like", f"%{value}%"


def process_filters(filters):
    """Convert frontend filter objects into backend query filters.

    Args:
        filters: A dict (or its JSON string) mapping a frontend field name to
            a filter object such as ``{"value": ..., "match_mode": ...}``.
            Entries that are not dicts, lack a ``"value"`` key, or whose value
            is ``None`` or ``""`` are ignored.

    Returns:
        Either a dict ``{backend_field: condition}`` for plain filters, or a
        list of ``[field, operator, value]`` triples when an ``"address"`` or
        ``"customer_name"`` filter is present; those search several columns
        at once. When a multi-column filter is present its list is returned
        and plain filters are not included, because the result is either a
        dict or a list, never both. If several multi-column filters are
        given, the last one wins.
    """
    # Frontend fields that are searched across several backend columns.
    multi_field_targets = {
        "address": ["address_line1", "address_line2", "city", "state", "pincode"],
        "customer_name": ["custom_customer_to_bill", "lead_name"],
    }
    plain_filters = {}
    # Kept separate from plain_filters so a plain filter that follows a
    # multi-column one no longer tries to index into a list (TypeError).
    multi_filters = None

    if filters:
        filters = json.loads(filters) if isinstance(filters, str) else filters
        for field_name, filter_obj in filters.items():
            if not (isinstance(filter_obj, dict) and "value" in filter_obj):
                continue
            value = filter_obj["value"]
            if value is None or value == "":
                continue

            match_mode = filter_obj.get("match_mode", "contains")
            if isinstance(match_mode, str):
                match_mode = match_mode.lower()
            operator, operand = _match_condition(match_mode, value)

            targets = multi_field_targets.get(field_name)
            if targets:
                multi_filters = [[target, operator, operand] for target in targets]
                continue

            # Map frontend field names to backend field names.
            mapped_field_name = map_field_name(field_name)
            if operator == "=":
                # Exact matches are stored as the bare value.
                plain_filters[mapped_field_name] = operand
            else:
                plain_filters[mapped_field_name] = [operator, operand]

    processed_filters = multi_filters if multi_filters is not None else plain_filters
    print("DEBUG: Processed filters:", processed_filters)
    return processed_filters
def process_sorting(sortings):
    """Build an ORDER BY string from frontend sort entries.

    ``sortings`` is a sequence of ``[field, direction]`` pairs, or the JSON
    string of one. Each field is passed through ``map_field_name`` and each
    direction is stripped and lower-cased. With no entries the result is
    ``"modified desc"``.
    """
    if isinstance(sortings, str):
        sortings = json.loads(sortings)
    print("DEBUG: Original sorting:", sortings)

    if not sortings or len(sortings) == 0:
        order_by = "modified desc"
    else:
        # NOTE(review): neither the field nor the direction is validated
        # here; both end up verbatim in the returned string.
        clauses = []
        for entry in sortings:
            column = map_field_name(entry[0].strip())
            direction = entry[1].strip().lower()
            clauses.append(f"{column} {direction}")
        order_by = ", ".join(clauses).rstrip(", ")

    print("DEBUG: Processed sorting:", order_by)
    return order_by
def process_query_conditions(filters, sortings, page, page_size):
    """Normalise the raw query arguments of a list request.

    Returns a 5-tuple: the processed filters, the ORDER BY string, a flag
    that is true when the processed filters are a list (OR filters), and the
    page and page size converted with ``int``.
    """
    query_filters = process_filters(filters)
    order_by = process_sorting(sortings)
    return (
        query_filters,
        order_by,
        isinstance(query_filters, list),
        int(page),
        int(page_size),
    )
def build_datatable_dict(data, count, page, page_size):
    """Wrap a page of rows together with its pagination metadata.

    Args:
        data: The rows of the current page.
        count: Total number of matching rows.
        page: Current page number, echoed back unchanged.
        page_size: Number of rows per page.

    Returns:
        A dict with a ``"pagination"`` section (``total``, ``page``,
        ``page_size``, ``total_pages``) and the ``"data"`` payload.
        ``total_pages`` is 0 when ``page_size`` is not positive.
    """
    if page_size > 0:
        # Ceiling division without going through floats.
        total_pages = (count + page_size - 1) // page_size
    else:
        # A zero page size used to raise ZeroDivisionError.
        total_pages = 0
    return {
        "pagination": {
            "total": count,
            "page": page,
            "page_size": page_size,
            "total_pages": total_pages,
        },
        "data": data,
    }
def get_count_or_filters(doctype, or_filters):
    """Build a ``SELECT COUNT(*)`` statement whose conditions are OR-ed.

    Args:
        doctype: DocType name; the table queried is ``tab<doctype>``.
        or_filters: Iterable of ``(field, operator, value)`` triples.

    Returns:
        A ``(sql, values)`` tuple. Each value is bound through a ``%s``
        placeholder. With no filters the statement has no WHERE clause
        instead of a dangling ``WHERE``.
    """
    # NOTE: field names and operators are interpolated into the SQL text;
    # only the values are parameterised, so the former must be trusted.
    where_clauses = []
    values = []
    for field, operator, val in or_filters or []:
        sql_operator = "LIKE" if operator.lower() == "like" else operator
        where_clauses.append(f"`{field}` {sql_operator} %s")
        values.append(val)

    sql = f"SELECT COUNT(*) FROM `tab{doctype}`"
    if where_clauses:
        sql += " WHERE " + " OR ".join(where_clauses)
    return sql, values
def build_error_response(message, status_code=400):
    """Return the error envelope: status ``"error"``, message and status code."""
    response = dict(status="error", message=message, status_code=status_code)
    return response
def build_success_response(data):
    """Return the success envelope: status ``"success"`` plus the payload."""
    response = dict(status="success", data=data)
    return response
def build_full_address(doc):
    """Format an address document as a single line.

    Address lines and city are joined with spaces, as are state and pincode;
    the two groups are separated by ``", "``. Empty parts are skipped and an
    empty string is returned when nothing is set.
    """
    street_and_city = " ".join(
        filter(None, (doc.address_line1, doc.address_line2, doc.city))
    )
    region = " ".join(filter(None, (doc.state, doc.pincode)))

    if not street_and_city:
        return region or ""
    if not region:
        return street_and_city
    return f"{street_and_city}, {region}"
def build_address_title(customer_name, address_data):
    """Compose an address title of the form ``"<customer> - <line 1> - <type>"``.

    Args:
        customer_name: Name placed first in the title; skipped when empty or
            ``None`` (previously ``None`` raised TypeError in the join and an
            empty name produced a leading ``" - "``).
        address_data: Mapping read with ``.get`` for the optional
            ``"address_line1"`` and ``"type"`` entries.

    Returns:
        The non-empty parts joined with ``" - "``; ``""`` when all are empty.
    """
    candidates = (
        customer_name,
        address_data.get("address_line1"),
        address_data.get("type"),
    )
    return " - ".join(str(part) for part in candidates if part)
def map_lead_client(client_data):
    """Copy lead-style keys onto their client-style names, in place.

    For every lead key present in ``client_data`` the value is also stored
    under the matching client key. ``customer_group`` is then set to an empty
    string. The same (mutated) dict is returned.
    """
    lead_to_client = (
        ("lead_name", "customer_name"),
        ("customer_type", "customer_type"),
        ("territory", "territory"),
        ("company_name", "company"),
    )
    for source_key, target_key in lead_to_client:
        if source_key not in client_data:
            continue
        print(f"DEBUG: Mapping field {source_key} to {target_key} with value {client_data[source_key]}")
        client_data[target_key] = client_data[source_key]

    client_data["customer_group"] = ""
    print("####DEBUG: Mapped client data:", client_data)
    return client_data
def map_lead_update(client_data):
    """Copy client-style keys onto their lead-style names, in place.

    For every client key present in ``client_data`` the value is also stored
    under the matching lead key. The same (mutated) dict is returned.
    """
    client_to_lead = (
        ("customer_name", "lead_name"),
        ("customer_type", "customer_type"),
        ("territory", "territory"),
        ("company", "company_name"),
    )
    for source_key, target_key in client_to_lead:
        if source_key not in client_data:
            continue
        print(f"DEBUG: Mapping field {source_key} to {target_key} with value {client_data[source_key]}")
        client_data[target_key] = client_data[source_key]
    return client_data
def search_any_field(doctype, text):
    """Find documents whose name or any text-like field contains ``text``.

    The ``name`` column plus every DocField of type Data, Small Text, Text or
    Link is matched with ``LIKE %text%``; the conditions are OR-ed and the
    query is limited to 20 rows. Returns the result of ``frappe.db.sql``
    called with ``as_dict=True``, selecting only ``name``.
    """
    searchable_types = ("Data", "Small Text", "Text", "Link")

    # `name` is always searched; the remaining columns come from the meta.
    columns = ["name"]
    columns.extend(
        docfield.fieldname
        for docfield in frappe.get_meta(doctype).fields
        if docfield.fieldtype in searchable_types
    )

    pattern = f"%{text}%"
    where_clause = " OR ".join(f"`{column}` LIKE %s" for column in columns)
    query = f"""
        SELECT name
        FROM `tab{doctype}`
        WHERE {where_clause}
        LIMIT 20
    """
    # One placeholder per condition, all bound to the same pattern.
    return frappe.db.sql(query, [pattern] * len(columns), as_dict=True)