custom_ui/custom_ui/db_utils.py

197 lines
8.6 KiB
Python

import json
def map_field_name(frontend_field):
field_mapping = {
"customer_name": "custom_customer_to_bill",
"address": "address_line1",
"appointment_scheduled_status": "custom_onsite_meeting_scheduled",
"estimate_sent_status": "custom_estimate_sent_status",
"payment_received_status": "custom_payment_received_status",
"job_status": "custom_job_status",
"installation_address": "custom_installation_address",
"warranty_id": "name",
"customer": "customer_name",
"fromCompany": "from_company",
"warranty_status": "warranty_amc_status"
}
return field_mapping.get(frontend_field, frontend_field)
def process_filters(filters):
processed_filters = {}
if filters:
filters = json.loads(filters) if isinstance(filters, str) else filters
for field_name, filter_obj in filters.items():
if isinstance(filter_obj, dict) and "value" in filter_obj:
if filter_obj["value"] is not None and filter_obj["value"] != "":
# Map frontend field names to backend field names
address_fields = ["address_line1", "address_line2", "city", "state", "pincode"] if field_name == "address" else []
mapped_field_name = map_field_name(field_name)
# Handle different match modes
match_mode = filter_obj.get("match_mode", "contains")
if isinstance(match_mode, str):
match_mode = match_mode.lower()
# Special handling for address to search accross multiple fields
if address_fields:
address_filters = []
for addr_field in address_fields:
if match_mode in ("contains", "contains"):
address_filters.append([addr_field, "like", f"%{filter_obj['value']}%"])
elif match_mode in ("startswith", "starts_with"):
address_filters.append([addr_field, "like", f"{filter_obj['value']}%"])
elif match_mode in ("endswith", "ends_with"):
address_filters.append([addr_field, "like", f"%{filter_obj['value']}"])
elif match_mode in ("equals", "equals"):
address_filters.append([addr_field, "=", filter_obj["value"]])
else:
address_filters.append([addr_field, "like", f"%{filter_obj['value']}%"])
processed_filters = address_filters
continue # Skip the rest of the loop for address field
customer_name_fields = ["custom_customer_to_bill", "lead_name"] if field_name == "customer_name" else []
if customer_name_fields:
customer_name_filters = []
for cust_field in customer_name_fields:
if match_mode in ("contains", "contains"):
customer_name_filters.append([cust_field, "like", f"%{filter_obj['value']}%"])
elif match_mode in ("startswith", "starts_with"):
customer_name_filters.append([cust_field, "like", f"{filter_obj['value']}%"])
elif match_mode in ("endswith", "ends_with"):
customer_name_filters.append([cust_field, "like", f"%{filter_obj['value']}"])
elif match_mode in ("equals", "equals"):
customer_name_filters.append([cust_field, "=", filter_obj["value"]])
else:
customer_name_filters.append([cust_field, "like", f"%{filter_obj['value']}%"])
processed_filters = customer_name_filters
continue # Skip the rest of the loop for customer_name field
if match_mode in ("contains", "contains"):
processed_filters[mapped_field_name] = ["like", f"%{filter_obj['value']}%"]
elif match_mode in ("startswith", "starts_with"):
processed_filters[mapped_field_name] = ["like", f"{filter_obj['value']}%"]
elif match_mode in ("endswith", "ends_with"):
processed_filters[mapped_field_name] = ["like", f"%{filter_obj['value']}"]
elif match_mode in ("equals", "equals"):
processed_filters[mapped_field_name] = filter_obj["value"]
else:
# Default to contains
processed_filters[mapped_field_name] = ["like", f"%{filter_obj['value']}%"]
print("DEBUG: Processed filters:", processed_filters)
return processed_filters
def process_sorting(sortings):
sortings = json.loads(sortings) if isinstance(sortings, str) else sortings
order_by = ""
print("DEBUG: Original sorting:", sortings)
if sortings and len(sortings) > 0:
for sorting in sortings:
mapped_field = map_field_name(sorting[0].strip())
sort_direction = sorting[1].strip().lower()
order_by += f"{mapped_field} {sort_direction}, "
order_by = order_by.rstrip(", ")
else:
order_by = "modified desc"
print("DEBUG: Processed sorting:", order_by)
return order_by
def process_query_conditions(filters, sortings, page, page_size):
processed_filters = process_filters(filters)
processed_sortings = process_sorting(sortings)
is_or_filters = isinstance(processed_filters, list)
page_int = int(page)
page_size_int = int(page_size)
return processed_filters, processed_sortings, is_or_filters, page_int, page_size_int
def build_datatable_dict(data, count, page, page_size):
return {
"pagination": {
"total": count,
"page": page,
"page_size": page_size,
"total_pages": (count + page_size - 1) // page_size
},
"data": data
}
def get_count_or_filters(doctype, or_filters):
where_clauses = []
values = []
for field, operator, val in or_filters:
if operator.lower() == "like":
where_clauses.append(f"`{field}` LIKE %s")
else:
where_clauses.append(f"`{field}` {operator} %s")
values.append(val)
where_sql = " OR ".join(where_clauses)
sql = f"SELECT COUNT(*) FROM `tab{doctype}` WHERE {where_sql}"
return sql, values
def build_error_response(message, status_code=400):
return {
"status": "error",
"message": message,
"status_code": status_code
}
def build_success_response(data):
return {
"status": "success",
"data": data
}
def build_full_address(doc):
first_parts = [
doc.address_line1,
doc.address_line2,
doc.city
]
second_parts = [
doc.state,
doc.pincode
]
first = " ".join([p for p in first_parts if p])
second = " ".join([p for p in second_parts if p])
if first and second:
return f"{first}, {second}"
return first or second or ""
def build_address_title(customer_name, address_data):
title_parts = [customer_name]
if address_data.get("address_line1"):
title_parts.append(address_data["address_line1"])
if address_data.get("type"):
title_parts.append(address_data["type"])
return " - ".join(title_parts)
def map_lead_client(client_data):
mappings = {
"lead_name": "customer_name",
"customer_type": "customer_type",
"territory": "territory",
"company_name": "company"
}
for lead_field, client_field in mappings.items():
if lead_field in client_data:
print(f"DEBUG: Mapping field {lead_field} to {client_field} with value {client_data[lead_field]}")
client_data[client_field] = client_data[lead_field]
client_data["customer_group"] = ""
print("####DEBUG: Mapped client data:", client_data)
return client_data
def map_lead_update(client_data):
mappings = {
"customer_name": "lead_name",
"customer_type": "customer_type",
"territory": "territory",
"company": "company_name"
}
for client_field, lead_field in mappings.items():
if client_field in client_data:
print(f"DEBUG: Mapping field {client_field} to {lead_field} with value {client_data[client_field]}")
client_data[lead_field] = client_data[client_field]
return client_data