style: format code with black

This commit is contained in:
Rucha Mahabal 2022-03-30 15:23:13 +05:30
parent acb27430ac
commit baec607ff5
6 changed files with 540 additions and 334 deletions

View File

@ -14,6 +14,7 @@ from erpnext.hr.utils import get_holiday_dates_for_employee, validate_active_emp
class DuplicateAttendanceError(frappe.ValidationError): class DuplicateAttendanceError(frappe.ValidationError):
pass pass
class Attendance(Document): class Attendance(Document):
def validate(self): def validate(self):
from erpnext.controllers.status_updater import validate_status from erpnext.controllers.status_updater import validate_status
@ -39,12 +40,20 @@ class Attendance(Document):
frappe.throw(_("Attendance date can not be less than employee's joining date")) frappe.throw(_("Attendance date can not be less than employee's joining date"))
def validate_duplicate_record(self): def validate_duplicate_record(self):
duplicate = get_duplicate_attendance_record(self.employee, self.attendance_date, self.shift, self.name) duplicate = get_duplicate_attendance_record(
self.employee, self.attendance_date, self.shift, self.name
)
if duplicate: if duplicate:
frappe.throw(_("Attendance for employee {0} is already marked for the date {1}: {2}").format( frappe.throw(
frappe.bold(self.employee), frappe.bold(self.attendance_date), get_link_to_form("Attendance", duplicate[0].name)), _("Attendance for employee {0} is already marked for the date {1}: {2}").format(
title=_("Duplicate Attendance"), exc=DuplicateAttendanceError) frappe.bold(self.employee),
frappe.bold(self.attendance_date),
get_link_to_form("Attendance", duplicate[0].name),
),
title=_("Duplicate Attendance"),
exc=DuplicateAttendanceError,
)
def validate_employee_status(self): def validate_employee_status(self):
if frappe.db.get_value("Employee", self.employee, "status") == "Inactive": if frappe.db.get_value("Employee", self.employee, "status") == "Inactive":
@ -101,26 +110,29 @@ def get_duplicate_attendance_record(employee, attendance_date, shift, name=None)
attendance = frappe.qb.DocType("Attendance") attendance = frappe.qb.DocType("Attendance")
query = ( query = (
frappe.qb.from_(attendance) frappe.qb.from_(attendance)
.select(attendance.name) .select(attendance.name)
.where( .where((attendance.employee == employee) & (attendance.docstatus < 2))
(attendance.employee == employee)
& (attendance.docstatus < 2)
)
) )
if shift: if shift:
query = query.where( query = query.where(
Criterion.any([ Criterion.any(
Criterion.all([ [
((attendance.shift.isnull()) | (attendance.shift == "")), Criterion.all(
(attendance.attendance_date == attendance_date) [
]), ((attendance.shift.isnull()) | (attendance.shift == "")),
Criterion.all([ (attendance.attendance_date == attendance_date),
((attendance.shift.isnotnull()) | (attendance.shift != "")), ]
(attendance.attendance_date == attendance_date), ),
(attendance.shift == shift) Criterion.all(
]) [
]) ((attendance.shift.isnotnull()) | (attendance.shift != "")),
(attendance.attendance_date == attendance_date),
(attendance.shift == shift),
]
),
]
)
) )
else: else:
query = query.where((attendance.attendance_date == attendance_date)) query = query.where((attendance.attendance_date == attendance_date))
@ -167,18 +179,32 @@ def add_attendance(events, start, end, conditions=None):
if e not in events: if e not in events:
events.append(e) events.append(e)
def mark_attendance(employee, attendance_date, status, shift=None, leave_type=None, ignore_validate=False):
def mark_attendance(
employee,
attendance_date,
status,
shift=None,
leave_type=None,
ignore_validate=False,
late_entry=False,
early_exit=False,
):
if not get_duplicate_attendance_record(employee, attendance_date, shift): if not get_duplicate_attendance_record(employee, attendance_date, shift):
company = frappe.db.get_value('Employee', employee, 'company') company = frappe.db.get_value("Employee", employee, "company")
attendance = frappe.get_doc({ attendance = frappe.get_doc(
'doctype': 'Attendance', {
'employee': employee, "doctype": "Attendance",
'attendance_date': attendance_date, "employee": employee,
'status': status, "attendance_date": attendance_date,
'company': company, "status": status,
'shift': shift, "company": company,
'leave_type': leave_type "shift": shift,
}) "leave_type": leave_type,
"late_entry": late_entry,
"early_exit": early_exit,
}
)
attendance.flags.ignore_validate = ignore_validate attendance.flags.ignore_validate = ignore_validate
attendance.insert() attendance.insert()
attendance.submit() attendance.submit()

View File

@ -31,11 +31,21 @@ class EmployeeCheckin(Document):
) )
def fetch_shift(self): def fetch_shift(self):
shift_actual_timings = get_actual_start_end_datetime_of_shift(self.employee, get_datetime(self.time), True) shift_actual_timings = get_actual_start_end_datetime_of_shift(
self.employee, get_datetime(self.time), True
)
if shift_actual_timings: if shift_actual_timings:
if shift_actual_timings.shift_type.determine_check_in_and_check_out == 'Strictly based on Log Type in Employee Checkin' \ if (
and not self.log_type and not self.skip_auto_attendance: shift_actual_timings.shift_type.determine_check_in_and_check_out
frappe.throw(_('Log Type is required for check-ins falling in the shift: {0}.').format(shift_actual_timings.shift_type.name)) == "Strictly based on Log Type in Employee Checkin"
and not self.log_type
and not self.skip_auto_attendance
):
frappe.throw(
_("Log Type is required for check-ins falling in the shift: {0}.").format(
shift_actual_timings.shift_type.name
)
)
if not self.attendance: if not self.attendance:
self.shift = shift_actual_timings.shift_type.name self.shift = shift_actual_timings.shift_type.name
self.shift_actual_start = shift_actual_timings.actual_start self.shift_actual_start = shift_actual_timings.actual_start
@ -125,8 +135,8 @@ def mark_attendance_and_link_log(
("1", log_names), ("1", log_names),
) )
return None return None
elif attendance_status in ('Present', 'Absent', 'Half Day'): elif attendance_status in ("Present", "Absent", "Half Day"):
employee_doc = frappe.get_doc('Employee', employee) employee_doc = frappe.get_doc("Employee", employee)
if not get_duplicate_attendance_record(employee, attendance_date, shift): if not get_duplicate_attendance_record(employee, attendance_date, shift):
doc_dict = { doc_dict = {
"doctype": "Attendance", "doctype": "Attendance",

View File

@ -19,6 +19,7 @@ from erpnext.hr.utils import validate_active_employee
class OverlappingShiftError(frappe.ValidationError): class OverlappingShiftError(frappe.ValidationError):
pass pass
class ShiftAssignment(Document): class ShiftAssignment(Document):
def validate(self): def validate(self):
validate_active_employee(self.employee) validate_active_employee(self.employee)
@ -42,27 +43,35 @@ class ShiftAssignment(Document):
shift = frappe.qb.DocType("Shift Assignment") shift = frappe.qb.DocType("Shift Assignment")
query = ( query = (
frappe.qb.from_(shift) frappe.qb.from_(shift)
.select(shift.name, shift.shift_type, shift.start_date, shift.end_date, shift.docstatus, shift.status) .select(
.where( shift.name, shift.shift_type, shift.start_date, shift.end_date, shift.docstatus, shift.status
(shift.employee == self.employee) )
& (shift.docstatus == 1) .where(
& (shift.name != self.name) (shift.employee == self.employee)
& (shift.status == "Active") & (shift.docstatus == 1)
) & (shift.name != self.name)
& (shift.status == "Active")
)
) )
if self.end_date: if self.end_date:
query = query.where( query = query.where(
Criterion.any([ Criterion.any(
Criterion.any([ [
shift.end_date.isnull(), Criterion.any(
((self.start_date >= shift.start_date) & (self.start_date <= shift.end_date)) [
]), shift.end_date.isnull(),
Criterion.any([ ((self.start_date >= shift.start_date) & (self.start_date <= shift.end_date)),
((self.end_date >= shift.start_date) & (self.end_date <= shift.end_date)), ]
shift.start_date.between(self.start_date, self.end_date) ),
]) Criterion.any(
]) [
((self.end_date >= shift.start_date) & (self.end_date <= shift.end_date)),
shift.start_date.between(self.start_date, self.end_date),
]
),
]
)
) )
else: else:
query = query.where( query = query.where(
@ -73,12 +82,27 @@ class ShiftAssignment(Document):
return query.run(as_dict=True) return query.run(as_dict=True)
def has_overlapping_timings(self, overlapping_shift): def has_overlapping_timings(self, overlapping_shift):
curr_shift = frappe.db.get_value("Shift Type", self.shift_type, ["start_time", "end_time"], as_dict=True) curr_shift = frappe.db.get_value(
overlapping_shift = frappe.db.get_value("Shift Type", overlapping_shift, ["start_time", "end_time"], as_dict=True) "Shift Type", self.shift_type, ["start_time", "end_time"], as_dict=True
)
overlapping_shift = frappe.db.get_value(
"Shift Type", overlapping_shift, ["start_time", "end_time"], as_dict=True
)
if ((curr_shift.start_time > overlapping_shift.start_time and curr_shift.start_time < overlapping_shift.end_time) or if (
(curr_shift.end_time > overlapping_shift.start_time and curr_shift.end_time < overlapping_shift.end_time) or (
(curr_shift.start_time <= overlapping_shift.start_time and curr_shift.end_time >= overlapping_shift.end_time)): curr_shift.start_time > overlapping_shift.start_time
and curr_shift.start_time < overlapping_shift.end_time
)
or (
curr_shift.end_time > overlapping_shift.start_time
and curr_shift.end_time < overlapping_shift.end_time
)
or (
curr_shift.start_time <= overlapping_shift.start_time
and curr_shift.end_time >= overlapping_shift.end_time
)
):
return True return True
return False return False
@ -87,14 +111,20 @@ class ShiftAssignment(Document):
msg = None msg = None
if shift_details.docstatus == 1 and shift_details.status == "Active": if shift_details.docstatus == 1 and shift_details.status == "Active":
if shift_details.start_date and shift_details.end_date: if shift_details.start_date and shift_details.end_date:
msg = _("Employee {0} already has an active Shift {1}: {2} from {3} to {4}").format(frappe.bold(self.employee), frappe.bold(self.shift_type), msg = _("Employee {0} already has an active Shift {1}: {2} from {3} to {4}").format(
frappe.bold(self.employee),
frappe.bold(self.shift_type),
get_link_to_form("Shift Assignment", shift_details.name), get_link_to_form("Shift Assignment", shift_details.name),
getdate(self.start_date).strftime("%d-%m-%Y"), getdate(self.start_date).strftime("%d-%m-%Y"),
getdate(self.end_date).strftime("%d-%m-%Y")) getdate(self.end_date).strftime("%d-%m-%Y"),
)
else: else:
msg = _("Employee {0} already has an active Shift {1}: {2} from {3}").format(frappe.bold(self.employee), frappe.bold(self.shift_type), msg = _("Employee {0} already has an active Shift {1}: {2} from {3}").format(
frappe.bold(self.employee),
frappe.bold(self.shift_type),
get_link_to_form("Shift Assignment", shift_details.name), get_link_to_form("Shift Assignment", shift_details.name),
getdate(self.start_date).strftime("%d-%m-%Y")) getdate(self.start_date).strftime("%d-%m-%Y"),
)
if msg: if msg:
frappe.throw(msg, title=_("Overlapping Shifts"), exc=OverlappingShiftError) frappe.throw(msg, title=_("Overlapping Shifts"), exc=OverlappingShiftError)
@ -180,10 +210,14 @@ def get_shift_for_time(shifts: List[Dict], for_timestamp: datetime) -> Dict:
for entry in shifts: for entry in shifts:
shift_details = get_shift_details(entry.shift_type, for_timestamp=for_timestamp) shift_details = get_shift_details(entry.shift_type, for_timestamp=for_timestamp)
if get_datetime(shift_details.actual_start) <= get_datetime(for_timestamp) <= get_datetime(shift_details.actual_end): if (
get_datetime(shift_details.actual_start)
<= get_datetime(for_timestamp)
<= get_datetime(shift_details.actual_end)
):
valid_shifts.append(shift_details) valid_shifts.append(shift_details)
valid_shifts.sort(key=lambda x: x['actual_start']) valid_shifts.sort(key=lambda x: x["actual_start"])
if len(valid_shifts) > 1: if len(valid_shifts) > 1:
for i in range(len(valid_shifts) - 1): for i in range(len(valid_shifts) - 1):
@ -193,8 +227,16 @@ def get_shift_for_time(shifts: List[Dict], for_timestamp: datetime) -> Dict:
next_shift = valid_shifts[i + 1] next_shift = valid_shifts[i + 1]
if curr_shift and next_shift: if curr_shift and next_shift:
next_shift.actual_start = curr_shift.end_datetime if next_shift.actual_start < curr_shift.end_datetime else next_shift.actual_start next_shift.actual_start = (
curr_shift.actual_end = next_shift.actual_start if curr_shift.actual_end > next_shift.actual_start else curr_shift.actual_end curr_shift.end_datetime
if next_shift.actual_start < curr_shift.end_datetime
else next_shift.actual_start
)
curr_shift.actual_end = (
next_shift.actual_start
if curr_shift.actual_end > next_shift.actual_start
else curr_shift.actual_end
)
valid_shifts[i] = curr_shift valid_shifts[i] = curr_shift
valid_shifts[i + 1] = next_shift valid_shifts[i + 1] = next_shift
@ -206,23 +248,25 @@ def get_shift_for_time(shifts: List[Dict], for_timestamp: datetime) -> Dict:
def get_shifts_for_date(employee: str, for_timestamp: datetime) -> List[Dict[str, str]]: def get_shifts_for_date(employee: str, for_timestamp: datetime) -> List[Dict[str, str]]:
"""Returns list of shifts with details for given date""" """Returns list of shifts with details for given date"""
assignment = frappe.qb.DocType('Shift Assignment') assignment = frappe.qb.DocType("Shift Assignment")
return ( return (
frappe.qb.from_(assignment) frappe.qb.from_(assignment)
.select(assignment.name, assignment.shift_type) .select(assignment.name, assignment.shift_type)
.where( .where(
(assignment.employee == employee) (assignment.employee == employee)
& (assignment.docstatus == 1) & (assignment.docstatus == 1)
& (assignment.status == 'Active') & (assignment.status == "Active")
& (assignment.start_date <= getdate(for_timestamp.date())) & (assignment.start_date <= getdate(for_timestamp.date()))
& ( & (
Criterion.any([ Criterion.any(
[
assignment.end_date.isnull(), assignment.end_date.isnull(),
(assignment.end_date.isnotnull() & (getdate(for_timestamp.date()) >= assignment.end_date)) (assignment.end_date.isnotnull() & (getdate(for_timestamp.date()) >= assignment.end_date)),
]) ]
) )
) )
)
).run(as_dict=True) ).run(as_dict=True)
@ -233,7 +277,12 @@ def get_shift_for_timestamp(employee: str, for_timestamp: datetime) -> Dict:
return {} return {}
def get_employee_shift(employee: str, for_timestamp: datetime = None, consider_default_shift: bool = False, next_shift_direction: str = None) -> Dict: def get_employee_shift(
employee: str,
for_timestamp: datetime = None,
consider_default_shift: bool = False,
next_shift_direction: str = None,
) -> Dict:
"""Returns a Shift Type for the given employee on the given date. (excluding the holidays) """Returns a Shift Type for the given employee on the given date. (excluding the holidays)
:param employee: Employee for which shift is required. :param employee: Employee for which shift is required.
@ -247,7 +296,7 @@ def get_employee_shift(employee: str, for_timestamp: datetime = None, consider_d
shift_details = get_shift_for_timestamp(employee, for_timestamp) shift_details = get_shift_for_timestamp(employee, for_timestamp)
# if shift assignment is not found, consider default shift # if shift assignment is not found, consider default shift
default_shift = frappe.db.get_value('Employee', employee, 'default_shift') default_shift = frappe.db.get_value("Employee", employee, "default_shift")
if not shift_details and consider_default_shift: if not shift_details and consider_default_shift:
shift_details = get_shift_details(default_shift, for_timestamp) shift_details = get_shift_details(default_shift, for_timestamp)
@ -257,38 +306,55 @@ def get_employee_shift(employee: str, for_timestamp: datetime = None, consider_d
# if no shift is found, find next or prev shift assignment based on direction # if no shift is found, find next or prev shift assignment based on direction
if not shift_details and next_shift_direction: if not shift_details and next_shift_direction:
shift_details = get_prev_or_next_shift(employee, for_timestamp, consider_default_shift, default_shift, next_shift_direction) shift_details = get_prev_or_next_shift(
employee, for_timestamp, consider_default_shift, default_shift, next_shift_direction
)
return shift_details or {} return shift_details or {}
def get_prev_or_next_shift(employee: str, for_timestamp: datetime, consider_default_shift: bool, def get_prev_or_next_shift(
default_shift: str, next_shift_direction: str) -> Dict: employee: str,
for_timestamp: datetime,
consider_default_shift: bool,
default_shift: str,
next_shift_direction: str,
) -> Dict:
"""Returns a dict of shift details for the next or prev shift based on the next_shift_direction""" """Returns a dict of shift details for the next or prev shift based on the next_shift_direction"""
MAX_DAYS = 366 MAX_DAYS = 366
shift_details = {} shift_details = {}
if consider_default_shift and default_shift: if consider_default_shift and default_shift:
direction = -1 if next_shift_direction == 'reverse' else 1 direction = -1 if next_shift_direction == "reverse" else 1
for i in range(MAX_DAYS): for i in range(MAX_DAYS):
date = for_timestamp + timedelta(days=direction*(i+1)) date = for_timestamp + timedelta(days=direction * (i + 1))
shift_details = get_employee_shift(employee, date, consider_default_shift, None) shift_details = get_employee_shift(employee, date, consider_default_shift, None)
if shift_details: if shift_details:
break break
else: else:
direction = '<' if next_shift_direction == 'reverse' else '>' direction = "<" if next_shift_direction == "reverse" else ">"
sort_order = 'desc' if next_shift_direction == 'reverse' else 'asc' sort_order = "desc" if next_shift_direction == "reverse" else "asc"
dates = frappe.db.get_all('Shift Assignment', dates = frappe.db.get_all(
['start_date', 'end_date'], "Shift Assignment",
{'employee': employee, 'start_date': (direction, for_timestamp.date()), 'docstatus': 1, 'status': 'Active'}, ["start_date", "end_date"],
{
"employee": employee,
"start_date": (direction, for_timestamp.date()),
"docstatus": 1,
"status": "Active",
},
as_list=True, as_list=True,
limit=MAX_DAYS, order_by='start_date ' + sort_order) limit=MAX_DAYS,
order_by="start_date " + sort_order,
)
if dates: if dates:
for date in dates: for date in dates:
if date[1] and date[1] < for_timestamp.date(): if date[1] and date[1] < for_timestamp.date():
continue continue
shift_details = get_employee_shift(employee, datetime.combine(date[0], for_timestamp.time()), consider_default_shift, None) shift_details = get_employee_shift(
employee, datetime.combine(date[0], for_timestamp.time()), consider_default_shift, None
)
if shift_details: if shift_details:
break break
@ -296,7 +362,9 @@ def get_prev_or_next_shift(employee: str, for_timestamp: datetime, consider_defa
def is_holiday_date(employee: str, shift_details: Dict) -> bool: def is_holiday_date(employee: str, shift_details: Dict) -> bool:
holiday_list_name = frappe.db.get_value('Shift Type', shift_details.shift_type.name, 'holiday_list') holiday_list_name = frappe.db.get_value(
"Shift Type", shift_details.shift_type.name, "holiday_list"
)
if not holiday_list_name: if not holiday_list_name:
holiday_list_name = get_holiday_list_for_employee(employee, False) holiday_list_name = get_holiday_list_for_employee(employee, False)
@ -304,17 +372,23 @@ def is_holiday_date(employee: str, shift_details: Dict) -> bool:
return holiday_list_name and is_holiday(holiday_list_name, shift_details.start_datetime.date()) return holiday_list_name and is_holiday(holiday_list_name, shift_details.start_datetime.date())
def get_employee_shift_timings(employee: str, for_timestamp: datetime = None, consider_default_shift: bool = False) -> List[Dict]: def get_employee_shift_timings(
employee: str, for_timestamp: datetime = None, consider_default_shift: bool = False
) -> List[Dict]:
"""Returns previous shift, current/upcoming shift, next_shift for the given timestamp and employee""" """Returns previous shift, current/upcoming shift, next_shift for the given timestamp and employee"""
if for_timestamp is None: if for_timestamp is None:
for_timestamp = now_datetime() for_timestamp = now_datetime()
# write and verify a test case for midnight shift. # write and verify a test case for midnight shift.
prev_shift = curr_shift = next_shift = None prev_shift = curr_shift = next_shift = None
curr_shift = get_employee_shift(employee, for_timestamp, consider_default_shift, 'forward') curr_shift = get_employee_shift(employee, for_timestamp, consider_default_shift, "forward")
if curr_shift: if curr_shift:
next_shift = get_employee_shift(employee, curr_shift.start_datetime + timedelta(days=1), consider_default_shift, 'forward') next_shift = get_employee_shift(
prev_shift = get_employee_shift(employee, for_timestamp + timedelta(days=-1), consider_default_shift, 'reverse') employee, curr_shift.start_datetime + timedelta(days=1), consider_default_shift, "forward"
)
prev_shift = get_employee_shift(
employee, for_timestamp + timedelta(days=-1), consider_default_shift, "reverse"
)
if curr_shift: if curr_shift:
# adjust actual start and end times if they are overlapping with grace period (before start and after end) # adjust actual start and end times if they are overlapping with grace period (before start and after end)
@ -330,26 +404,35 @@ def get_employee_shift_timings(employee: str, for_timestamp: datetime = None, co
else prev_shift.actual_end else prev_shift.actual_end
) )
if next_shift: if next_shift:
next_shift.actual_start = curr_shift.end_datetime if next_shift.actual_start < curr_shift.end_datetime else next_shift.actual_start next_shift.actual_start = (
curr_shift.actual_end = next_shift.actual_start if curr_shift.actual_end > next_shift.actual_start else curr_shift.actual_end curr_shift.end_datetime
if next_shift.actual_start < curr_shift.end_datetime
else next_shift.actual_start
)
curr_shift.actual_end = (
next_shift.actual_start
if curr_shift.actual_end > next_shift.actual_start
else curr_shift.actual_end
)
return prev_shift, curr_shift, next_shift return prev_shift, curr_shift, next_shift
def get_actual_start_end_datetime_of_shift(employee: str, for_timestamp: datetime, consider_default_shift: bool = False) -> Dict: def get_actual_start_end_datetime_of_shift(
""" employee: str, for_timestamp: datetime, consider_default_shift: bool = False
Params: ) -> Dict:
employee (str): Employee name """Returns a Dict containing shift details with actual_start and actual_end datetime values
for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime Here 'actual' means taking into account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time".
consider_default_shift (bool, optional): Flag (defaults to False) to specify whether to consider Empty Dict is returned if the timestamp is outside any actual shift timings.
default shift in employee master if no shift assignment is found
Returns: :param employee (str): Employee name
dict: Dict containing shift details with actual_start and actual_end datetime values :param for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime
Here 'actual' means taking into account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time". :param consider_default_shift (bool, optional): Flag (defaults to False) to specify whether to consider
Empty Dict is returned if the timestamp is outside any actual shift timings. default shift in employee master if no shift assignment is found
""" """
shift_timings_as_per_timestamp = get_employee_shift_timings(employee, for_timestamp, consider_default_shift) shift_timings_as_per_timestamp = get_employee_shift_timings(
employee, for_timestamp, consider_default_shift
)
return get_exact_shift(shift_timings_as_per_timestamp, for_timestamp) return get_exact_shift(shift_timings_as_per_timestamp, for_timestamp)
@ -381,25 +464,22 @@ def get_exact_shift(shifts: List, for_timestamp: datetime) -> Dict:
if timestamp_index: if timestamp_index:
break break
if timestamp_index and timestamp_index%2 == 1: if timestamp_index and timestamp_index % 2 == 1:
shift_details = shifts[int((timestamp_index-1)/2)] shift_details = shifts[int((timestamp_index - 1) / 2)]
return shift_details return shift_details
def get_shift_details(shift_type_name: str, for_timestamp: datetime = None) -> Dict: def get_shift_details(shift_type_name: str, for_timestamp: datetime = None) -> Dict:
""" """Returns a Dict containing shift details with the following data:
Params: 'shift_type' - Object of DocType Shift Type,
shift_type_name (str): shift type name for which shift_details are required. 'start_datetime' - datetime of shift start on given timestamp,
for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime 'end_datetime' - datetime of shift end on given timestamp,
'actual_start' - datetime of shift start after adding 'begin_check_in_before_shift_start_time',
'actual_end' - datetime of shift end after adding 'allow_check_out_after_shift_end_time' (None is returned if this is zero)
Returns: :param shift_type_name (str): shift type name for which shift_details are required.
dict: Dict containing shift details with the following data: :param for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime
'shift_type' - Object of DocType Shift Type,
'start_datetime' - datetime of shift start on given timestamp,
'end_datetime' - datetime of shift end on given timestamp,
'actual_start' - datetime of shift start after adding 'begin_check_in_before_shift_start_time',
'actual_end' - datetime of shift end after adding 'allow_check_out_after_shift_end_time' (None is returned if this is zero)
""" """
if not shift_type_name: if not shift_type_name:
return {} return {}
@ -407,8 +487,10 @@ def get_shift_details(shift_type_name: str, for_timestamp: datetime = None) -> D
if for_timestamp is None: if for_timestamp is None:
for_timestamp = now_datetime() for_timestamp = now_datetime()
shift_type = frappe.get_doc('Shift Type', shift_type_name) shift_type = frappe.get_doc("Shift Type", shift_type_name)
shift_actual_start = shift_type.start_time - timedelta(minutes=shift_type.begin_check_in_before_shift_start_time) shift_actual_start = shift_type.start_time - timedelta(
minutes=shift_type.begin_check_in_before_shift_start_time
)
if shift_type.start_time > shift_type.end_time: if shift_type.start_time > shift_type.end_time:
# shift spans accross 2 different days # shift spans accross 2 different days
@ -428,13 +510,17 @@ def get_shift_details(shift_type_name: str, for_timestamp: datetime = None) -> D
start_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.start_time start_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.start_time
end_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.end_time end_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.end_time
actual_start = start_datetime - timedelta(minutes=shift_type.begin_check_in_before_shift_start_time) actual_start = start_datetime - timedelta(
minutes=shift_type.begin_check_in_before_shift_start_time
)
actual_end = end_datetime + timedelta(minutes=shift_type.allow_check_out_after_shift_end_time) actual_end = end_datetime + timedelta(minutes=shift_type.allow_check_out_after_shift_end_time)
return frappe._dict({ return frappe._dict(
'shift_type': shift_type, {
'start_datetime': start_datetime, "shift_type": shift_type,
'end_datetime': end_datetime, "start_datetime": start_datetime,
'actual_start': actual_start, "end_datetime": end_datetime,
'actual_end': actual_end "actual_start": actual_start,
}) "actual_end": actual_end,
}
)

View File

@ -34,19 +34,40 @@ class ShiftType(Document):
return return
filters = { filters = {
'skip_auto_attendance': 0, "skip_auto_attendance": 0,
'attendance': ('is', 'not set'), "attendance": ("is", "not set"),
'time': ('>=', self.process_attendance_after), "time": (">=", self.process_attendance_after),
'shift_actual_end': ('<', self.last_sync_of_checkin), "shift_actual_end": ("<", self.last_sync_of_checkin),
'shift': self.name "shift": self.name,
} }
logs = frappe.db.get_list('Employee Checkin', fields="*", filters=filters, order_by="employee,time") logs = frappe.db.get_list(
"Employee Checkin", fields="*", filters=filters, order_by="employee,time"
)
for key, group in itertools.groupby(logs, key=lambda x: (x['employee'], x['shift_actual_start'])): for key, group in itertools.groupby(
logs, key=lambda x: (x["employee"], x["shift_actual_start"])
):
single_shift_logs = list(group) single_shift_logs = list(group)
attendance_status, working_hours, late_entry, early_exit, in_time, out_time = self.get_attendance(single_shift_logs) (
mark_attendance_and_link_log(single_shift_logs, attendance_status, key[1].date(), attendance_status,
working_hours, late_entry, early_exit, in_time, out_time, self.name) working_hours,
late_entry,
early_exit,
in_time,
out_time,
) = self.get_attendance(single_shift_logs)
mark_attendance_and_link_log(
single_shift_logs,
attendance_status,
key[1].date(),
working_hours,
late_entry,
early_exit,
in_time,
out_time,
self.name,
)
for employee in self.get_assigned_employee(self.process_attendance_after, True): for employee in self.get_assigned_employee(self.process_attendance_after, True):
self.mark_absent_for_dates_with_no_attendance(employee) self.mark_absent_for_dates_with_no_attendance(employee)
@ -54,9 +75,9 @@ class ShiftType(Document):
def get_attendance(self, logs): def get_attendance(self, logs):
"""Return attendance_status, working_hours, late_entry, early_exit, in_time, out_time """Return attendance_status, working_hours, late_entry, early_exit, in_time, out_time
for a set of logs belonging to a single shift. for a set of logs belonging to a single shift.
Assumption: Assumptions:
1. These logs belongs to a single shift, single employee and it's not in a holiday date. 1. These logs belongs to a single shift, single employee and it's not in a holiday date.
2. Logs are in chronological order 2. Logs are in chronological order
""" """
late_entry = early_exit = False late_entry = early_exit = False
total_working_hours, in_time, out_time = calculate_working_hours( total_working_hours, in_time, out_time = calculate_working_hours(
@ -116,8 +137,9 @@ class ShiftType(Document):
mark_attendance(employee, date, "Absent", self.name) mark_attendance(employee, date, "Absent", self.name)
def get_start_and_end_dates(self, employee): def get_start_and_end_dates(self, employee):
date_of_joining, relieving_date, employee_creation = frappe.db.get_value("Employee", employee, date_of_joining, relieving_date, employee_creation = frappe.db.get_value(
["date_of_joining", "relieving_date", "creation"]) "Employee", employee, ["date_of_joining", "relieving_date", "creation"]
)
if not date_of_joining: if not date_of_joining:
date_of_joining = employee_creation.date() date_of_joining = employee_creation.date()
@ -126,26 +148,32 @@ class ShiftType(Document):
end_date = None end_date = None
shift_details = get_shift_details(self.name, get_datetime(self.last_sync_of_checkin)) shift_details = get_shift_details(self.name, get_datetime(self.last_sync_of_checkin))
last_shift_time = shift_details.actual_start if shift_details else get_datetime(self.last_sync_of_checkin) last_shift_time = (
shift_details.actual_start if shift_details else get_datetime(self.last_sync_of_checkin)
)
prev_shift = get_employee_shift(employee, last_shift_time - timedelta(days=1), True, 'reverse') prev_shift = get_employee_shift(employee, last_shift_time - timedelta(days=1), True, "reverse")
if prev_shift: if prev_shift:
end_date = min(prev_shift.start_datetime.date(), relieving_date) if relieving_date else prev_shift.start_datetime.date() end_date = (
min(prev_shift.start_datetime.date(), relieving_date)
if relieving_date
else prev_shift.start_datetime.date()
)
return start_date, end_date return start_date, end_date
def get_assigned_employee(self, from_date=None, consider_default_shift=False): def get_assigned_employee(self, from_date=None, consider_default_shift=False):
filters = {'shift_type': self.name, 'docstatus': '1'} filters = {"shift_type": self.name, "docstatus": "1"}
if from_date: if from_date:
filters['start_date'] = ('>', from_date) filters["start_date"] = (">", from_date)
assigned_employees = frappe.get_all('Shift Assignment', filters=filters, pluck='employee') assigned_employees = frappe.get_all("Shift Assignment", filters=filters, pluck="employee")
if consider_default_shift: if consider_default_shift:
filters = {'default_shift': self.name, 'status': ['!=', 'Inactive']} filters = {"default_shift": self.name, "status": ["!=", "Inactive"]}
default_shift_employees = frappe.get_all('Employee', filters=filters, pluck='name') default_shift_employees = frappe.get_all("Employee", filters=filters, pluck="name")
return list(set(assigned_employees+default_shift_employees)) return list(set(assigned_employees + default_shift_employees))
return assigned_employees return assigned_employees

View File

@ -14,44 +14,47 @@ from frappe.utils import cint, cstr, getdate
Filters = frappe._dict Filters = frappe._dict
status_map = { status_map = {
'Present': 'P', "Present": "P",
'Absent': 'A', "Absent": "A",
'Half Day': 'HD', "Half Day": "HD",
'Work From Home': 'WFH', "Work From Home": "WFH",
'On Leave': 'L', "On Leave": "L",
'Holiday': 'H', "Holiday": "H",
'Weekly Off': 'WO' "Weekly Off": "WO",
} }
day_abbr = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] day_abbr = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
def execute(filters: Optional[Filters] = None) -> Tuple:
def execute(filters: Optional[Filters] = None) -> Tuple:
filters = frappe._dict(filters or {}) filters = frappe._dict(filters or {})
if not (filters.month and filters.year): if not (filters.month and filters.year):
frappe.throw(_('Please select month and year.')) frappe.throw(_("Please select month and year."))
attendance_map = get_attendance_map(filters) attendance_map = get_attendance_map(filters)
if not attendance_map: if not attendance_map:
frappe.msgprint(_('No attendance records found.'), alert=True, indicator='orange') frappe.msgprint(_("No attendance records found."), alert=True, indicator="orange")
return [], [], None, None return [], [], None, None
columns = get_columns(filters) columns = get_columns(filters)
data = get_data(filters, attendance_map) data = get_data(filters, attendance_map)
if not data: if not data:
frappe.msgprint(_('No attendance records found for this criteria.'), alert=True, indicator='orange') frappe.msgprint(
_("No attendance records found for this criteria."), alert=True, indicator="orange"
)
return columns, [], None, None return columns, [], None, None
message = get_message() if not filters.summarized_view else '' message = get_message() if not filters.summarized_view else ""
chart = get_chart_data(attendance_map, filters) chart = get_chart_data(attendance_map, filters)
return columns, data, message, chart return columns, data, message, chart
def get_message() -> str: def get_message() -> str:
message = '' message = ""
colors = ['green', 'red', 'orange', 'green', '#318AD8', '', ''] colors = ["green", "red", "orange", "green", "#318AD8", "", ""]
count = 0 count = 0
for status, abbr in status_map.items(): for status, abbr in status_map.items():
@ -70,39 +73,84 @@ def get_columns(filters: Filters) -> List[Dict]:
if filters.group_by: if filters.group_by:
columns.append( columns.append(
{'label': _(filters.group_by), 'fieldname': frappe.scrub(filters.group_by), 'fieldtype': 'Link', 'options': 'Branch', 'width': 120} {
"label": _(filters.group_by),
"fieldname": frappe.scrub(filters.group_by),
"fieldtype": "Link",
"options": "Branch",
"width": 120,
}
) )
columns.extend([ columns.extend(
{'label': _('Employee'), 'fieldname': 'employee', 'fieldtype': 'Link', 'options': 'Employee', 'width': 135}, [
{'label': _('Employee Name'), 'fieldname': 'employee_name', 'fieldtype': 'Data', 'width': 120} {
]) "label": _("Employee"),
"fieldname": "employee",
"fieldtype": "Link",
"options": "Employee",
"width": 135,
},
{"label": _("Employee Name"), "fieldname": "employee_name", "fieldtype": "Data", "width": 120},
]
)
if filters.summarized_view: if filters.summarized_view:
columns.extend([ columns.extend(
{'label': _('Total Present'), 'fieldname': 'total_present', 'fieldtype': 'Float', 'width': 110}, [
{'label': _('Total Leaves'), 'fieldname': 'total_leaves', 'fieldtype': 'Float', 'width': 110}, {
{'label': _('Total Absent'), 'fieldname': 'total_absent', 'fieldtype': 'Float', 'width': 110}, "label": _("Total Present"),
{'label': _('Total Holidays'), 'fieldname': 'total_holidays', 'fieldtype': 'Float', 'width': 120}, "fieldname": "total_present",
{'label': _('Unmarked Days'), 'fieldname': 'unmarked_days', 'fieldtype': 'Float', 'width': 130} "fieldtype": "Float",
]) "width": 110,
},
{"label": _("Total Leaves"), "fieldname": "total_leaves", "fieldtype": "Float", "width": 110},
{"label": _("Total Absent"), "fieldname": "total_absent", "fieldtype": "Float", "width": 110},
{
"label": _("Total Holidays"),
"fieldname": "total_holidays",
"fieldtype": "Float",
"width": 120,
},
{
"label": _("Unmarked Days"),
"fieldname": "unmarked_days",
"fieldtype": "Float",
"width": 130,
},
]
)
columns.extend(get_columns_for_leave_types()) columns.extend(get_columns_for_leave_types())
columns.extend([ columns.extend(
{'label': _('Total Late Entries'), 'fieldname': 'total_late_entries', 'fieldtype': 'Float', 'width': 140}, [
{'label': _('Total Early Exits'), 'fieldname': 'total_early_exits', 'fieldtype': 'Float', 'width': 140} {
]) "label": _("Total Late Entries"),
"fieldname": "total_late_entries",
"fieldtype": "Float",
"width": 140,
},
{
"label": _("Total Early Exits"),
"fieldname": "total_early_exits",
"fieldtype": "Float",
"width": 140,
},
]
)
else: else:
columns.append({'label': _('Shift'), 'fieldname': 'shift', 'fieldtype': 'Data', 'width': 120}) columns.append({"label": _("Shift"), "fieldname": "shift", "fieldtype": "Data", "width": 120})
columns.extend(get_columns_for_days(filters)) columns.extend(get_columns_for_days(filters))
return columns return columns
def get_columns_for_leave_types() -> List[Dict]: def get_columns_for_leave_types() -> List[Dict]:
leave_types = frappe.db.get_all('Leave Type', pluck='name') leave_types = frappe.db.get_all("Leave Type", pluck="name")
types = [] types = []
for entry in leave_types: for entry in leave_types:
types.append({'label': entry, 'fieldname': frappe.scrub(entry), 'fieldtype': 'Float', 'width': 120}) types.append(
{"label": entry, "fieldname": frappe.scrub(entry), "fieldtype": "Float", "width": 120}
)
return types return types
@ -111,23 +159,14 @@ def get_columns_for_days(filters: Filters) -> List[Dict]:
total_days = get_total_days_in_month(filters) total_days = get_total_days_in_month(filters)
days = [] days = []
for day in range(1, total_days+1): for day in range(1, total_days + 1):
# forms the dates from selected year and month from filters # forms the dates from selected year and month from filters
date = '{}-{}-{}'.format( date = "{}-{}-{}".format(cstr(filters.year), cstr(filters.month), cstr(day))
cstr(filters.year),
cstr(filters.month),
cstr(day)
)
# gets abbr from weekday number # gets abbr from weekday number
weekday = day_abbr[getdate(date).weekday()] weekday = day_abbr[getdate(date).weekday()]
# sets days as 1 Mon, 2 Tue, 3 Wed # sets days as 1 Mon, 2 Tue, 3 Wed
label = '{} {}'.format(cstr(day), weekday) label = "{} {}".format(cstr(day), weekday)
days.append({ days.append({"label": label, "fieldtype": "Data", "fieldname": day, "width": 65})
'label': label,
'fieldtype': 'Data',
'fieldname': day,
'width': 65
})
return days return days
@ -137,7 +176,9 @@ def get_total_days_in_month(filters: Filters) -> int:
def get_data(filters: Filters, attendance_map: Dict) -> List[Dict]: def get_data(filters: Filters, attendance_map: Dict) -> List[Dict]:
employee_details, group_by_param_values = get_employee_related_details(filters.group_by, filters.company) employee_details, group_by_param_values = get_employee_related_details(
filters.group_by, filters.company
)
holiday_map = get_holiday_map(filters) holiday_map = get_holiday_map(filters)
data = [] data = []
@ -151,9 +192,7 @@ def get_data(filters: Filters, attendance_map: Dict) -> List[Dict]:
records = get_rows(employee_details[value], filters, holiday_map, attendance_map) records = get_rows(employee_details[value], filters, holiday_map, attendance_map)
if records: if records:
data.append({ data.append({group_by_column: frappe.bold(value)})
group_by_column: frappe.bold(value)
})
data.extend(records) data.extend(records)
else: else:
data = get_rows(employee_details, filters, holiday_map, attendance_map) data = get_rows(employee_details, filters, holiday_map, attendance_map)
@ -163,30 +202,31 @@ def get_data(filters: Filters, attendance_map: Dict) -> List[Dict]:
def get_attendance_map(filters: Filters) -> Dict: def get_attendance_map(filters: Filters) -> Dict:
"""Returns a dictionary of employee wise attendance map as per shifts for all the days of the month like """Returns a dictionary of employee wise attendance map as per shifts for all the days of the month like
{ {
'employee1': { 'employee1': {
'Morning Shift': {1: 'Present', 2: 'Absent', ...} 'Morning Shift': {1: 'Present', 2: 'Absent', ...}
'Evening Shift': {1: 'Absent', 2: 'Present', ...} 'Evening Shift': {1: 'Absent', 2: 'Present', ...}
}, },
'employee2': { 'employee2': {
'Afternoon Shift': {1: 'Present', 2: 'Absent', ...} 'Afternoon Shift': {1: 'Present', 2: 'Absent', ...}
'Night Shift': {1: 'Absent', 2: 'Absent', ...} 'Night Shift': {1: 'Absent', 2: 'Absent', ...}
} }
} }
""" """
Attendance = frappe.qb.DocType('Attendance') Attendance = frappe.qb.DocType("Attendance")
query = ( query = (
frappe.qb.from_(Attendance) frappe.qb.from_(Attendance)
.select( .select(
Attendance.employee, Attendance.employee,
Extract('day', Attendance.attendance_date).as_('day_of_month'), Extract("day", Attendance.attendance_date).as_("day_of_month"),
Attendance.status, Attendance.status,
Attendance.shift Attendance.shift,
).where( )
.where(
(Attendance.docstatus == 1) (Attendance.docstatus == 1)
& (Attendance.company == filters.company) & (Attendance.company == filters.company)
& (Extract('month', Attendance.attendance_date) == filters.month) & (Extract("month", Attendance.attendance_date) == filters.month)
& (Extract('year', Attendance.attendance_date) == filters.year) & (Extract("year", Attendance.attendance_date) == filters.year)
) )
) )
if filters.employee: if filters.employee:
@ -205,18 +245,23 @@ def get_attendance_map(filters: Filters) -> Dict:
def get_employee_related_details(group_by: str, company: str) -> Tuple[Dict, List]: def get_employee_related_details(group_by: str, company: str) -> Tuple[Dict, List]:
"""Returns """Returns
1. nested dict for employee details 1. nested dict for employee details
2. list of values for the group by filter 2. list of values for the group by filter
eg: if group by filter is set to "Department" then returns a list like ['HR', 'Support', 'Engineering']
""" """
Employee = frappe.qb.DocType('Employee') Employee = frappe.qb.DocType("Employee")
query = ( query = (
frappe.qb.from_(Employee) frappe.qb.from_(Employee)
.select( .select(
Employee.name, Employee.employee_name, Employee.designation, Employee.name,
Employee.grade, Employee.department, Employee.branch, Employee.employee_name,
Employee.company, Employee.holiday_list Employee.designation,
).where(Employee.company == company) Employee.grade,
Employee.department,
Employee.branch,
Employee.company,
Employee.holiday_list,
)
.where(Employee.company == company)
) )
if group_by: if group_by:
@ -247,23 +292,23 @@ def get_holiday_map(filters: Filters) -> Dict[str, List[Dict]]:
Returns a dict of holidays falling in the filter month and year Returns a dict of holidays falling in the filter month and year
with list name as key and list of holidays as values like with list name as key and list of holidays as values like
{ {
'Holiday List 1': [ 'Holiday List 1': [
{'day_of_month': '0' , 'weekly_off': 1}, {'day_of_month': '0' , 'weekly_off': 1},
{'day_of_month': '1', 'weekly_off': 0} {'day_of_month': '1', 'weekly_off': 0}
], ],
'Holiday List 2': [ 'Holiday List 2': [
{'day_of_month': '0' , 'weekly_off': 1}, {'day_of_month': '0' , 'weekly_off': 1},
{'day_of_month': '1', 'weekly_off': 0} {'day_of_month': '1', 'weekly_off': 0}
] ]
} }
""" """
# add default holiday list too # add default holiday list too
holiday_lists = frappe.db.get_all('Holiday List', pluck='name') holiday_lists = frappe.db.get_all("Holiday List", pluck="name")
default_holiday_list = frappe.get_cached_value('Company', filters.company, 'default_holiday_list') default_holiday_list = frappe.get_cached_value("Company", filters.company, "default_holiday_list")
holiday_lists.append(default_holiday_list) holiday_lists.append(default_holiday_list)
holiday_map = frappe._dict() holiday_map = frappe._dict()
Holiday = frappe.qb.DocType('Holiday') Holiday = frappe.qb.DocType("Holiday")
for d in holiday_lists: for d in holiday_lists:
if not d: if not d:
@ -271,13 +316,11 @@ def get_holiday_map(filters: Filters) -> Dict[str, List[Dict]]:
holidays = ( holidays = (
frappe.qb.from_(Holiday) frappe.qb.from_(Holiday)
.select( .select(Extract("day", Holiday.holiday_date).as_("day_of_month"), Holiday.weekly_off)
Extract('day', Holiday.holiday_date).as_('day_of_month'), .where(
Holiday.weekly_off
).where(
(Holiday.parent == d) (Holiday.parent == d)
& (Extract('month', Holiday.holiday_date) == filters.month) & (Extract("month", Holiday.holiday_date) == filters.month)
& (Extract('year', Holiday.holiday_date) == filters.year) & (Extract("year", Holiday.holiday_date) == filters.year)
) )
).run(as_dict=True) ).run(as_dict=True)
@ -286,13 +329,15 @@ def get_holiday_map(filters: Filters) -> Dict[str, List[Dict]]:
return holiday_map return holiday_map
def get_rows(employee_details: Dict, filters: Filters, holiday_map: Dict, attendance_map: Dict) -> List[Dict]: def get_rows(
employee_details: Dict, filters: Filters, holiday_map: Dict, attendance_map: Dict
) -> List[Dict]:
records = [] records = []
default_holiday_list = frappe.get_cached_value('Company', filters.company, 'default_holiday_list') default_holiday_list = frappe.get_cached_value("Company", filters.company, "default_holiday_list")
for employee, details in employee_details.items(): for employee, details in employee_details.items():
emp_holiday_list = details.holiday_list or default_holiday_list emp_holiday_list = details.holiday_list or default_holiday_list
holidays = holiday_map[emp_holiday_list] holidays = holiday_map.get(emp_holiday_list)
if filters.summarized_view: if filters.summarized_view:
attendance = get_attendance_status_for_summarized_view(employee, filters, holidays) attendance = get_attendance_status_for_summarized_view(employee, filters, holidays)
@ -302,7 +347,7 @@ def get_rows(employee_details: Dict, filters: Filters, holiday_map: Dict, attend
leave_summary = get_leave_summary(employee, filters) leave_summary = get_leave_summary(employee, filters)
entry_exits_summary = get_entry_exits_summary(employee, filters) entry_exits_summary = get_entry_exits_summary(employee, filters)
row = {'employee': employee, 'employee_name': details.employee_name} row = {"employee": employee, "employee_name": details.employee_name}
set_defaults_for_summarized_view(filters, row) set_defaults_for_summarized_view(filters, row)
row.update(attendance) row.update(attendance)
row.update(leave_summary) row.update(leave_summary)
@ -314,12 +359,13 @@ def get_rows(employee_details: Dict, filters: Filters, holiday_map: Dict, attend
if not employee_attendance: if not employee_attendance:
continue continue
attendance_for_employee = get_attendance_status_for_detailed_view(employee, filters, employee_attendance, holidays) attendance_for_employee = get_attendance_status_for_detailed_view(
employee, filters, employee_attendance, holidays
)
# set employee details in the first row # set employee details in the first row
attendance_for_employee[0].update({ attendance_for_employee[0].update(
'employee': employee, {"employee": employee, "employee_name": details.employee_name}
'employee_name': details.employee_name )
})
records.extend(attendance_for_employee) records.extend(attendance_for_employee)
@ -328,13 +374,15 @@ def get_rows(employee_details: Dict, filters: Filters, holiday_map: Dict, attend
def set_defaults_for_summarized_view(filters, row): def set_defaults_for_summarized_view(filters, row):
for entry in get_columns(filters): for entry in get_columns(filters):
if entry.get('fieldtype') == 'Float': if entry.get("fieldtype") == "Float":
row[entry.get('fieldname')] = 0.0 row[entry.get("fieldname")] = 0.0
def get_attendance_status_for_summarized_view(employee: str, filters: Filters, holidays: List) -> Dict: def get_attendance_status_for_summarized_view(
employee: str, filters: Filters, holidays: List
) -> Dict:
"""Returns dict of attendance status for employee like """Returns dict of attendance status for employee like
{'total_present': 1.5, 'total_leaves': 0.5, 'total_absent': 13.5, 'total_holidays': 8, 'unmarked_days': 5} {'total_present': 1.5, 'total_leaves': 0.5, 'total_absent': 13.5, 'total_holidays': 8, 'unmarked_days': 5}
""" """
summary, attendance_days = get_attendance_summary_and_days(employee, filters) summary, attendance_days = get_attendance_summary_and_days(employee, filters)
if not any(summary.values()): if not any(summary.values()):
@ -348,83 +396,93 @@ def get_attendance_status_for_summarized_view(employee: str, filters: Filters, h
continue continue
status = get_holiday_status(day, holidays) status = get_holiday_status(day, holidays)
if status in ['Weekly Off', 'Holiday']: if status in ["Weekly Off", "Holiday"]:
total_holidays += 1 total_holidays += 1
elif not status: elif not status:
total_unmarked_days += 1 total_unmarked_days += 1
return { return {
'total_present': summary.total_present + summary.total_half_days, "total_present": summary.total_present + summary.total_half_days,
'total_leaves': summary.total_leaves + summary.total_half_days, "total_leaves": summary.total_leaves + summary.total_half_days,
'total_absent': summary.total_absent + summary.total_half_days, "total_absent": summary.total_absent + summary.total_half_days,
'total_holidays': total_holidays, "total_holidays": total_holidays,
'unmarked_days': total_unmarked_days "unmarked_days": total_unmarked_days,
} }
def get_attendance_summary_and_days(employee: str, filters: Filters) -> Tuple[Dict, List]: def get_attendance_summary_and_days(employee: str, filters: Filters) -> Tuple[Dict, List]:
Attendance = frappe.qb.DocType('Attendance') Attendance = frappe.qb.DocType("Attendance")
present_case = frappe.qb.terms.Case().when(((Attendance.status == 'Present') | (Attendance.status == 'Work From Home')), 1).else_(0) present_case = (
sum_present = Sum(present_case).as_('total_present') frappe.qb.terms.Case()
.when(((Attendance.status == "Present") | (Attendance.status == "Work From Home")), 1)
.else_(0)
)
sum_present = Sum(present_case).as_("total_present")
absent_case = frappe.qb.terms.Case().when(Attendance.status == 'Absent', 1).else_(0) absent_case = frappe.qb.terms.Case().when(Attendance.status == "Absent", 1).else_(0)
sum_absent = Sum(absent_case).as_('total_absent') sum_absent = Sum(absent_case).as_("total_absent")
leave_case = frappe.qb.terms.Case().when(Attendance.status == 'On Leave', 1).else_(0) leave_case = frappe.qb.terms.Case().when(Attendance.status == "On Leave", 1).else_(0)
sum_leave = Sum(leave_case).as_('total_leaves') sum_leave = Sum(leave_case).as_("total_leaves")
half_day_case = frappe.qb.terms.Case().when(Attendance.status == 'Half Day', 0.5).else_(0) half_day_case = frappe.qb.terms.Case().when(Attendance.status == "Half Day", 0.5).else_(0)
sum_half_day = Sum(half_day_case).as_('total_half_days') sum_half_day = Sum(half_day_case).as_("total_half_days")
summary = ( summary = (
frappe.qb.from_(Attendance) frappe.qb.from_(Attendance)
.select( .select(
sum_present, sum_absent, sum_leave, sum_half_day, sum_present,
).where( sum_absent,
sum_leave,
sum_half_day,
)
.where(
(Attendance.docstatus == 1) (Attendance.docstatus == 1)
& (Attendance.employee == employee) & (Attendance.employee == employee)
& (Attendance.company == filters.company) & (Attendance.company == filters.company)
& (Extract('month', Attendance.attendance_date) == filters.month) & (Extract("month", Attendance.attendance_date) == filters.month)
& (Extract('year', Attendance.attendance_date) == filters.year) & (Extract("year", Attendance.attendance_date) == filters.year)
) )
).run(as_dict=True) ).run(as_dict=True)
days = ( days = (
frappe.qb.from_(Attendance) frappe.qb.from_(Attendance)
.select(Extract('day', Attendance.attendance_date).as_('day_of_month')) .select(Extract("day", Attendance.attendance_date).as_("day_of_month"))
.distinct() .distinct()
.where( .where(
(Attendance.docstatus == 1) (Attendance.docstatus == 1)
& (Attendance.employee == employee) & (Attendance.employee == employee)
& (Attendance.company == filters.company) & (Attendance.company == filters.company)
& (Extract('month', Attendance.attendance_date) == filters.month) & (Extract("month", Attendance.attendance_date) == filters.month)
& (Extract('year', Attendance.attendance_date) == filters.year) & (Extract("year", Attendance.attendance_date) == filters.year)
) )
).run(pluck=True) ).run(pluck=True)
return summary[0], days return summary[0], days
def get_attendance_status_for_detailed_view(employee: str, filters: Filters, employee_attendance: Dict, holidays: List) -> List[Dict]: def get_attendance_status_for_detailed_view(
employee: str, filters: Filters, employee_attendance: Dict, holidays: List
) -> List[Dict]:
"""Returns list of shift-wise attendance status for employee """Returns list of shift-wise attendance status for employee
[ [
{'shift': 'Morning Shift', 1: 'A', 2: 'P', 3: 'A'....}, {'shift': 'Morning Shift', 1: 'A', 2: 'P', 3: 'A'....},
{'shift': 'Evening Shift', 1: 'P', 2: 'A', 3: 'P'....} {'shift': 'Evening Shift', 1: 'P', 2: 'A', 3: 'P'....}
] ]
""" """
total_days = get_total_days_in_month(filters) total_days = get_total_days_in_month(filters)
attendance_values = [] attendance_values = []
for shift, status_dict in employee_attendance.items(): for shift, status_dict in employee_attendance.items():
row = {'shift': shift} row = {"shift": shift}
for day in range(1, total_days + 1): for day in range(1, total_days + 1):
status = status_dict.get(day) status = status_dict.get(day)
if status is None and holidays: if status is None and holidays:
status = get_holiday_status(day, holidays) status = get_holiday_status(day, holidays)
abbr = status_map.get(status, '') abbr = status_map.get(status, "")
row[day] = abbr row[day] = abbr
attendance_values.append(row) attendance_values.append(row)
@ -435,22 +493,22 @@ def get_attendance_status_for_detailed_view(employee: str, filters: Filters, emp
def get_holiday_status(day: int, holidays: List) -> str: def get_holiday_status(day: int, holidays: List) -> str:
status = None status = None
for holiday in holidays: for holiday in holidays:
if day == holiday.get('day_of_month'): if day == holiday.get("day_of_month"):
if holiday.get('weekly_off'): if holiday.get("weekly_off"):
status = 'Weekly Off' status = "Weekly Off"
else: else:
status = 'Holiday' status = "Holiday"
break break
return status return status
def get_leave_summary(employee: str, filters: Filters) -> Dict[str, float]: def get_leave_summary(employee: str, filters: Filters) -> Dict[str, float]:
"""Returns a dict of leave type and corresponding leaves taken by employee like: """Returns a dict of leave type and corresponding leaves taken by employee like:
{'leave_without_pay': 1.0, 'sick_leave': 2.0} {'leave_without_pay': 1.0, 'sick_leave': 2.0}
""" """
Attendance = frappe.qb.DocType('Attendance') Attendance = frappe.qb.DocType("Attendance")
day_case = frappe.qb.terms.Case().when(Attendance.status == 'Half Day', 0.5).else_(1) day_case = frappe.qb.terms.Case().when(Attendance.status == "Half Day", 0.5).else_(1)
sum_leave_days = Sum(day_case).as_('leave_days') sum_leave_days = Sum(day_case).as_("leave_days")
leave_details = ( leave_details = (
frappe.qb.from_(Attendance) frappe.qb.from_(Attendance)
@ -459,10 +517,11 @@ def get_leave_summary(employee: str, filters: Filters) -> Dict[str, float]:
(Attendance.employee == employee) (Attendance.employee == employee)
& (Attendance.docstatus == 1) & (Attendance.docstatus == 1)
& (Attendance.company == filters.company) & (Attendance.company == filters.company)
& ((Attendance.leave_type.isnotnull()) | (Attendance.leave_type != '')) & ((Attendance.leave_type.isnotnull()) | (Attendance.leave_type != ""))
& (Extract('month', Attendance.attendance_date) == filters.month) & (Extract("month", Attendance.attendance_date) == filters.month)
& (Extract('year', Attendance.attendance_date) == filters.year) & (Extract("year", Attendance.attendance_date) == filters.year)
).groupby(Attendance.leave_type) )
.groupby(Attendance.leave_type)
).run(as_dict=True) ).run(as_dict=True)
leaves = {} leaves = {}
@ -475,15 +534,15 @@ def get_leave_summary(employee: str, filters: Filters) -> Dict[str, float]:
def get_entry_exits_summary(employee: str, filters: Filters) -> Dict[str, float]: def get_entry_exits_summary(employee: str, filters: Filters) -> Dict[str, float]:
"""Returns total late entries and total early exits for employee like: """Returns total late entries and total early exits for employee like:
{'total_late_entries': 5, 'total_early_exits': 2} {'total_late_entries': 5, 'total_early_exits': 2}
""" """
Attendance = frappe.qb.DocType('Attendance') Attendance = frappe.qb.DocType("Attendance")
late_entry_case = frappe.qb.terms.Case().when(Attendance.late_entry == '1', '1') late_entry_case = frappe.qb.terms.Case().when(Attendance.late_entry == "1", "1")
count_late_entries = Count(late_entry_case).as_('total_late_entries') count_late_entries = Count(late_entry_case).as_("total_late_entries")
early_exit_case = frappe.qb.terms.Case().when(Attendance.early_exit == '1', '1') early_exit_case = frappe.qb.terms.Case().when(Attendance.early_exit == "1", "1")
count_early_exits = Count(early_exit_case).as_('total_early_exits') count_early_exits = Count(early_exit_case).as_("total_early_exits")
entry_exits = ( entry_exits = (
frappe.qb.from_(Attendance) frappe.qb.from_(Attendance)
@ -492,8 +551,8 @@ def get_entry_exits_summary(employee: str, filters: Filters) -> Dict[str, float]
(Attendance.docstatus == 1) (Attendance.docstatus == 1)
& (Attendance.employee == employee) & (Attendance.employee == employee)
& (Attendance.company == filters.company) & (Attendance.company == filters.company)
& (Extract('month', Attendance.attendance_date) == filters.month) & (Extract("month", Attendance.attendance_date) == filters.month)
& (Extract('year', Attendance.attendance_date) == filters.year) & (Extract("year", Attendance.attendance_date) == filters.year)
) )
).run(as_dict=True) ).run(as_dict=True)
@ -503,10 +562,10 @@ def get_entry_exits_summary(employee: str, filters: Filters) -> Dict[str, float]
@frappe.whitelist() @frappe.whitelist()
def get_attendance_years() -> str: def get_attendance_years() -> str:
"""Returns all the years for which attendance records exist""" """Returns all the years for which attendance records exist"""
Attendance = frappe.qb.DocType('Attendance') Attendance = frappe.qb.DocType("Attendance")
year_list = ( year_list = (
frappe.qb.from_(Attendance) frappe.qb.from_(Attendance)
.select(Extract('year', Attendance.attendance_date).as_('year')) .select(Extract("year", Attendance.attendance_date).as_("year"))
.distinct() .distinct()
).run(as_dict=True) ).run(as_dict=True)
@ -526,21 +585,21 @@ def get_chart_data(attendance_map: Dict, filters: Filters) -> Dict:
leave = [] leave = []
for day in days: for day in days:
labels.append(day['label']) labels.append(day["label"])
total_absent_on_day = total_leaves_on_day = total_present_on_day = 0 total_absent_on_day = total_leaves_on_day = total_present_on_day = 0
for employee, attendance_dict in attendance_map.items(): for employee, attendance_dict in attendance_map.items():
for shift, attendance in attendance_dict.items(): for shift, attendance in attendance_dict.items():
attendance_on_day = attendance.get(day['fieldname']) attendance_on_day = attendance.get(day["fieldname"])
if attendance_on_day == 'Absent': if attendance_on_day == "Absent":
total_absent_on_day += 1 total_absent_on_day += 1
elif attendance_on_day in ['Present', 'Work From Home']: elif attendance_on_day in ["Present", "Work From Home"]:
total_present_on_day += 1 total_present_on_day += 1
elif attendance_on_day == 'Half Day': elif attendance_on_day == "Half Day":
total_present_on_day += 0.5 total_present_on_day += 0.5
total_leaves_on_day += 0.5 total_leaves_on_day += 0.5
elif attendance_on_day == 'On Leave': elif attendance_on_day == "On Leave":
total_leaves_on_day += 1 total_leaves_on_day += 1
absent.append(total_absent_on_day) absent.append(total_absent_on_day)
@ -548,14 +607,14 @@ def get_chart_data(attendance_map: Dict, filters: Filters) -> Dict:
leave.append(total_leaves_on_day) leave.append(total_leaves_on_day)
return { return {
'data': { "data": {
'labels': labels, "labels": labels,
'datasets': [ "datasets": [
{'name': 'Absent', 'values': absent}, {"name": "Absent", "values": absent},
{'name': 'Present', 'values': present}, {"name": "Present", "values": present},
{'name': 'Leave', 'values': leave}, {"name": "Leave", "values": leave},
] ],
}, },
'type': 'line', "type": "line",
'colors': ['red', 'green', 'blue'], "colors": ["red", "green", "blue"],
} }

View File

@ -7,10 +7,7 @@ from erpnext.hr.doctype.attendance.attendance import mark_attendance
from erpnext.hr.doctype.employee.test_employee import make_employee from erpnext.hr.doctype.employee.test_employee import make_employee
from erpnext.hr.doctype.holiday_list.test_holiday_list import set_holiday_list from erpnext.hr.doctype.holiday_list.test_holiday_list import set_holiday_list
from erpnext.hr.doctype.leave_application.test_leave_application import make_allocation_record from erpnext.hr.doctype.leave_application.test_leave_application import make_allocation_record
from erpnext.hr.report.monthly_attendance_sheet.monthly_attendance_sheet import ( from erpnext.hr.report.monthly_attendance_sheet.monthly_attendance_sheet import execute
execute,
get_total_days_in_month,
)
from erpnext.payroll.doctype.salary_slip.test_salary_slip import make_leave_application from erpnext.payroll.doctype.salary_slip.test_salary_slip import make_leave_application
test_dependencies = ["Shift Type"] test_dependencies = ["Shift Type"]