145 lines
5.9 KiB
Python
145 lines
5.9 KiB
Python
|
|
import json
|
|
|
|
def map_field_name(frontend_field):
|
|
field_mapping = {
|
|
"customer_name": "custom_customer_to_bill",
|
|
"address": "address_line1",
|
|
"appointment_scheduled_status": "custom_onsite_meeting_scheduled",
|
|
"estimate_sent_status": "custom_estimate_sent_status",
|
|
"payment_received_status": "custom_payment_received_status",
|
|
"job_status": "custom_job_status",
|
|
"installation_address": "custom_installation_address",
|
|
"warranty_id": "name",
|
|
"customer": "customer_name",
|
|
"fromCompany": "from_company",
|
|
"warranty_status": "warranty_amc_status"
|
|
}
|
|
return field_mapping.get(frontend_field, frontend_field)
|
|
|
|
def process_filters(filters):
|
|
processed_filters = {}
|
|
if filters:
|
|
filters = json.loads(filters) if isinstance(filters, str) else filters
|
|
for field_name, filter_obj in filters.items():
|
|
if isinstance(filter_obj, dict) and "value" in filter_obj:
|
|
if filter_obj["value"] is not None and filter_obj["value"] != "":
|
|
# Map frontend field names to backend field names
|
|
address_fields = ["address_line1", "address_line2", "city", "state", "pincode"] if field_name == "address" else []
|
|
|
|
mapped_field_name = map_field_name(field_name)
|
|
|
|
# Handle different match modes
|
|
match_mode = filter_obj.get("match_mode", "contains")
|
|
if isinstance(match_mode, str):
|
|
match_mode = match_mode.lower()
|
|
# Special handling for address to search accross multiple fields
|
|
if address_fields:
|
|
address_filters = []
|
|
for addr_field in address_fields:
|
|
if match_mode in ("contains", "contains"):
|
|
address_filters.append([addr_field, "like", f"%{filter_obj['value']}%"])
|
|
elif match_mode in ("startswith", "starts_with"):
|
|
address_filters.append([addr_field, "like", f"{filter_obj['value']}%"])
|
|
elif match_mode in ("endswith", "ends_with"):
|
|
address_filters.append([addr_field, "like", f"%{filter_obj['value']}"])
|
|
elif match_mode in ("equals", "equals"):
|
|
address_filters.append([addr_field, "=", filter_obj["value"]])
|
|
else:
|
|
address_filters.append([addr_field, "like", f"%{filter_obj['value']}%"])
|
|
processed_filters = address_filters
|
|
|
|
continue # Skip the rest of the loop for address field
|
|
|
|
if match_mode in ("contains", "contains"):
|
|
processed_filters[mapped_field_name] = ["like", f"%{filter_obj['value']}%"]
|
|
elif match_mode in ("startswith", "starts_with"):
|
|
processed_filters[mapped_field_name] = ["like", f"{filter_obj['value']}%"]
|
|
elif match_mode in ("endswith", "ends_with"):
|
|
processed_filters[mapped_field_name] = ["like", f"%{filter_obj['value']}"]
|
|
elif match_mode in ("equals", "equals"):
|
|
processed_filters[mapped_field_name] = filter_obj["value"]
|
|
else:
|
|
# Default to contains
|
|
processed_filters[mapped_field_name] = ["like", f"%{filter_obj['value']}%"]
|
|
print("DEBUG: Processed filters:", processed_filters)
|
|
return processed_filters
|
|
|
|
def process_sorting(sortings):
|
|
sortings = json.loads(sortings) if isinstance(sortings, str) else sortings
|
|
order_by = ""
|
|
print("DEBUG: Original sorting:", sortings)
|
|
if sortings and len(sortings) > 0:
|
|
for sorting in sortings:
|
|
mapped_field = map_field_name(sorting[0].strip())
|
|
sort_direction = sorting[1].strip().lower()
|
|
order_by += f"{mapped_field} {sort_direction}, "
|
|
order_by = order_by.rstrip(", ")
|
|
else:
|
|
order_by = "modified desc"
|
|
print("DEBUG: Processed sorting:", order_by)
|
|
return order_by
|
|
|
|
|
|
def process_query_conditions(filters, sortings, page, page_size):
|
|
processed_filters = process_filters(filters)
|
|
processed_sortings = process_sorting(sortings)
|
|
is_or_filters = isinstance(processed_filters, list)
|
|
page_int = int(page)
|
|
page_size_int = int(page_size)
|
|
return processed_filters, processed_sortings, is_or_filters, page_int, page_size_int
|
|
|
|
|
|
def build_datatable_dict(data, count, page, page_size):
|
|
return {
|
|
"pagination": {
|
|
"total": count,
|
|
"page": page,
|
|
"page_size": page_size,
|
|
"total_pages": (count + page_size - 1) // page_size
|
|
},
|
|
"data": data
|
|
}
|
|
|
|
def get_count_or_filters(doctype, or_filters):
|
|
where_clauses = []
|
|
values = []
|
|
for field, operator, val in or_filters:
|
|
if operator.lower() == "like":
|
|
where_clauses.append(f"`{field}` LIKE %s")
|
|
else:
|
|
where_clauses.append(f"`{field}` {operator} %s")
|
|
values.append(val)
|
|
where_sql = " OR ".join(where_clauses)
|
|
sql = f"SELECT COUNT(*) FROM `tab{doctype}` WHERE {where_sql}"
|
|
return sql, values
|
|
|
|
def build_error_response(message, status_code=400):
|
|
return {
|
|
"status": "error",
|
|
"message": message,
|
|
"status_code": status_code
|
|
}
|
|
|
|
def build_success_response(data):
|
|
return {
|
|
"status": "success",
|
|
"data": data
|
|
}
|
|
|
|
def build_full_address(doc):
|
|
first_parts = [
|
|
doc.address_line1,
|
|
doc.address_line2,
|
|
doc.city
|
|
]
|
|
second_parts = [
|
|
doc.state,
|
|
doc.pincode
|
|
]
|
|
first = " ".join([p for p in first_parts if p])
|
|
second = " ".join([p for p in second_parts if p])
|
|
if first and second:
|
|
return f"{first}, {second}"
|
|
return first or second or ""
|
|
|