refactor: consider timeslots in get_employee_shift

This commit is contained in:
Rucha Mahabal 2022-02-20 22:10:52 +05:30
parent 3711119a66
commit 625a9f69f5

View File

@ -7,7 +7,7 @@ from datetime import datetime, timedelta
import frappe
from frappe import _
from frappe.model.document import Document
from frappe.query_builder import Criterion
from frappe.query_builder import Criterion, Column
from frappe.utils import cstr, get_link_to_form, getdate, now_datetime, nowdate
from erpnext.hr.doctype.employee.employee import get_holiday_list_for_employee
@ -171,102 +171,124 @@ def get_shift_type_timing(shift_types):
return shift_timing_map
def get_employee_shift(
employee, for_date=None, consider_default_shift=False, next_shift_direction=None
):
def get_shift_for_time(shifts, for_timestamp):
for entry in shifts:
shift_details = get_shift_details(entry.shift_type, for_date=for_timestamp.date())
if shift_details.actual_start <= for_timestamp <= shift_details.actual_end:
return shift_details
def get_shifts_for_date(employee, for_timestamp):
assignment = frappe.qb.DocType('Shift Assignment')
return (
frappe.qb.from_(assignment)
.select(assignment.name, assignment.shift_type)
.where(
(assignment.employee == employee)
& (assignment.docstatus == 1)
& (assignment.status == 'Active')
& (assignment.start_date <= getdate(for_timestamp.date()))
& (
Criterion.any([
assignment.end_date.isnull(),
(assignment.end_date.isnotnull() & (getdate(for_timestamp.date()) >= assignment.end_date))
])
)
)
).run(as_dict=True)
def get_shift_for_timestamp(employee, for_timestamp):
shifts = get_shifts_for_date(employee, for_timestamp)
if shifts:
return get_shift_for_time(shifts, for_timestamp)
return None
def get_employee_shift(employee, for_timestamp=None, consider_default_shift=False, next_shift_direction=None):
"""Returns a Shift Type for the given employee on the given date. (excluding the holidays)
:param employee: Employee for which shift is required.
:param for_date: Date on which shift are required
:param for_timestamp: DateTime on which shift is required
:param consider_default_shift: If set to true, default shift is taken when no shift assignment is found.
:param next_shift_direction: One of: None, 'forward', 'reverse'. Direction to look for next shift if shift not found on given date.
"""
if for_date is None:
for_date = nowdate()
default_shift = frappe.db.get_value("Employee", employee, "default_shift")
shift_type_name = None
shift_assignment_details = frappe.db.get_value(
"Shift Assignment",
{"employee": employee, "start_date": ("<=", for_date), "docstatus": "1", "status": "Active"},
["shift_type", "end_date"],
)
if for_timestamp is None:
for_timestamp = now_datetime()
if shift_assignment_details:
shift_type_name = shift_assignment_details[0]
shift_details = get_shift_for_timestamp(employee, for_timestamp)
# if end_date present means that shift is over after end_date else it is a ongoing shift.
if shift_assignment_details[1] and for_date >= shift_assignment_details[1]:
shift_type_name = None
# if shift assignment is not found, consider default shift
default_shift = frappe.db.get_value('Employee', employee, 'default_shift')
if not shift_details and consider_default_shift:
shift_details = get_shift_details(default_shift, for_timestamp.date())
if not shift_type_name and consider_default_shift:
shift_type_name = default_shift
if shift_type_name:
holiday_list_name = frappe.db.get_value("Shift Type", shift_type_name, "holiday_list")
if not holiday_list_name:
holiday_list_name = get_holiday_list_for_employee(employee, False)
if holiday_list_name and is_holiday(holiday_list_name, for_date):
shift_type_name = None
# if its a holiday, reset
if shift_details and is_holiday_date(employee, shift_details):
shift_details = None
if not shift_type_name and next_shift_direction:
MAX_DAYS = 366
if consider_default_shift and default_shift:
direction = -1 if next_shift_direction == "reverse" else +1
for i in range(MAX_DAYS):
date = for_date + timedelta(days=direction * (i + 1))
shift_details = get_employee_shift(employee, date, consider_default_shift, None)
# if no shift is found, find next or prev shift based on direction
if not shift_details and next_shift_direction:
shift_details = get_prev_or_next_shift(employee, for_timestamp, consider_default_shift, default_shift, next_shift_direction)
return shift_details
def get_prev_or_next_shift(employee, for_timestamp, consider_default_shift, default_shift, next_shift_direction):
MAX_DAYS = 366
shift_details = None
if consider_default_shift and default_shift:
direction = -1 if next_shift_direction == 'reverse' else 1
for i in range(MAX_DAYS):
date = for_timestamp + timedelta(days=direction*(i+1))
shift_details = get_employee_shift(employee, date, consider_default_shift, None)
if shift_details:
break
else:
direction = '<' if next_shift_direction == 'reverse' else '>'
sort_order = 'desc' if next_shift_direction == 'reverse' else 'asc'
dates = frappe.db.get_all('Shift Assignment',
['start_date', 'end_date'],
{'employee':employee, 'start_date': (direction, for_timestamp.date()), 'docstatus': '1', "status": "Active"},
as_list=True,
limit=MAX_DAYS, order_by='start_date ' + sort_order)
if dates:
for date in dates:
if date[1] and date[1] < for_timestamp.date():
continue
shift_details = get_employee_shift(employee, datetime.combine(date, for_timestamp.time()), consider_default_shift, None)
if shift_details:
shift_type_name = shift_details.shift_type.name
for_date = date
break
else:
direction = "<" if next_shift_direction == "reverse" else ">"
sort_order = "desc" if next_shift_direction == "reverse" else "asc"
dates = frappe.db.get_all(
"Shift Assignment",
["start_date", "end_date"],
{
"employee": employee,
"start_date": (direction, for_date),
"docstatus": "1",
"status": "Active",
},
as_list=True,
limit=MAX_DAYS,
order_by="start_date " + sort_order,
)
if dates:
for date in dates:
if date[1] and date[1] < for_date:
continue
shift_details = get_employee_shift(employee, date[0], consider_default_shift, None)
if shift_details:
shift_type_name = shift_details.shift_type.name
for_date = date[0]
break
return shift_details
return get_shift_details(shift_type_name, for_date)
def is_holiday_date(employee, shift_details):
holiday_list_name = frappe.db.get_value('Shift Type', shift_details.shift_type.name, 'holiday_list')
if not holiday_list_name:
holiday_list_name = get_holiday_list_for_employee(employee, False)
return holiday_list_name and is_holiday(holiday_list_name, shift_details.start_datetime.date())
def get_employee_shift_timings(employee, for_timestamp=None, consider_default_shift=False):
"""Returns previous shift, current/upcoming shift, next_shift for the given timestamp and employee"""
if for_timestamp is None:
for_timestamp = now_datetime()
# write and verify a test case for midnight shift.
prev_shift = curr_shift = next_shift = None
curr_shift = get_employee_shift(employee, for_timestamp.date(), consider_default_shift, "forward")
curr_shift = get_employee_shift(employee, for_timestamp, consider_default_shift, 'forward')
if curr_shift:
next_shift = get_employee_shift(
employee,
curr_shift.start_datetime.date() + timedelta(days=1),
consider_default_shift,
"forward",
)
prev_shift = get_employee_shift(
employee, for_timestamp.date() + timedelta(days=-1), consider_default_shift, "reverse"
)
next_shift = get_employee_shift(employee, curr_shift.start_datetime + timedelta(days=1), consider_default_shift, 'forward')
prev_shift = get_employee_shift(employee, for_timestamp + timedelta(days=-1), consider_default_shift, 'reverse')
if curr_shift:
# adjust actual start and end times if they are overlapping with grace period (before start and after end)
if prev_shift:
curr_shift.actual_start = (
prev_shift.end_datetime
@ -292,6 +314,38 @@ def get_employee_shift_timings(employee, for_timestamp=None, consider_default_sh
return prev_shift, curr_shift, next_shift
def get_actual_start_end_datetime_of_shift(employee, for_datetime, consider_default_shift=False):
"""Takes a datetime and returns the 'actual' start datetime and end datetime of the shift in which the timestamp belongs.
Here 'actual' means - taking in to account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time".
None is returned if the timestamp is outside any actual shift timings.
Shift Details is also returned(current/upcoming i.e. if timestamp not in any actual shift then details of next shift returned)
"""
actual_shift_start = actual_shift_end = shift_details = None
shift_timings_as_per_timestamp = get_employee_shift_timings(employee, for_datetime, consider_default_shift)
timestamp_list = []
for shift in shift_timings_as_per_timestamp:
if shift:
timestamp_list.extend([shift.actual_start, shift.actual_end])
else:
timestamp_list.extend([None, None])
timestamp_index = None
for index, timestamp in enumerate(timestamp_list):
if timestamp and for_datetime <= timestamp:
timestamp_index = index
break
if timestamp_index and timestamp_index%2 == 1:
shift_details = shift_timings_as_per_timestamp[int((timestamp_index-1)/2)]
actual_shift_start = shift_details.actual_start
actual_shift_end = shift_details.actual_end
elif timestamp_index:
shift_details = shift_timings_as_per_timestamp[int(timestamp_index/2)]
return actual_shift_start, actual_shift_end, shift_details
def get_shift_details(shift_type_name, for_date=None):
"""Returns Shift Details which contain some additional information as described below.
'shift_details' contains the following keys:
@ -319,43 +373,10 @@ def get_shift_details(shift_type_name, for_date=None):
)
actual_end = end_datetime + timedelta(minutes=shift_type.allow_check_out_after_shift_end_time)
return frappe._dict(
{
"shift_type": shift_type,
"start_datetime": start_datetime,
"end_datetime": end_datetime,
"actual_start": actual_start,
"actual_end": actual_end,
}
)
def get_actual_start_end_datetime_of_shift(employee, for_datetime, consider_default_shift=False):
"""Takes a datetime and returns the 'actual' start datetime and end datetime of the shift in which the timestamp belongs.
Here 'actual' means - taking in to account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time".
None is returned if the timestamp is outside any actual shift timings.
Shift Details is also returned(current/upcoming i.e. if timestamp not in any actual shift then details of next shift returned)
"""
actual_shift_start = actual_shift_end = shift_details = None
shift_timings_as_per_timestamp = get_employee_shift_timings(
employee, for_datetime, consider_default_shift
)
timestamp_list = []
for shift in shift_timings_as_per_timestamp:
if shift:
timestamp_list.extend([shift.actual_start, shift.actual_end])
else:
timestamp_list.extend([None, None])
timestamp_index = None
for index, timestamp in enumerate(timestamp_list):
if timestamp and for_datetime <= timestamp:
timestamp_index = index
break
if timestamp_index and timestamp_index % 2 == 1:
shift_details = shift_timings_as_per_timestamp[int((timestamp_index - 1) / 2)]
actual_shift_start = shift_details.actual_start
actual_shift_end = shift_details.actual_end
elif timestamp_index:
shift_details = shift_timings_as_per_timestamp[int(timestamp_index / 2)]
return actual_shift_start, actual_shift_end, shift_details
return frappe._dict({
'shift_type': shift_type,
'start_datetime': start_datetime,
'end_datetime': end_datetime,
'actual_start': actual_start,
'actual_end': actual_end
})