From 3711119a665fa924c48ec98de2984e5c0a411823 Mon Sep 17 00:00:00 2001 From: Rucha Mahabal Date: Thu, 17 Feb 2022 18:32:48 +0530 Subject: [PATCH] refactor: overlapping shifts validation - convert raw query to frappe.qb - check for overlapping timings if dates overlap - translation friendly error messages with link to overlapping doc --- .../shift_assignment/shift_assignment.py | 116 ++++++++++-------- 1 file changed, 66 insertions(+), 50 deletions(-) diff --git a/erpnext/hr/doctype/shift_assignment/shift_assignment.py b/erpnext/hr/doctype/shift_assignment/shift_assignment.py index 5a1248698c..f51a860c92 100644 --- a/erpnext/hr/doctype/shift_assignment/shift_assignment.py +++ b/erpnext/hr/doctype/shift_assignment/shift_assignment.py @@ -7,79 +7,95 @@ from datetime import datetime, timedelta import frappe from frappe import _ from frappe.model.document import Document -from frappe.utils import cstr, getdate, now_datetime, nowdate +from frappe.query_builder import Criterion +from frappe.utils import cstr, get_link_to_form, getdate, now_datetime, nowdate from erpnext.hr.doctype.employee.employee import get_holiday_list_for_employee from erpnext.hr.doctype.holiday_list.holiday_list import is_holiday from erpnext.hr.utils import validate_active_employee +class OverlappingShiftError(frappe.ValidationError): + pass class ShiftAssignment(Document): def validate(self): validate_active_employee(self.employee) - self.validate_overlapping_dates() + self.validate_overlapping_shifts() if self.end_date: self.validate_from_to_dates("start_date", "end_date") - def validate_overlapping_dates(self): + def validate_overlapping_shifts(self): + overlapping_dates = self.get_overlapping_dates() + if len(overlapping_dates): + # if dates are overlapping, check if timings are overlapping, else allow + overlapping_timings = self.has_overlapping_timings(overlapping_dates[0].shift_type) + if overlapping_timings: + self.throw_overlap_error(overlapping_dates[0]) + + def get_overlapping_dates(self): if not self.name: self.name = "New Shift Assignment" - condition = """and ( - end_date is null - or - %(start_date)s between start_date and end_date - """ - - if self.end_date: - condition += """ or - %(end_date)s between start_date and end_date - or - start_date between %(start_date)s and %(end_date)s - ) """ - else: - condition += """ ) """ - - assigned_shifts = frappe.db.sql( - """ - select name, shift_type, start_date ,end_date, docstatus, status - from `tabShift Assignment` - where - employee=%(employee)s and docstatus = 1 - and name != %(name)s - and status = "Active" - {0} - """.format( - condition - ), - { - "employee": self.employee, - "shift_type": self.shift_type, - "start_date": self.start_date, - "end_date": self.end_date, - "name": self.name, - }, - as_dict=1, + shift = frappe.qb.DocType("Shift Assignment") + query = ( + frappe.qb.from_(shift) + .select(shift.name, shift.shift_type, shift.start_date, shift.end_date, shift.docstatus, shift.status) + .where( + (shift.employee == self.employee) + & (shift.docstatus == 1) + & (shift.name != self.name) + & (shift.status == "Active") + ) ) - if len(assigned_shifts): - self.throw_overlap_error(assigned_shifts[0]) + if self.end_date: + query = query.where( + Criterion.any([ + Criterion.any([ + shift.end_date.isnull(), + ((self.start_date >= shift.start_date) & (self.start_date <= shift.end_date)) + ]), + Criterion.any([ + ((self.end_date >= shift.start_date) & (self.end_date <= shift.end_date)), + shift.start_date.between(self.start_date, self.end_date) + ]) + ]) + ) + else: + query = query.where( + shift.end_date.isnull() + | ((self.start_date >= shift.start_date) & (self.start_date <= shift.end_date)) + ) + + return query.run(as_dict=True) + + def has_overlapping_timings(self, overlapping_shift): + curr_shift = frappe.db.get_value("Shift Type", self.shift_type, ["start_time", "end_time"], as_dict=True) + overlapping_shift = frappe.db.get_value("Shift Type", overlapping_shift, ["start_time", "end_time"], as_dict=True) + + if ((curr_shift.start_time > overlapping_shift.start_time and curr_shift.start_time < overlapping_shift.end_time) or + (curr_shift.end_time > overlapping_shift.start_time and curr_shift.end_time < overlapping_shift.end_time) or + (curr_shift.start_time <= overlapping_shift.start_time and curr_shift.end_time >= overlapping_shift.end_time)): + return True + return False def throw_overlap_error(self, shift_details): shift_details = frappe._dict(shift_details) + msg = None if shift_details.docstatus == 1 and shift_details.status == "Active": - msg = _("Employee {0} already has Active Shift {1}: {2}").format( - frappe.bold(self.employee), frappe.bold(self.shift_type), frappe.bold(shift_details.name) - ) - if shift_details.start_date: - msg += _(" from {0}").format(getdate(self.start_date).strftime("%d-%m-%Y")) - title = "Ongoing Shift" - if shift_details.end_date: - msg += _(" to {0}").format(getdate(self.end_date).strftime("%d-%m-%Y")) - title = "Active Shift" + if shift_details.start_date and shift_details.end_date: + msg = _("Employee {0} already has an active Shift {1}: {2} from {3} to {4}").format(frappe.bold(self.employee), frappe.bold(self.shift_type), + get_link_to_form("Shift Assignment", shift_details.name), + getdate(self.start_date).strftime("%d-%m-%Y"), + getdate(self.end_date).strftime("%d-%m-%Y")) + else: + msg = _("Employee {0} already has an active Shift {1}: {2} from {3}").format(frappe.bold(self.employee), frappe.bold(self.shift_type), + get_link_to_form("Shift Assignment", shift_details.name), + getdate(self.start_date).strftime("%d-%m-%Y")) + if msg: - frappe.throw(msg, title=title) + frappe.throw(msg, title=_("Overlapping Shifts"), exc=OverlappingShiftError) @frappe.whitelist()