diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
index 3bc22af96a..e9cb6cf903 100644
--- a/.git-blame-ignore-revs
+++ b/.git-blame-ignore-revs
@@ -26,6 +26,3 @@ b147b85e6ac19a9220cd1e2958a6ebd99373283a
# bulk format python code with black
494bd9ef78313436f0424b918f200dab8fc7c20b
-
-# bulk format python code with black
-baec607ff5905b1c67531096a9cf50ec7ff00a5d
\ No newline at end of file
diff --git a/erpnext/hr/doctype/attendance/attendance.py b/erpnext/hr/doctype/attendance/attendance.py
index e43d40ef56..7f4bd83685 100644
--- a/erpnext/hr/doctype/attendance/attendance.py
+++ b/erpnext/hr/doctype/attendance/attendance.py
@@ -5,21 +5,11 @@
import frappe
from frappe import _
from frappe.model.document import Document
-from frappe.query_builder import Criterion
-from frappe.utils import cint, cstr, formatdate, get_datetime, get_link_to_form, getdate, nowdate
+from frappe.utils import cint, cstr, formatdate, get_datetime, getdate, nowdate
-from erpnext.hr.doctype.shift_assignment.shift_assignment import has_overlapping_timings
from erpnext.hr.utils import get_holiday_dates_for_employee, validate_active_employee
-class DuplicateAttendanceError(frappe.ValidationError):
- pass
-
-
-class OverlappingShiftAttendanceError(frappe.ValidationError):
- pass
-
-
class Attendance(Document):
def validate(self):
from erpnext.controllers.status_updater import validate_status
@@ -28,7 +18,6 @@ class Attendance(Document):
validate_active_employee(self.employee)
self.validate_attendance_date()
self.validate_duplicate_record()
- self.validate_overlapping_shift_attendance()
self.validate_employee_status()
self.check_leave_record()
@@ -46,35 +35,21 @@ class Attendance(Document):
frappe.throw(_("Attendance date can not be less than employee's joining date"))
def validate_duplicate_record(self):
- duplicate = get_duplicate_attendance_record(
- self.employee, self.attendance_date, self.shift, self.name
+ res = frappe.db.sql(
+ """
+ select name from `tabAttendance`
+ where employee = %s
+ and attendance_date = %s
+ and name != %s
+ and docstatus != 2
+ """,
+ (self.employee, getdate(self.attendance_date), self.name),
)
-
- if duplicate:
+ if res:
frappe.throw(
- _("Attendance for employee {0} is already marked for the date {1}: {2}").format(
- frappe.bold(self.employee),
- frappe.bold(self.attendance_date),
- get_link_to_form("Attendance", duplicate[0].name),
- ),
- title=_("Duplicate Attendance"),
- exc=DuplicateAttendanceError,
- )
-
- def validate_overlapping_shift_attendance(self):
- attendance = get_overlapping_shift_attendance(
- self.employee, self.attendance_date, self.shift, self.name
- )
-
- if attendance:
- frappe.throw(
- _("Attendance for employee {0} is already marked for an overlapping shift {1}: {2}").format(
- frappe.bold(self.employee),
- frappe.bold(attendance.shift),
- get_link_to_form("Attendance", attendance.name),
- ),
- title=_("Overlapping Shift Attendance"),
- exc=OverlappingShiftAttendanceError,
+ _("Attendance for employee {0} is already marked for the date {1}").format(
+ frappe.bold(self.employee), frappe.bold(self.attendance_date)
+ )
)
def validate_employee_status(self):
@@ -128,69 +103,6 @@ class Attendance(Document):
frappe.throw(_("Employee {0} is not active or does not exist").format(self.employee))
-def get_duplicate_attendance_record(employee, attendance_date, shift, name=None):
- attendance = frappe.qb.DocType("Attendance")
- query = (
- frappe.qb.from_(attendance)
- .select(attendance.name)
- .where((attendance.employee == employee) & (attendance.docstatus < 2))
- )
-
- if shift:
- query = query.where(
- Criterion.any(
- [
- Criterion.all(
- [
- ((attendance.shift.isnull()) | (attendance.shift == "")),
- (attendance.attendance_date == attendance_date),
- ]
- ),
- Criterion.all(
- [
- ((attendance.shift.isnotnull()) | (attendance.shift != "")),
- (attendance.attendance_date == attendance_date),
- (attendance.shift == shift),
- ]
- ),
- ]
- )
- )
- else:
- query = query.where((attendance.attendance_date == attendance_date))
-
- if name:
- query = query.where(attendance.name != name)
-
- return query.run(as_dict=True)
-
-
-def get_overlapping_shift_attendance(employee, attendance_date, shift, name=None):
- if not shift:
- return {}
-
- attendance = frappe.qb.DocType("Attendance")
- query = (
- frappe.qb.from_(attendance)
- .select(attendance.name, attendance.shift)
- .where(
- (attendance.employee == employee)
- & (attendance.docstatus < 2)
- & (attendance.attendance_date == attendance_date)
- & (attendance.shift != shift)
- )
- )
-
- if name:
- query = query.where(attendance.name != name)
-
- overlapping_attendance = query.run(as_dict=True)
-
- if overlapping_attendance and has_overlapping_timings(shift, overlapping_attendance[0].shift):
- return overlapping_attendance[0]
- return {}
-
-
@frappe.whitelist()
def get_events(start, end, filters=None):
events = []
@@ -229,39 +141,28 @@ def add_attendance(events, start, end, conditions=None):
def mark_attendance(
- employee,
- attendance_date,
- status,
- shift=None,
- leave_type=None,
- ignore_validate=False,
- late_entry=False,
- early_exit=False,
+ employee, attendance_date, status, shift=None, leave_type=None, ignore_validate=False
):
- if get_duplicate_attendance_record(employee, attendance_date, shift):
- return
-
- if get_overlapping_shift_attendance(employee, attendance_date, shift):
- return
-
- company = frappe.db.get_value("Employee", employee, "company")
- attendance = frappe.get_doc(
- {
- "doctype": "Attendance",
- "employee": employee,
- "attendance_date": attendance_date,
- "status": status,
- "company": company,
- "shift": shift,
- "leave_type": leave_type,
- "late_entry": late_entry,
- "early_exit": early_exit,
- }
- )
- attendance.flags.ignore_validate = ignore_validate
- attendance.insert()
- attendance.submit()
- return attendance.name
+ if not frappe.db.exists(
+ "Attendance",
+ {"employee": employee, "attendance_date": attendance_date, "docstatus": ("!=", "2")},
+ ):
+ company = frappe.db.get_value("Employee", employee, "company")
+ attendance = frappe.get_doc(
+ {
+ "doctype": "Attendance",
+ "employee": employee,
+ "attendance_date": attendance_date,
+ "status": status,
+ "company": company,
+ "shift": shift,
+ "leave_type": leave_type,
+ }
+ )
+ attendance.flags.ignore_validate = ignore_validate
+ attendance.insert()
+ attendance.submit()
+ return attendance.name
@frappe.whitelist()
diff --git a/erpnext/hr/doctype/attendance/test_attendance.py b/erpnext/hr/doctype/attendance/test_attendance.py
index 762d0f7567..058bc93d72 100644
--- a/erpnext/hr/doctype/attendance/test_attendance.py
+++ b/erpnext/hr/doctype/attendance/test_attendance.py
@@ -6,8 +6,6 @@ from frappe.tests.utils import FrappeTestCase
from frappe.utils import add_days, get_year_ending, get_year_start, getdate, now_datetime, nowdate
from erpnext.hr.doctype.attendance.attendance import (
- DuplicateAttendanceError,
- OverlappingShiftAttendanceError,
get_month_map,
get_unmarked_days,
mark_attendance,
@@ -25,112 +23,11 @@ class TestAttendance(FrappeTestCase):
from_date = get_year_start(getdate())
to_date = get_year_ending(getdate())
self.holiday_list = make_holiday_list(from_date=from_date, to_date=to_date)
- frappe.db.delete("Attendance")
-
- def test_duplicate_attendance(self):
- employee = make_employee("test_duplicate_attendance@example.com", company="_Test Company")
- date = nowdate()
-
- mark_attendance(employee, date, "Present")
- attendance = frappe.get_doc(
- {
- "doctype": "Attendance",
- "employee": employee,
- "attendance_date": date,
- "status": "Absent",
- "company": "_Test Company",
- }
- )
-
- self.assertRaises(DuplicateAttendanceError, attendance.insert)
-
- def test_duplicate_attendance_with_shift(self):
- from erpnext.hr.doctype.shift_type.test_shift_type import setup_shift_type
-
- employee = make_employee("test_duplicate_attendance@example.com", company="_Test Company")
- date = nowdate()
-
- shift_1 = setup_shift_type(shift_type="Shift 1", start_time="08:00:00", end_time="10:00:00")
- mark_attendance(employee, date, "Present", shift=shift_1.name)
-
- # attendance record with shift
- attendance = frappe.get_doc(
- {
- "doctype": "Attendance",
- "employee": employee,
- "attendance_date": date,
- "status": "Absent",
- "company": "_Test Company",
- "shift": shift_1.name,
- }
- )
-
- self.assertRaises(DuplicateAttendanceError, attendance.insert)
-
- # attendance record without any shift
- attendance = frappe.get_doc(
- {
- "doctype": "Attendance",
- "employee": employee,
- "attendance_date": date,
- "status": "Absent",
- "company": "_Test Company",
- }
- )
-
- self.assertRaises(DuplicateAttendanceError, attendance.insert)
-
- def test_overlapping_shift_attendance_validation(self):
- from erpnext.hr.doctype.shift_type.test_shift_type import setup_shift_type
-
- employee = make_employee("test_overlap_attendance@example.com", company="_Test Company")
- date = nowdate()
-
- shift_1 = setup_shift_type(shift_type="Shift 1", start_time="08:00:00", end_time="10:00:00")
- shift_2 = setup_shift_type(shift_type="Shift 2", start_time="09:30:00", end_time="11:00:00")
-
- mark_attendance(employee, date, "Present", shift=shift_1.name)
-
- # attendance record with overlapping shift
- attendance = frappe.get_doc(
- {
- "doctype": "Attendance",
- "employee": employee,
- "attendance_date": date,
- "status": "Absent",
- "company": "_Test Company",
- "shift": shift_2.name,
- }
- )
-
- self.assertRaises(OverlappingShiftAttendanceError, attendance.insert)
-
- def test_allow_attendance_with_different_shifts(self):
- # allows attendance with 2 different non-overlapping shifts
- from erpnext.hr.doctype.shift_type.test_shift_type import setup_shift_type
-
- employee = make_employee("test_duplicate_attendance@example.com", company="_Test Company")
- date = nowdate()
-
- shift_1 = setup_shift_type(shift_type="Shift 1", start_time="08:00:00", end_time="10:00:00")
- shift_2 = setup_shift_type(shift_type="Shift 2", start_time="11:00:00", end_time="12:00:00")
-
- mark_attendance(employee, date, "Present", shift_1.name)
- frappe.get_doc(
- {
- "doctype": "Attendance",
- "employee": employee,
- "attendance_date": date,
- "status": "Absent",
- "company": "_Test Company",
- "shift": shift_2.name,
- }
- ).insert()
def test_mark_absent(self):
employee = make_employee("test_mark_absent@example.com")
date = nowdate()
-
+ frappe.db.delete("Attendance", {"employee": employee, "attendance_date": date})
attendance = mark_attendance(employee, date, "Absent")
fetch_attendance = frappe.get_value(
"Attendance", {"employee": employee, "attendance_date": date, "status": "Absent"}
@@ -145,6 +42,7 @@ class TestAttendance(FrappeTestCase):
employee = make_employee(
"test_unmarked_days@example.com", date_of_joining=add_days(first_day, -1)
)
+ frappe.db.delete("Attendance", {"employee": employee})
frappe.db.set_value("Employee", employee, "holiday_list", self.holiday_list)
first_sunday = get_first_sunday(self.holiday_list, for_date=first_day)
@@ -169,6 +67,8 @@ class TestAttendance(FrappeTestCase):
employee = make_employee(
"test_unmarked_days@example.com", date_of_joining=add_days(first_day, -1)
)
+ frappe.db.delete("Attendance", {"employee": employee})
+
frappe.db.set_value("Employee", employee, "holiday_list", self.holiday_list)
first_sunday = get_first_sunday(self.holiday_list, for_date=first_day)
@@ -195,6 +95,7 @@ class TestAttendance(FrappeTestCase):
employee = make_employee(
"test_unmarked_days_as_per_doj@example.com", date_of_joining=doj, relieving_date=relieving_date
)
+ frappe.db.delete("Attendance", {"employee": employee})
frappe.db.set_value("Employee", employee, "holiday_list", self.holiday_list)
diff --git a/erpnext/hr/doctype/employee_checkin/employee_checkin.py b/erpnext/hr/doctype/employee_checkin/employee_checkin.py
index 64eb019b00..87f48b7e25 100644
--- a/erpnext/hr/doctype/employee_checkin/employee_checkin.py
+++ b/erpnext/hr/doctype/employee_checkin/employee_checkin.py
@@ -7,10 +7,6 @@ from frappe import _
from frappe.model.document import Document
from frappe.utils import cint, get_datetime
-from erpnext.hr.doctype.attendance.attendance import (
- get_duplicate_attendance_record,
- get_overlapping_shift_attendance,
-)
from erpnext.hr.doctype.shift_assignment.shift_assignment import (
get_actual_start_end_datetime_of_shift,
)
@@ -37,24 +33,24 @@ class EmployeeCheckin(Document):
shift_actual_timings = get_actual_start_end_datetime_of_shift(
self.employee, get_datetime(self.time), True
)
- if shift_actual_timings:
+ if shift_actual_timings[0] and shift_actual_timings[1]:
if (
- shift_actual_timings.shift_type.determine_check_in_and_check_out
+ shift_actual_timings[2].shift_type.determine_check_in_and_check_out
== "Strictly based on Log Type in Employee Checkin"
and not self.log_type
and not self.skip_auto_attendance
):
frappe.throw(
_("Log Type is required for check-ins falling in the shift: {0}.").format(
- shift_actual_timings.shift_type.name
+ shift_actual_timings[2].shift_type.name
)
)
if not self.attendance:
- self.shift = shift_actual_timings.shift_type.name
- self.shift_actual_start = shift_actual_timings.actual_start
- self.shift_actual_end = shift_actual_timings.actual_end
- self.shift_start = shift_actual_timings.start_datetime
- self.shift_end = shift_actual_timings.end_datetime
+ self.shift = shift_actual_timings[2].shift_type.name
+ self.shift_actual_start = shift_actual_timings[0]
+ self.shift_actual_end = shift_actual_timings[1]
+ self.shift_start = shift_actual_timings[2].start_datetime
+ self.shift_end = shift_actual_timings[2].end_datetime
else:
self.shift = None
@@ -140,10 +136,10 @@ def mark_attendance_and_link_log(
return None
elif attendance_status in ("Present", "Absent", "Half Day"):
employee_doc = frappe.get_doc("Employee", employee)
- duplicate = get_duplicate_attendance_record(employee, attendance_date, shift)
- overlapping = get_overlapping_shift_attendance(employee, attendance_date, shift)
-
- if not duplicate and not overlapping:
+ if not frappe.db.exists(
+ "Attendance",
+ {"employee": employee, "attendance_date": attendance_date, "docstatus": ("!=", "2")},
+ ):
doc_dict = {
"doctype": "Attendance",
"employee": employee,
@@ -236,7 +232,7 @@ def calculate_working_hours(logs, check_in_out_type, working_hours_calc_type):
def time_diff_in_hours(start, end):
- return round(float((end - start).total_seconds()) / 3600, 2)
+ return round((end - start).total_seconds() / 3600, 1)
def find_index_in_dict(dict_list, key, value):
diff --git a/erpnext/hr/doctype/employee_checkin/test_employee_checkin.py b/erpnext/hr/doctype/employee_checkin/test_employee_checkin.py
index 81b44f8fea..97f76b0350 100644
--- a/erpnext/hr/doctype/employee_checkin/test_employee_checkin.py
+++ b/erpnext/hr/doctype/employee_checkin/test_employee_checkin.py
@@ -2,19 +2,10 @@
# See license.txt
import unittest
-from datetime import datetime, timedelta
+from datetime import timedelta
import frappe
-from frappe.tests.utils import FrappeTestCase
-from frappe.utils import (
- add_days,
- get_time,
- get_year_ending,
- get_year_start,
- getdate,
- now_datetime,
- nowdate,
-)
+from frappe.utils import now_datetime, nowdate
from erpnext.hr.doctype.employee.test_employee import make_employee
from erpnext.hr.doctype.employee_checkin.employee_checkin import (
@@ -22,22 +13,9 @@ from erpnext.hr.doctype.employee_checkin.employee_checkin import (
calculate_working_hours,
mark_attendance_and_link_log,
)
-from erpnext.hr.doctype.holiday_list.test_holiday_list import set_holiday_list
-from erpnext.hr.doctype.leave_application.test_leave_application import get_first_sunday
-from erpnext.hr.doctype.shift_type.test_shift_type import make_shift_assignment, setup_shift_type
-from erpnext.payroll.doctype.salary_slip.test_salary_slip import make_holiday_list
-class TestEmployeeCheckin(FrappeTestCase):
- def setUp(self):
- frappe.db.delete("Shift Type")
- frappe.db.delete("Shift Assignment")
- frappe.db.delete("Employee Checkin")
-
- from_date = get_year_start(getdate())
- to_date = get_year_ending(getdate())
- self.holiday_list = make_holiday_list(from_date=from_date, to_date=to_date)
-
+class TestEmployeeCheckin(unittest.TestCase):
def test_add_log_based_on_employee_field(self):
employee = make_employee("test_add_log_based_on_employee_field@example.com")
employee = frappe.get_doc("Employee", employee)
@@ -125,163 +103,6 @@ class TestEmployeeCheckin(FrappeTestCase):
)
self.assertEqual(working_hours, (4.5, logs_type_2[1].time, logs_type_2[-1].time))
- def test_fetch_shift(self):
- employee = make_employee("test_employee_checkin@example.com", company="_Test Company")
-
- # shift setup for 8-12
- shift_type = setup_shift_type()
- date = getdate()
- make_shift_assignment(shift_type.name, employee, date)
-
- # within shift time
- timestamp = datetime.combine(date, get_time("08:45:00"))
- log = make_checkin(employee, timestamp)
- self.assertEqual(log.shift, shift_type.name)
-
- # "begin checkin before shift time" = 60 mins, so should work for 7:00:00
- timestamp = datetime.combine(date, get_time("07:00:00"))
- log = make_checkin(employee, timestamp)
- self.assertEqual(log.shift, shift_type.name)
-
- # "allow checkout after shift end time" = 60 mins, so should work for 13:00:00
- timestamp = datetime.combine(date, get_time("13:00:00"))
- log = make_checkin(employee, timestamp)
- self.assertEqual(log.shift, shift_type.name)
-
- # should not fetch this shift beyond allowed time
- timestamp = datetime.combine(date, get_time("13:01:00"))
- log = make_checkin(employee, timestamp)
- self.assertIsNone(log.shift)
-
- def test_shift_start_and_end_timings(self):
- employee = make_employee("test_employee_checkin@example.com", company="_Test Company")
-
- # shift setup for 8-12
- shift_type = setup_shift_type()
- date = getdate()
- make_shift_assignment(shift_type.name, employee, date)
-
- timestamp = datetime.combine(date, get_time("08:45:00"))
- log = make_checkin(employee, timestamp)
-
- self.assertEqual(log.shift, shift_type.name)
- self.assertEqual(log.shift_start, datetime.combine(date, get_time("08:00:00")))
- self.assertEqual(log.shift_end, datetime.combine(date, get_time("12:00:00")))
- self.assertEqual(log.shift_actual_start, datetime.combine(date, get_time("07:00:00")))
- self.assertEqual(log.shift_actual_end, datetime.combine(date, get_time("13:00:00")))
-
- def test_fetch_shift_based_on_default_shift(self):
- employee = make_employee("test_default_shift@example.com", company="_Test Company")
- default_shift = setup_shift_type(
- shift_type="Default Shift", start_time="14:00:00", end_time="16:00:00"
- )
-
- date = getdate()
- frappe.db.set_value("Employee", employee, "default_shift", default_shift.name)
-
- timestamp = datetime.combine(date, get_time("14:45:00"))
- log = make_checkin(employee, timestamp)
-
- # should consider default shift
- self.assertEqual(log.shift, default_shift.name)
-
- def test_fetch_shift_spanning_over_two_days(self):
- employee = make_employee("test_employee_checkin@example.com", company="_Test Company")
- shift_type = setup_shift_type(
- shift_type="Midnight Shift", start_time="23:00:00", end_time="01:00:00"
- )
- date = getdate()
- next_day = add_days(date, 1)
- make_shift_assignment(shift_type.name, employee, date)
-
- # log falls in the first day
- timestamp = datetime.combine(date, get_time("23:00:00"))
- log = make_checkin(employee, timestamp)
-
- self.assertEqual(log.shift, shift_type.name)
- self.assertEqual(log.shift_start, datetime.combine(date, get_time("23:00:00")))
- self.assertEqual(log.shift_end, datetime.combine(next_day, get_time("01:00:00")))
- self.assertEqual(log.shift_actual_start, datetime.combine(date, get_time("22:00:00")))
- self.assertEqual(log.shift_actual_end, datetime.combine(next_day, get_time("02:00:00")))
-
- log.delete()
-
- # log falls in the second day
- prev_day = add_days(date, -1)
- timestamp = datetime.combine(date, get_time("01:30:00"))
- log = make_checkin(employee, timestamp)
- self.assertEqual(log.shift, shift_type.name)
- self.assertEqual(log.shift_start, datetime.combine(prev_day, get_time("23:00:00")))
- self.assertEqual(log.shift_end, datetime.combine(date, get_time("01:00:00")))
- self.assertEqual(log.shift_actual_start, datetime.combine(prev_day, get_time("22:00:00")))
- self.assertEqual(log.shift_actual_end, datetime.combine(date, get_time("02:00:00")))
-
- def test_no_shift_fetched_on_holiday_as_per_shift_holiday_list(self):
- date = getdate()
- from_date = get_year_start(date)
- to_date = get_year_ending(date)
- holiday_list = make_holiday_list(from_date=from_date, to_date=to_date)
-
- employee = make_employee("test_shift_with_holiday@example.com", company="_Test Company")
- setup_shift_type(shift_type="Test Holiday Shift", holiday_list=holiday_list)
-
- first_sunday = get_first_sunday(holiday_list, for_date=date)
- timestamp = datetime.combine(first_sunday, get_time("08:00:00"))
- log = make_checkin(employee, timestamp)
-
- self.assertIsNone(log.shift)
-
- @set_holiday_list("Salary Slip Test Holiday List", "_Test Company")
- def test_no_shift_fetched_on_holiday_as_per_employee_holiday_list(self):
- employee = make_employee("test_shift_with_holiday@example.com", company="_Test Company")
- shift_type = setup_shift_type(shift_type="Test Holiday Shift")
- shift_type.holiday_list = None
- shift_type.save()
-
- date = getdate()
-
- first_sunday = get_first_sunday(self.holiday_list, for_date=date)
- timestamp = datetime.combine(first_sunday, get_time("08:00:00"))
- log = make_checkin(employee, timestamp)
-
- self.assertIsNone(log.shift)
-
- def test_consecutive_shift_assignments_overlapping_within_grace_period(self):
- # test adjustment for start and end times if they are overlapping
- # within "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time" periods
- employee = make_employee("test_shift@example.com", company="_Test Company")
-
- # 8 - 12
- shift1 = setup_shift_type()
- # 12:30 - 16:30
- shift2 = setup_shift_type(
- shift_type="Consecutive Shift", start_time="12:30:00", end_time="16:30:00"
- )
-
- # the actual start and end times (with grace) for these shifts are 7 - 13 and 11:30 - 17:30
- date = getdate()
- make_shift_assignment(shift1.name, employee, date)
- make_shift_assignment(shift2.name, employee, date)
-
- # log at 12:30 should set shift2 and actual start as 12 and not 11:30
- timestamp = datetime.combine(date, get_time("12:30:00"))
- log = make_checkin(employee, timestamp)
- self.assertEqual(log.shift, shift2.name)
- self.assertEqual(log.shift_start, datetime.combine(date, get_time("12:30:00")))
- self.assertEqual(log.shift_actual_start, datetime.combine(date, get_time("12:00:00")))
-
- # log at 12:00 should set shift1 and actual end as 12 and not 1 since the next shift's grace starts
- timestamp = datetime.combine(date, get_time("12:00:00"))
- log = make_checkin(employee, timestamp)
- self.assertEqual(log.shift, shift1.name)
- self.assertEqual(log.shift_end, datetime.combine(date, get_time("12:00:00")))
- self.assertEqual(log.shift_actual_end, datetime.combine(date, get_time("12:00:00")))
-
- # log at 12:01 should set shift2
- timestamp = datetime.combine(date, get_time("12:01:00"))
- log = make_checkin(employee, timestamp)
- self.assertEqual(log.shift, shift2.name)
-
def make_n_checkins(employee, n, hours_to_reverse=1):
logs = [make_checkin(employee, now_datetime() - timedelta(hours=hours_to_reverse, minutes=n + 1))]
diff --git a/erpnext/hr/doctype/shift_assignment/shift_assignment.py b/erpnext/hr/doctype/shift_assignment/shift_assignment.py
index 0b21c00eac..f6bd15951d 100644
--- a/erpnext/hr/doctype/shift_assignment/shift_assignment.py
+++ b/erpnext/hr/doctype/shift_assignment/shift_assignment.py
@@ -3,120 +3,83 @@
from datetime import datetime, timedelta
-from typing import Dict, List
import frappe
from frappe import _
from frappe.model.document import Document
-from frappe.query_builder import Criterion
-from frappe.utils import cstr, get_datetime, get_link_to_form, get_time, getdate, now_datetime
+from frappe.utils import cstr, getdate, now_datetime, nowdate
from erpnext.hr.doctype.employee.employee import get_holiday_list_for_employee
from erpnext.hr.doctype.holiday_list.holiday_list import is_holiday
from erpnext.hr.utils import validate_active_employee
-class OverlappingShiftError(frappe.ValidationError):
- pass
-
-
class ShiftAssignment(Document):
def validate(self):
validate_active_employee(self.employee)
- self.validate_overlapping_shifts()
+ self.validate_overlapping_dates()
if self.end_date:
self.validate_from_to_dates("start_date", "end_date")
- def validate_overlapping_shifts(self):
- overlapping_dates = self.get_overlapping_dates()
- if len(overlapping_dates):
- # if dates are overlapping, check if timings are overlapping, else allow
- overlapping_timings = has_overlapping_timings(self.shift_type, overlapping_dates[0].shift_type)
- if overlapping_timings:
- self.throw_overlap_error(overlapping_dates[0])
-
- def get_overlapping_dates(self):
+ def validate_overlapping_dates(self):
if not self.name:
self.name = "New Shift Assignment"
- shift = frappe.qb.DocType("Shift Assignment")
- query = (
- frappe.qb.from_(shift)
- .select(shift.name, shift.shift_type, shift.docstatus, shift.status)
- .where(
- (shift.employee == self.employee)
- & (shift.docstatus == 1)
- & (shift.name != self.name)
- & (shift.status == "Active")
- )
- )
+ condition = """and (
+ end_date is null
+ or
+ %(start_date)s between start_date and end_date
+ """
if self.end_date:
- query = query.where(
- Criterion.any(
- [
- Criterion.any(
- [
- shift.end_date.isnull(),
- ((self.start_date >= shift.start_date) & (self.start_date <= shift.end_date)),
- ]
- ),
- Criterion.any(
- [
- ((self.end_date >= shift.start_date) & (self.end_date <= shift.end_date)),
- shift.start_date.between(self.start_date, self.end_date),
- ]
- ),
- ]
- )
- )
+ condition += """ or
+ %(end_date)s between start_date and end_date
+ or
+ start_date between %(start_date)s and %(end_date)s
+ ) """
else:
- query = query.where(
- shift.end_date.isnull()
- | ((self.start_date >= shift.start_date) & (self.start_date <= shift.end_date))
- )
+ condition += """ ) """
- return query.run(as_dict=True)
+ assigned_shifts = frappe.db.sql(
+ """
+ select name, shift_type, start_date ,end_date, docstatus, status
+ from `tabShift Assignment`
+ where
+ employee=%(employee)s and docstatus = 1
+ and name != %(name)s
+ and status = "Active"
+ {0}
+ """.format(
+ condition
+ ),
+ {
+ "employee": self.employee,
+ "shift_type": self.shift_type,
+ "start_date": self.start_date,
+ "end_date": self.end_date,
+ "name": self.name,
+ },
+ as_dict=1,
+ )
+
+ if len(assigned_shifts):
+ self.throw_overlap_error(assigned_shifts[0])
def throw_overlap_error(self, shift_details):
shift_details = frappe._dict(shift_details)
if shift_details.docstatus == 1 and shift_details.status == "Active":
- msg = _(
- "Employee {0} already has an active Shift {1}: {2} that overlaps within this period."
- ).format(
- frappe.bold(self.employee),
- frappe.bold(shift_details.shift_type),
- get_link_to_form("Shift Assignment", shift_details.name),
+ msg = _("Employee {0} already has Active Shift {1}: {2}").format(
+ frappe.bold(self.employee), frappe.bold(self.shift_type), frappe.bold(shift_details.name)
)
- frappe.throw(msg, title=_("Overlapping Shifts"), exc=OverlappingShiftError)
-
-
-def has_overlapping_timings(shift_1: str, shift_2: str) -> bool:
- """
- Accepts two shift types and checks whether their timings are overlapping
- """
- curr_shift = frappe.db.get_value("Shift Type", shift_1, ["start_time", "end_time"], as_dict=True)
- overlapping_shift = frappe.db.get_value(
- "Shift Type", shift_2, ["start_time", "end_time"], as_dict=True
- )
-
- if (
- (
- curr_shift.start_time > overlapping_shift.start_time
- and curr_shift.start_time < overlapping_shift.end_time
- )
- or (
- curr_shift.end_time > overlapping_shift.start_time
- and curr_shift.end_time < overlapping_shift.end_time
- )
- or (
- curr_shift.start_time <= overlapping_shift.start_time
- and curr_shift.end_time >= overlapping_shift.end_time
- )
- ):
- return True
- return False
+ if shift_details.start_date:
+ msg += " " + _("from {0}").format(getdate(self.start_date).strftime("%d-%m-%Y"))
+ title = "Ongoing Shift"
+ if shift_details.end_date:
+ msg += " " + _("to {0}").format(getdate(self.end_date).strftime("%d-%m-%Y"))
+ title = "Active Shift"
+ if msg:
+ frappe.throw(msg, title=title)
@frappe.whitelist()
@@ -192,195 +155,102 @@ def get_shift_type_timing(shift_types):
return shift_timing_map
-def get_shift_for_time(shifts: List[Dict], for_timestamp: datetime) -> Dict:
- """Returns shift with details for given timestamp"""
- valid_shifts = []
-
- for entry in shifts:
- shift_details = get_shift_details(entry.shift_type, for_timestamp=for_timestamp)
-
- if (
- get_datetime(shift_details.actual_start)
- <= get_datetime(for_timestamp)
- <= get_datetime(shift_details.actual_end)
- ):
- valid_shifts.append(shift_details)
-
- valid_shifts.sort(key=lambda x: x["actual_start"])
-
- if len(valid_shifts) > 1:
- for i in range(len(valid_shifts) - 1):
- # comparing 2 consecutive shifts and adjusting start and end times
- # if they are overlapping within grace period
- curr_shift = valid_shifts[i]
- next_shift = valid_shifts[i + 1]
-
- if curr_shift and next_shift:
- next_shift.actual_start = (
- curr_shift.end_datetime
- if next_shift.actual_start < curr_shift.end_datetime
- else next_shift.actual_start
- )
- curr_shift.actual_end = (
- next_shift.actual_start
- if curr_shift.actual_end > next_shift.actual_start
- else curr_shift.actual_end
- )
-
- valid_shifts[i] = curr_shift
- valid_shifts[i + 1] = next_shift
-
- return get_exact_shift(valid_shifts, for_timestamp) or {}
-
- return (valid_shifts and valid_shifts[0]) or {}
-
-
-def get_shifts_for_date(employee: str, for_timestamp: datetime) -> List[Dict[str, str]]:
- """Returns list of shifts with details for given date"""
- assignment = frappe.qb.DocType("Shift Assignment")
-
- return (
- frappe.qb.from_(assignment)
- .select(assignment.name, assignment.shift_type)
- .where(
- (assignment.employee == employee)
- & (assignment.docstatus == 1)
- & (assignment.status == "Active")
- & (assignment.start_date <= getdate(for_timestamp.date()))
- & (
- Criterion.any(
- [
- assignment.end_date.isnull(),
- (assignment.end_date.isnotnull() & (getdate(for_timestamp.date()) >= assignment.end_date)),
- ]
- )
- )
- )
- ).run(as_dict=True)
-
-
-def get_shift_for_timestamp(employee: str, for_timestamp: datetime) -> Dict:
- shifts = get_shifts_for_date(employee, for_timestamp)
- if shifts:
- return get_shift_for_time(shifts, for_timestamp)
- return {}
-
-
def get_employee_shift(
- employee: str,
- for_timestamp: datetime = None,
- consider_default_shift: bool = False,
- next_shift_direction: str = None,
-) -> Dict:
+ employee, for_date=None, consider_default_shift=False, next_shift_direction=None
+):
"""Returns a Shift Type for the given employee on the given date. (excluding the holidays)
:param employee: Employee for which shift is required.
- :param for_timestamp: DateTime on which shift is required
+ :param for_date: Date on which shift are required
:param consider_default_shift: If set to true, default shift is taken when no shift assignment is found.
:param next_shift_direction: One of: None, 'forward', 'reverse'. Direction to look for next shift if shift not found on given date.
"""
- if for_timestamp is None:
- for_timestamp = now_datetime()
-
- shift_details = get_shift_for_timestamp(employee, for_timestamp)
-
- # if shift assignment is not found, consider default shift
+ if for_date is None:
+ for_date = nowdate()
default_shift = frappe.db.get_value("Employee", employee, "default_shift")
- if not shift_details and consider_default_shift:
- shift_details = get_shift_details(default_shift, for_timestamp)
-
- # if its a holiday, reset
- if shift_details and is_holiday_date(employee, shift_details):
- shift_details = None
-
- # if no shift is found, find next or prev shift assignment based on direction
- if not shift_details and next_shift_direction:
- shift_details = get_prev_or_next_shift(
- employee, for_timestamp, consider_default_shift, default_shift, next_shift_direction
- )
-
- return shift_details or {}
-
-
-def get_prev_or_next_shift(
- employee: str,
- for_timestamp: datetime,
- consider_default_shift: bool,
- default_shift: str,
- next_shift_direction: str,
-) -> Dict:
- """Returns a dict of shift details for the next or prev shift based on the next_shift_direction"""
- MAX_DAYS = 366
- shift_details = {}
-
- if consider_default_shift and default_shift:
- direction = -1 if next_shift_direction == "reverse" else 1
- for i in range(MAX_DAYS):
- date = for_timestamp + timedelta(days=direction * (i + 1))
- shift_details = get_employee_shift(employee, date, consider_default_shift, None)
- if shift_details:
- break
- else:
- direction = "<" if next_shift_direction == "reverse" else ">"
- sort_order = "desc" if next_shift_direction == "reverse" else "asc"
- dates = frappe.db.get_all(
- "Shift Assignment",
- ["start_date", "end_date"],
- {
- "employee": employee,
- "start_date": (direction, for_timestamp.date()),
- "docstatus": 1,
- "status": "Active",
- },
- as_list=True,
- limit=MAX_DAYS,
- order_by="start_date " + sort_order,
- )
-
- if dates:
- for date in dates:
- if date[1] and date[1] < for_timestamp.date():
- continue
- shift_details = get_employee_shift(
- employee, datetime.combine(date[0], for_timestamp.time()), consider_default_shift, None
- )
- if shift_details:
- break
-
- return shift_details or {}
-
-
-def is_holiday_date(employee: str, shift_details: Dict) -> bool:
- holiday_list_name = frappe.db.get_value(
- "Shift Type", shift_details.shift_type.name, "holiday_list"
+ shift_type_name = None
+ shift_assignment_details = frappe.db.get_value(
+ "Shift Assignment",
+ {"employee": employee, "start_date": ("<=", for_date), "docstatus": "1", "status": "Active"},
+ ["shift_type", "end_date"],
)
- if not holiday_list_name:
- holiday_list_name = get_holiday_list_for_employee(employee, False)
+ if shift_assignment_details:
+ shift_type_name = shift_assignment_details[0]
- return holiday_list_name and is_holiday(holiday_list_name, shift_details.start_datetime.date())
+ # if end_date present means that shift is over after end_date else it is a ongoing shift.
+ if shift_assignment_details[1] and for_date >= shift_assignment_details[1]:
+ shift_type_name = None
+
+ if not shift_type_name and consider_default_shift:
+ shift_type_name = default_shift
+ if shift_type_name:
+ holiday_list_name = frappe.db.get_value("Shift Type", shift_type_name, "holiday_list")
+ if not holiday_list_name:
+ holiday_list_name = get_holiday_list_for_employee(employee, False)
+ if holiday_list_name and is_holiday(holiday_list_name, for_date):
+ shift_type_name = None
+
+ if not shift_type_name and next_shift_direction:
+ MAX_DAYS = 366
+ if consider_default_shift and default_shift:
+ direction = -1 if next_shift_direction == "reverse" else +1
+ for i in range(MAX_DAYS):
+ date = for_date + timedelta(days=direction * (i + 1))
+ shift_details = get_employee_shift(employee, date, consider_default_shift, None)
+ if shift_details:
+ shift_type_name = shift_details.shift_type.name
+ for_date = date
+ break
+ else:
+ direction = "<" if next_shift_direction == "reverse" else ">"
+ sort_order = "desc" if next_shift_direction == "reverse" else "asc"
+ dates = frappe.db.get_all(
+ "Shift Assignment",
+ ["start_date", "end_date"],
+ {
+ "employee": employee,
+ "start_date": (direction, for_date),
+ "docstatus": "1",
+ "status": "Active",
+ },
+ as_list=True,
+ limit=MAX_DAYS,
+ order_by="start_date " + sort_order,
+ )
+
+ if dates:
+ for date in dates:
+ if date[1] and date[1] < for_date:
+ continue
+ shift_details = get_employee_shift(employee, date[0], consider_default_shift, None)
+ if shift_details:
+ shift_type_name = shift_details.shift_type.name
+ for_date = date[0]
+ break
+
+ return get_shift_details(shift_type_name, for_date)
-def get_employee_shift_timings(
- employee: str, for_timestamp: datetime = None, consider_default_shift: bool = False
-) -> List[Dict]:
+def get_employee_shift_timings(employee, for_timestamp=None, consider_default_shift=False):
"""Returns previous shift, current/upcoming shift, next_shift for the given timestamp and employee"""
if for_timestamp is None:
for_timestamp = now_datetime()
-
# write and verify a test case for midnight shift.
prev_shift = curr_shift = next_shift = None
- curr_shift = get_employee_shift(employee, for_timestamp, consider_default_shift, "forward")
+ curr_shift = get_employee_shift(employee, for_timestamp.date(), consider_default_shift, "forward")
if curr_shift:
next_shift = get_employee_shift(
- employee, curr_shift.start_datetime + timedelta(days=1), consider_default_shift, "forward"
+ employee,
+ curr_shift.start_datetime.date() + timedelta(days=1),
+ consider_default_shift,
+ "forward",
)
prev_shift = get_employee_shift(
- employee, for_timestamp + timedelta(days=-1), consider_default_shift, "reverse"
+ employee, for_timestamp.date() + timedelta(days=-1), consider_default_shift, "reverse"
)
if curr_shift:
- # adjust actual start and end times if they are overlapping with grace period (before start and after end)
if prev_shift:
curr_shift.actual_start = (
prev_shift.end_datetime
@@ -403,102 +273,31 @@ def get_employee_shift_timings(
if curr_shift.actual_end > next_shift.actual_start
else curr_shift.actual_end
)
-
return prev_shift, curr_shift, next_shift
-def get_actual_start_end_datetime_of_shift(
- employee: str, for_timestamp: datetime, consider_default_shift: bool = False
-) -> Dict:
- """Returns a Dict containing shift details with actual_start and actual_end datetime values
- Here 'actual' means taking into account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time".
- Empty Dict is returned if the timestamp is outside any actual shift timings.
+def get_shift_details(shift_type_name, for_date=None):
+ """Returns Shift Details which contain some additional information as described below.
+ 'shift_details' contains the following keys:
+ 'shift_type' - Object of DocType Shift Type,
+ 'start_datetime' - Date and Time of shift start on given date,
+ 'end_datetime' - Date and Time of shift end on given date,
+ 'actual_start' - datetime of shift start after adding 'begin_check_in_before_shift_start_time',
+ 'actual_end' - datetime of shift end after adding 'allow_check_out_after_shift_end_time'(None is returned if this is zero)
- :param employee (str): Employee name
- :param for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime
- :param consider_default_shift (bool, optional): Flag (defaults to False) to specify whether to consider
- default shift in employee master if no shift assignment is found
- """
- shift_timings_as_per_timestamp = get_employee_shift_timings(
- employee, for_timestamp, consider_default_shift
- )
- return get_exact_shift(shift_timings_as_per_timestamp, for_timestamp)
-
-
-def get_exact_shift(shifts: List, for_timestamp: datetime) -> Dict:
- """Returns the shift details (dict) for the exact shift in which the 'for_timestamp' value falls among multiple shifts"""
- shift_details = dict()
- timestamp_list = []
-
- for shift in shifts:
- if shift:
- timestamp_list.extend([shift.actual_start, shift.actual_end])
- else:
- timestamp_list.extend([None, None])
-
- timestamp_index = None
- for index, timestamp in enumerate(timestamp_list):
- if not timestamp:
- continue
-
- if for_timestamp < timestamp:
- timestamp_index = index
- elif for_timestamp == timestamp:
- # on timestamp boundary
- if index % 2 == 1:
- timestamp_index = index
- else:
- timestamp_index = index + 1
-
- if timestamp_index:
- break
-
- if timestamp_index and timestamp_index % 2 == 1:
- shift_details = shifts[int((timestamp_index - 1) / 2)]
-
- return shift_details
-
-
-def get_shift_details(shift_type_name: str, for_timestamp: datetime = None) -> Dict:
- """Returns a Dict containing shift details with the following data:
- 'shift_type' - Object of DocType Shift Type,
- 'start_datetime' - datetime of shift start on given timestamp,
- 'end_datetime' - datetime of shift end on given timestamp,
- 'actual_start' - datetime of shift start after adding 'begin_check_in_before_shift_start_time',
- 'actual_end' - datetime of shift end after adding 'allow_check_out_after_shift_end_time' (None is returned if this is zero)
-
- :param shift_type_name (str): shift type name for which shift_details are required.
- :param for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime
+ :param shift_type_name: shift type name for which shift_details is required.
+ :param for_date: Date on which shift_details are required
"""
if not shift_type_name:
- return {}
-
- if for_timestamp is None:
- for_timestamp = now_datetime()
-
+ return None
+ if not for_date:
+ for_date = nowdate()
shift_type = frappe.get_doc("Shift Type", shift_type_name)
- shift_actual_start = shift_type.start_time - timedelta(
- minutes=shift_type.begin_check_in_before_shift_start_time
+ start_datetime = datetime.combine(for_date, datetime.min.time()) + shift_type.start_time
+ for_date = (
+ for_date + timedelta(days=1) if shift_type.start_time > shift_type.end_time else for_date
)
-
- if shift_type.start_time > shift_type.end_time:
- # shift spans accross 2 different days
- if get_time(for_timestamp.time()) >= get_time(shift_actual_start):
- # if for_timestamp is greater than start time, it's within the first day
- start_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.start_time
- for_timestamp = for_timestamp + timedelta(days=1)
- end_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.end_time
-
- elif get_time(for_timestamp.time()) < get_time(shift_actual_start):
- # if for_timestamp is less than start time, it's within the second day
- end_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.end_time
- for_timestamp = for_timestamp + timedelta(days=-1)
- start_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.start_time
- else:
- # start and end timings fall on the same day
- start_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.start_time
- end_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.end_time
-
+ end_datetime = datetime.combine(for_date, datetime.min.time()) + shift_type.end_time
actual_start = start_datetime - timedelta(
minutes=shift_type.begin_check_in_before_shift_start_time
)
@@ -513,3 +312,34 @@ def get_shift_details(shift_type_name: str, for_timestamp: datetime = None) -> D
"actual_end": actual_end,
}
)
+
+
+def get_actual_start_end_datetime_of_shift(employee, for_datetime, consider_default_shift=False):
+ """Takes a datetime and returns the 'actual' start datetime and end datetime of the shift in which the timestamp belongs.
+ Here 'actual' means - taking in to account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time".
+ None is returned if the timestamp is outside any actual shift timings.
+ Shift Details is also returned(current/upcoming i.e. if timestamp not in any actual shift then details of next shift returned)
+ """
+ actual_shift_start = actual_shift_end = shift_details = None
+ shift_timings_as_per_timestamp = get_employee_shift_timings(
+ employee, for_datetime, consider_default_shift
+ )
+ timestamp_list = []
+ for shift in shift_timings_as_per_timestamp:
+ if shift:
+ timestamp_list.extend([shift.actual_start, shift.actual_end])
+ else:
+ timestamp_list.extend([None, None])
+ timestamp_index = None
+ for index, timestamp in enumerate(timestamp_list):
+ if timestamp and for_datetime <= timestamp:
+ timestamp_index = index
+ break
+ if timestamp_index and timestamp_index % 2 == 1:
+ shift_details = shift_timings_as_per_timestamp[int((timestamp_index - 1) / 2)]
+ actual_shift_start = shift_details.actual_start
+ actual_shift_end = shift_details.actual_end
+ elif timestamp_index:
+ shift_details = shift_timings_as_per_timestamp[int(timestamp_index / 2)]
+
+ return actual_shift_start, actual_shift_end, shift_details
diff --git a/erpnext/hr/doctype/shift_assignment/test_shift_assignment.py b/erpnext/hr/doctype/shift_assignment/test_shift_assignment.py
index 0fe9108168..4a1ec293bd 100644
--- a/erpnext/hr/doctype/shift_assignment/test_shift_assignment.py
+++ b/erpnext/hr/doctype/shift_assignment/test_shift_assignment.py
@@ -4,23 +4,16 @@
import unittest
import frappe
-from frappe.tests.utils import FrappeTestCase
-from frappe.utils import add_days, getdate, nowdate
-
-from erpnext.hr.doctype.employee.test_employee import make_employee
-from erpnext.hr.doctype.shift_assignment.shift_assignment import OverlappingShiftError
-from erpnext.hr.doctype.shift_type.test_shift_type import make_shift_assignment, setup_shift_type
+from frappe.utils import add_days, nowdate
test_dependencies = ["Shift Type"]
-class TestShiftAssignment(FrappeTestCase):
+class TestShiftAssignment(unittest.TestCase):
def setUp(self):
- frappe.db.delete("Shift Assignment")
- frappe.db.delete("Shift Type")
+ frappe.db.sql("delete from `tabShift Assignment`")
def test_make_shift_assignment(self):
- setup_shift_type(shift_type="Day Shift")
shift_assignment = frappe.get_doc(
{
"doctype": "Shift Assignment",
@@ -36,7 +29,7 @@ class TestShiftAssignment(FrappeTestCase):
def test_overlapping_for_ongoing_shift(self):
# shift should be Ongoing if Only start_date is present and status = Active
- setup_shift_type(shift_type="Day Shift")
+
shift_assignment_1 = frappe.get_doc(
{
"doctype": "Shift Assignment",
@@ -61,11 +54,11 @@ class TestShiftAssignment(FrappeTestCase):
}
)
- self.assertRaises(OverlappingShiftError, shift_assignment.save)
+ self.assertRaises(frappe.ValidationError, shift_assignment.save)
def test_overlapping_for_fixed_period_shift(self):
# shift should is for Fixed period if Only start_date and end_date both are present and status = Active
- setup_shift_type(shift_type="Day Shift")
+
shift_assignment_1 = frappe.get_doc(
{
"doctype": "Shift Assignment",
@@ -92,65 +85,4 @@ class TestShiftAssignment(FrappeTestCase):
}
)
- self.assertRaises(OverlappingShiftError, shift_assignment_3.save)
-
- def test_overlapping_for_a_fixed_period_shift_and_ongoing_shift(self):
- employee = make_employee("test_shift_assignment@example.com", company="_Test Company")
-
- # shift setup for 8-12
- shift_type = setup_shift_type(shift_type="Shift 1", start_time="08:00:00", end_time="12:00:00")
- date = getdate()
- # shift with end date
- make_shift_assignment(shift_type.name, employee, date, add_days(date, 30))
-
- # shift setup for 11-15
- shift_type = setup_shift_type(shift_type="Shift 2", start_time="11:00:00", end_time="15:00:00")
- date = getdate()
-
- # shift assignment without end date
- shift2 = frappe.get_doc(
- {
- "doctype": "Shift Assignment",
- "shift_type": shift_type.name,
- "company": "_Test Company",
- "employee": employee,
- "start_date": date,
- }
- )
- self.assertRaises(OverlappingShiftError, shift2.insert)
-
- def test_overlap_validation_for_shifts_on_same_day_with_overlapping_timeslots(self):
- employee = make_employee("test_shift_assignment@example.com", company="_Test Company")
-
- # shift setup for 8-12
- shift_type = setup_shift_type(shift_type="Shift 1", start_time="08:00:00", end_time="12:00:00")
- date = getdate()
- make_shift_assignment(shift_type.name, employee, date)
-
- # shift setup for 11-15
- shift_type = setup_shift_type(shift_type="Shift 2", start_time="11:00:00", end_time="15:00:00")
- date = getdate()
-
- shift2 = frappe.get_doc(
- {
- "doctype": "Shift Assignment",
- "shift_type": shift_type.name,
- "company": "_Test Company",
- "employee": employee,
- "start_date": date,
- }
- )
- self.assertRaises(OverlappingShiftError, shift2.insert)
-
- def test_multiple_shift_assignments_for_same_day(self):
- employee = make_employee("test_shift_assignment@example.com", company="_Test Company")
-
- # shift setup for 8-12
- shift_type = setup_shift_type(shift_type="Shift 1", start_time="08:00:00", end_time="12:00:00")
- date = getdate()
- make_shift_assignment(shift_type.name, employee, date)
-
- # shift setup for 13-15
- shift_type = setup_shift_type(shift_type="Shift 2", start_time="13:00:00", end_time="15:00:00")
- date = getdate()
- make_shift_assignment(shift_type.name, employee, date)
+ self.assertRaises(frappe.ValidationError, shift_assignment_3.save)
diff --git a/erpnext/hr/doctype/shift_request/shift_request.py b/erpnext/hr/doctype/shift_request/shift_request.py
index 2bee2404aa..b5beef7a99 100644
--- a/erpnext/hr/doctype/shift_request/shift_request.py
+++ b/erpnext/hr/doctype/shift_request/shift_request.py
@@ -5,14 +5,12 @@
import frappe
from frappe import _
from frappe.model.document import Document
-from frappe.query_builder import Criterion
-from frappe.utils import get_link_to_form, getdate
+from frappe.utils import formatdate, getdate
-from erpnext.hr.doctype.shift_assignment.shift_assignment import has_overlapping_timings
from erpnext.hr.utils import share_doc_with_approver, validate_active_employee
-class OverlappingShiftRequestError(frappe.ValidationError):
+class OverlapError(frappe.ValidationError):
pass
@@ -20,7 +18,7 @@ class ShiftRequest(Document):
def validate(self):
validate_active_employee(self.employee)
self.validate_dates()
- self.validate_overlapping_shift_requests()
+ self.validate_shift_request_overlap_dates()
self.validate_approver()
self.validate_default_shift()
@@ -81,60 +79,37 @@ class ShiftRequest(Document):
if self.from_date and self.to_date and (getdate(self.to_date) < getdate(self.from_date)):
frappe.throw(_("To date cannot be before from date"))
- def validate_overlapping_shift_requests(self):
- overlapping_dates = self.get_overlapping_dates()
- if len(overlapping_dates):
- # if dates are overlapping, check if timings are overlapping, else allow
- overlapping_timings = has_overlapping_timings(self.shift_type, overlapping_dates[0].shift_type)
- if overlapping_timings:
- self.throw_overlap_error(overlapping_dates[0])
-
- def get_overlapping_dates(self):
+ def validate_shift_request_overlap_dates(self):
if not self.name:
self.name = "New Shift Request"
- shift = frappe.qb.DocType("Shift Request")
- query = (
- frappe.qb.from_(shift)
- .select(shift.name, shift.shift_type)
- .where((shift.employee == self.employee) & (shift.docstatus < 2) & (shift.name != self.name))
+ d = frappe.db.sql(
+ """
+ select
+ name, shift_type, from_date, to_date
+ from `tabShift Request`
+ where employee = %(employee)s and docstatus < 2
+ and ((%(from_date)s >= from_date
+ and %(from_date)s <= to_date) or
+ ( %(to_date)s >= from_date
+ and %(to_date)s <= to_date ))
+ and name != %(name)s""",
+ {
+ "employee": self.employee,
+ "shift_type": self.shift_type,
+ "from_date": self.from_date,
+ "to_date": self.to_date,
+ "name": self.name,
+ },
+ as_dict=1,
)
- if self.to_date:
- query = query.where(
- Criterion.any(
- [
- Criterion.any(
- [
- shift.to_date.isnull(),
- ((self.from_date >= shift.from_date) & (self.from_date <= shift.to_date)),
- ]
- ),
- Criterion.any(
- [
- ((self.to_date >= shift.from_date) & (self.to_date <= shift.to_date)),
- shift.from_date.between(self.from_date, self.to_date),
- ]
- ),
- ]
- )
- )
- else:
- query = query.where(
- shift.to_date.isnull()
- | ((self.from_date >= shift.from_date) & (self.from_date <= shift.to_date))
- )
+ for date_overlap in d:
+ if date_overlap["name"]:
+ self.throw_overlap_error(date_overlap)
- return query.run(as_dict=True)
-
- def throw_overlap_error(self, shift_details):
- shift_details = frappe._dict(shift_details)
- msg = _(
- "Employee {0} has already applied for Shift {1}: {2} that overlaps within this period"
- ).format(
- frappe.bold(self.employee),
- frappe.bold(shift_details.shift_type),
- get_link_to_form("Shift Request", shift_details.name),
- )
-
- frappe.throw(msg, title=_("Overlapping Shift Requests"), exc=OverlappingShiftRequestError)
+ def throw_overlap_error(self, d):
+ msg = _("Employee {0} has already applied for {1} between {2} and {3}").format(
+ self.employee, d["shift_type"], formatdate(d["from_date"]), formatdate(d["to_date"])
+ ) + """ : {0}""".format(d["name"])
+ frappe.throw(msg, OverlapError)
diff --git a/erpnext/hr/doctype/shift_request/test_shift_request.py b/erpnext/hr/doctype/shift_request/test_shift_request.py
index c47418cfa8..b4f5177215 100644
--- a/erpnext/hr/doctype/shift_request/test_shift_request.py
+++ b/erpnext/hr/doctype/shift_request/test_shift_request.py
@@ -4,24 +4,23 @@
import unittest
import frappe
-from frappe.tests.utils import FrappeTestCase
from frappe.utils import add_days, nowdate
from erpnext.hr.doctype.employee.test_employee import make_employee
-from erpnext.hr.doctype.shift_request.shift_request import OverlappingShiftRequestError
-from erpnext.hr.doctype.shift_type.test_shift_type import setup_shift_type
test_dependencies = ["Shift Type"]
-class TestShiftRequest(FrappeTestCase):
+class TestShiftRequest(unittest.TestCase):
def setUp(self):
- for doctype in ["Shift Request", "Shift Assignment", "Shift Type"]:
- frappe.db.delete(doctype)
+ for doctype in ["Shift Request", "Shift Assignment"]:
+ frappe.db.sql("delete from `tab{doctype}`".format(doctype=doctype))
+
+ def tearDown(self):
+ frappe.db.rollback()
def test_make_shift_request(self):
"Test creation/updation of Shift Assignment from Shift Request."
- setup_shift_type(shift_type="Day Shift")
department = frappe.get_value("Employee", "_T-Employee-00001", "department")
set_shift_approver(department)
approver = frappe.db.sql(
@@ -49,7 +48,6 @@ class TestShiftRequest(FrappeTestCase):
self.assertEqual(shift_assignment_docstatus, 2)
def test_shift_request_approver_perms(self):
- setup_shift_type(shift_type="Day Shift")
employee = frappe.get_doc("Employee", "_T-Employee-00001")
user = "test_approver_perm_emp@example.com"
make_employee(user, "_Test Company")
@@ -89,145 +87,6 @@ class TestShiftRequest(FrappeTestCase):
employee.shift_request_approver = ""
employee.save()
- def test_overlap_for_request_without_to_date(self):
- # shift should be Ongoing if Only from_date is present
- user = "test_shift_request@example.com"
- employee = make_employee(user, company="_Test Company", shift_request_approver=user)
- setup_shift_type(shift_type="Day Shift")
-
- shift_request = frappe.get_doc(
- {
- "doctype": "Shift Request",
- "shift_type": "Day Shift",
- "company": "_Test Company",
- "employee": employee,
- "from_date": nowdate(),
- "approver": user,
- "status": "Approved",
- }
- ).submit()
-
- shift_request = frappe.get_doc(
- {
- "doctype": "Shift Request",
- "shift_type": "Day Shift",
- "company": "_Test Company",
- "employee": employee,
- "from_date": add_days(nowdate(), 2),
- "approver": user,
- "status": "Approved",
- }
- )
-
- self.assertRaises(OverlappingShiftRequestError, shift_request.save)
-
- def test_overlap_for_request_with_from_and_to_dates(self):
- user = "test_shift_request@example.com"
- employee = make_employee(user, company="_Test Company", shift_request_approver=user)
- setup_shift_type(shift_type="Day Shift")
-
- shift_request = frappe.get_doc(
- {
- "doctype": "Shift Request",
- "shift_type": "Day Shift",
- "company": "_Test Company",
- "employee": employee,
- "from_date": nowdate(),
- "to_date": add_days(nowdate(), 30),
- "approver": user,
- "status": "Approved",
- }
- ).submit()
-
- shift_request = frappe.get_doc(
- {
- "doctype": "Shift Request",
- "shift_type": "Day Shift",
- "company": "_Test Company",
- "employee": employee,
- "from_date": add_days(nowdate(), 10),
- "to_date": add_days(nowdate(), 35),
- "approver": user,
- "status": "Approved",
- }
- )
-
- self.assertRaises(OverlappingShiftRequestError, shift_request.save)
-
- def test_overlapping_for_a_fixed_period_shift_and_ongoing_shift(self):
- user = "test_shift_request@example.com"
- employee = make_employee(user, company="_Test Company", shift_request_approver=user)
-
- # shift setup for 8-12
- shift_type = setup_shift_type(shift_type="Shift 1", start_time="08:00:00", end_time="12:00:00")
- date = nowdate()
-
- # shift with end date
- frappe.get_doc(
- {
- "doctype": "Shift Request",
- "shift_type": shift_type.name,
- "company": "_Test Company",
- "employee": employee,
- "from_date": date,
- "to_date": add_days(date, 30),
- "approver": user,
- "status": "Approved",
- }
- ).submit()
-
- # shift setup for 11-15
- shift_type = setup_shift_type(shift_type="Shift 2", start_time="11:00:00", end_time="15:00:00")
- shift2 = frappe.get_doc(
- {
- "doctype": "Shift Request",
- "shift_type": shift_type.name,
- "company": "_Test Company",
- "employee": employee,
- "from_date": date,
- "approver": user,
- "status": "Approved",
- }
- )
-
- self.assertRaises(OverlappingShiftRequestError, shift2.insert)
-
- def test_allow_non_overlapping_shift_requests_for_same_day(self):
- user = "test_shift_request@example.com"
- employee = make_employee(user, company="_Test Company", shift_request_approver=user)
-
- # shift setup for 8-12
- shift_type = setup_shift_type(shift_type="Shift 1", start_time="08:00:00", end_time="12:00:00")
- date = nowdate()
-
- # shift with end date
- frappe.get_doc(
- {
- "doctype": "Shift Request",
- "shift_type": shift_type.name,
- "company": "_Test Company",
- "employee": employee,
- "from_date": date,
- "to_date": add_days(date, 30),
- "approver": user,
- "status": "Approved",
- }
- ).submit()
-
- # shift setup for 13-15
- shift_type = setup_shift_type(shift_type="Shift 2", start_time="13:00:00", end_time="15:00:00")
- frappe.get_doc(
- {
- "doctype": "Shift Request",
- "shift_type": shift_type.name,
- "company": "_Test Company",
- "employee": employee,
- "from_date": date,
- "approver": user,
- "status": "Approved",
- }
- ).submit()
-
def set_shift_approver(department):
department_doc = frappe.get_doc("Department", department)
diff --git a/erpnext/hr/doctype/shift_type/shift_type.py b/erpnext/hr/doctype/shift_type/shift_type.py
index 5e214cf7b7..3f5cb222bf 100644
--- a/erpnext/hr/doctype/shift_type/shift_type.py
+++ b/erpnext/hr/doctype/shift_type/shift_type.py
@@ -3,23 +3,21 @@
import itertools
-from datetime import datetime, timedelta
+from datetime import timedelta
import frappe
from frappe.model.document import Document
-from frappe.utils import cint, get_datetime, get_time, getdate
+from frappe.utils import cint, get_datetime, getdate
-from erpnext.buying.doctype.supplier_scorecard.supplier_scorecard import daterange
from erpnext.hr.doctype.attendance.attendance import mark_attendance
from erpnext.hr.doctype.employee.employee import get_holiday_list_for_employee
from erpnext.hr.doctype.employee_checkin.employee_checkin import (
calculate_working_hours,
mark_attendance_and_link_log,
)
-from erpnext.hr.doctype.holiday_list.holiday_list import is_holiday
from erpnext.hr.doctype.shift_assignment.shift_assignment import (
+ get_actual_start_end_datetime_of_shift,
get_employee_shift,
- get_shift_details,
)
@@ -32,9 +30,8 @@ class ShiftType(Document):
or not self.last_sync_of_checkin
):
return
-
filters = {
- "skip_auto_attendance": 0,
+ "skip_auto_attendance": "0",
"attendance": ("is", "not set"),
"time": (">=", self.process_attendance_after),
"shift_actual_end": ("<", self.last_sync_of_checkin),
@@ -43,7 +40,6 @@ class ShiftType(Document):
logs = frappe.db.get_list(
"Employee Checkin", fields="*", filters=filters, order_by="employee,time"
)
-
for key, group in itertools.groupby(
logs, key=lambda x: (x["employee"], x["shift_actual_start"])
):
@@ -56,7 +52,6 @@ class ShiftType(Document):
in_time,
out_time,
) = self.get_attendance(single_shift_logs)
-
mark_attendance_and_link_log(
single_shift_logs,
attendance_status,
@@ -68,16 +63,15 @@ class ShiftType(Document):
out_time,
self.name,
)
-
for employee in self.get_assigned_employee(self.process_attendance_after, True):
self.mark_absent_for_dates_with_no_attendance(employee)
def get_attendance(self, logs):
"""Return attendance_status, working_hours, late_entry, early_exit, in_time, out_time
for a set of logs belonging to a single shift.
- Assumptions:
- 1. These logs belongs to a single shift, single employee and it's not in a holiday date.
- 2. Logs are in chronological order
+ Assumtion:
+ 1. These logs belongs to an single shift, single employee and is not in a holiday date.
+ 2. Logs are in chronological order
"""
late_entry = early_exit = False
total_working_hours, in_time, out_time = calculate_working_hours(
@@ -97,68 +91,39 @@ class ShiftType(Document):
):
early_exit = True
- if (
- self.working_hours_threshold_for_half_day
- and total_working_hours < self.working_hours_threshold_for_half_day
- ):
- return "Half Day", total_working_hours, late_entry, early_exit, in_time, out_time
if (
self.working_hours_threshold_for_absent
and total_working_hours < self.working_hours_threshold_for_absent
):
return "Absent", total_working_hours, late_entry, early_exit, in_time, out_time
+ if (
+ self.working_hours_threshold_for_half_day
+ and total_working_hours < self.working_hours_threshold_for_half_day
+ ):
+ return "Half Day", total_working_hours, late_entry, early_exit, in_time, out_time
return "Present", total_working_hours, late_entry, early_exit, in_time, out_time
def mark_absent_for_dates_with_no_attendance(self, employee):
"""Marks Absents for the given employee on working days in this shift which have no attendance marked.
The Absent is marked starting from 'process_attendance_after' or employee creation date.
"""
- start_date, end_date = self.get_start_and_end_dates(employee)
-
- # no shift assignment found, no need to process absent attendance records
- if start_date is None:
- return
-
- holiday_list_name = self.holiday_list
- if not holiday_list_name:
- holiday_list_name = get_holiday_list_for_employee(employee, False)
-
- start_time = get_time(self.start_time)
-
- for date in daterange(getdate(start_date), getdate(end_date)):
- if is_holiday(holiday_list_name, date):
- # skip marking absent on a holiday
- continue
-
- timestamp = datetime.combine(date, start_time)
- shift_details = get_employee_shift(employee, timestamp, True)
-
- if shift_details and shift_details.shift_type.name == self.name:
- mark_attendance(employee, date, "Absent", self.name)
-
- def get_start_and_end_dates(self, employee):
- """Returns start and end dates for checking attendance and marking absent
- return: start date = max of `process_attendance_after` and DOJ
- return: end date = min of shift before `last_sync_of_checkin` and Relieving Date
- """
date_of_joining, relieving_date, employee_creation = frappe.db.get_value(
"Employee", employee, ["date_of_joining", "relieving_date", "creation"]
)
-
if not date_of_joining:
date_of_joining = employee_creation.date()
-
start_date = max(getdate(self.process_attendance_after), date_of_joining)
- end_date = None
-
- shift_details = get_shift_details(self.name, get_datetime(self.last_sync_of_checkin))
- last_shift_time = (
- shift_details.actual_start if shift_details else get_datetime(self.last_sync_of_checkin)
+ actual_shift_datetime = get_actual_start_end_datetime_of_shift(
+ employee, get_datetime(self.last_sync_of_checkin), True
+ )
+ last_shift_time = (
+ actual_shift_datetime[0]
+ if actual_shift_datetime[0]
+ else get_datetime(self.last_sync_of_checkin)
+ )
+ prev_shift = get_employee_shift(
+ employee, last_shift_time.date() - timedelta(days=1), True, "reverse"
)
-
- # check if shift is found for 1 day before the last sync of checkin
- # absentees are auto-marked 1 day after the shift to wait for any manual attendance records
- prev_shift = get_employee_shift(employee, last_shift_time - timedelta(days=1), True, "reverse")
if prev_shift:
end_date = (
min(prev_shift.start_datetime.date(), relieving_date)
@@ -166,21 +131,28 @@ class ShiftType(Document):
else prev_shift.start_datetime.date()
)
else:
- # no shift found
- return None, None
- return start_date, end_date
+ return
+ holiday_list_name = self.holiday_list
+ if not holiday_list_name:
+ holiday_list_name = get_holiday_list_for_employee(employee, False)
+ dates = get_filtered_date_list(employee, start_date, end_date, holiday_list=holiday_list_name)
+ for date in dates:
+ shift_details = get_employee_shift(employee, date, True)
+ if shift_details and shift_details.shift_type.name == self.name:
+ mark_attendance(employee, date, "Absent", self.name)
def get_assigned_employee(self, from_date=None, consider_default_shift=False):
- filters = {"shift_type": self.name, "docstatus": "1"}
- if from_date:
- filters["start_date"] = (">", from_date)
+ filters = {"start_date": (">", from_date), "shift_type": self.name, "docstatus": "1"}
+ if not from_date:
+ del filters["start_date"]
- assigned_employees = frappe.get_all("Shift Assignment", filters=filters, pluck="employee")
+ assigned_employees = frappe.get_all("Shift Assignment", "employee", filters, as_list=True)
+ assigned_employees = [x[0] for x in assigned_employees]
if consider_default_shift:
filters = {"default_shift": self.name, "status": ["!=", "Inactive"]}
- default_shift_employees = frappe.get_all("Employee", filters=filters, pluck="name")
-
+ default_shift_employees = frappe.get_all("Employee", "name", filters, as_list=True)
+ default_shift_employees = [x[0] for x in default_shift_employees]
return list(set(assigned_employees + default_shift_employees))
return assigned_employees
@@ -190,3 +162,42 @@ def process_auto_attendance_for_all_shifts():
for shift in shift_list:
doc = frappe.get_doc("Shift Type", shift[0])
doc.process_auto_attendance()
+
+
+def get_filtered_date_list(
+ employee, start_date, end_date, filter_attendance=True, holiday_list=None
+):
+ """Returns a list of dates after removing the dates with attendance and holidays"""
+ base_dates_query = """select adddate(%(start_date)s, t2.i*100 + t1.i*10 + t0.i) selected_date from
+ (select 0 i union select 1 union select 2 union select 3 union select 4 union select 5 union select 6 union select 7 union select 8 union select 9) t0,
+ (select 0 i union select 1 union select 2 union select 3 union select 4 union select 5 union select 6 union select 7 union select 8 union select 9) t1,
+ (select 0 i union select 1 union select 2 union select 3 union select 4 union select 5 union select 6 union select 7 union select 8 union select 9) t2"""
+ condition_query = ""
+ if filter_attendance:
+ condition_query += """ and a.selected_date not in (
+ select attendance_date from `tabAttendance`
+ where docstatus = 1 and employee = %(employee)s
+ and attendance_date between %(start_date)s and %(end_date)s)"""
+ if holiday_list:
+ condition_query += """ and a.selected_date not in (
+ select holiday_date from `tabHoliday` where parenttype = 'Holiday List' and
+ parentfield = 'holidays' and parent = %(holiday_list)s
+ and holiday_date between %(start_date)s and %(end_date)s)"""
+
+ dates = frappe.db.sql(
+ """select * from
+ ({base_dates_query}) as a
+ where a.selected_date <= %(end_date)s {condition_query}
+ """.format(
+ base_dates_query=base_dates_query, condition_query=condition_query
+ ),
+ {
+ "employee": employee,
+ "start_date": start_date,
+ "end_date": end_date,
+ "holiday_list": holiday_list,
+ },
+ as_list=True,
+ )
+
+ return [getdate(date[0]) for date in dates]
diff --git a/erpnext/hr/doctype/shift_type/test_shift_type.py b/erpnext/hr/doctype/shift_type/test_shift_type.py
index 0d75292a1e..7d2f29cd6c 100644
--- a/erpnext/hr/doctype/shift_type/test_shift_type.py
+++ b/erpnext/hr/doctype/shift_type/test_shift_type.py
@@ -2,381 +2,7 @@
# See license.txt
import unittest
-from datetime import datetime, timedelta
-import frappe
-from frappe.tests.utils import FrappeTestCase
-from frappe.utils import add_days, get_time, get_year_ending, get_year_start, getdate, now_datetime
-from erpnext.hr.doctype.employee.test_employee import make_employee
-from erpnext.hr.doctype.holiday_list.test_holiday_list import set_holiday_list
-from erpnext.hr.doctype.leave_application.test_leave_application import get_first_sunday
-from erpnext.payroll.doctype.salary_slip.test_salary_slip import make_holiday_list
-
-
-class TestShiftType(FrappeTestCase):
- def setUp(self):
- frappe.db.delete("Shift Type")
- frappe.db.delete("Shift Assignment")
- frappe.db.delete("Employee Checkin")
- frappe.db.delete("Attendance")
-
- from_date = get_year_start(getdate())
- to_date = get_year_ending(getdate())
- self.holiday_list = make_holiday_list(from_date=from_date, to_date=to_date)
-
- def test_mark_attendance(self):
- from erpnext.hr.doctype.employee_checkin.test_employee_checkin import make_checkin
-
- employee = make_employee("test_employee_checkin@example.com", company="_Test Company")
-
- shift_type = setup_shift_type()
- date = getdate()
- make_shift_assignment(shift_type.name, employee, date)
-
- timestamp = datetime.combine(date, get_time("08:00:00"))
- log_in = make_checkin(employee, timestamp)
- self.assertEqual(log_in.shift, shift_type.name)
-
- timestamp = datetime.combine(date, get_time("12:00:00"))
- log_out = make_checkin(employee, timestamp)
- self.assertEqual(log_out.shift, shift_type.name)
-
- shift_type.process_auto_attendance()
-
- attendance = frappe.db.get_value(
- "Attendance", {"shift": shift_type.name}, ["status", "name"], as_dict=True
- )
- self.assertEqual(attendance.status, "Present")
-
- def test_entry_and_exit_grace(self):
- from erpnext.hr.doctype.employee_checkin.test_employee_checkin import make_checkin
-
- employee = make_employee("test_employee_checkin@example.com", company="_Test Company")
-
- # doesn't mark late entry until 60 mins after shift start i.e. till 9
- # doesn't mark late entry until 60 mins before shift end i.e. 11
- shift_type = setup_shift_type(
- enable_entry_grace_period=1,
- enable_exit_grace_period=1,
- late_entry_grace_period=60,
- early_exit_grace_period=60,
- )
- date = getdate()
- make_shift_assignment(shift_type.name, employee, date)
-
- timestamp = datetime.combine(date, get_time("09:30:00"))
- log_in = make_checkin(employee, timestamp)
- self.assertEqual(log_in.shift, shift_type.name)
-
- timestamp = datetime.combine(date, get_time("10:30:00"))
- log_out = make_checkin(employee, timestamp)
- self.assertEqual(log_out.shift, shift_type.name)
-
- shift_type.process_auto_attendance()
-
- attendance = frappe.db.get_value(
- "Attendance",
- {"shift": shift_type.name},
- ["status", "name", "late_entry", "early_exit"],
- as_dict=True,
- )
- self.assertEqual(attendance.status, "Present")
- self.assertEqual(attendance.late_entry, 1)
- self.assertEqual(attendance.early_exit, 1)
-
- def test_working_hours_threshold_for_half_day(self):
- from erpnext.hr.doctype.employee_checkin.test_employee_checkin import make_checkin
-
- employee = make_employee("test_employee_checkin@example.com", company="_Test Company")
- shift_type = setup_shift_type(shift_type="Half Day Test", working_hours_threshold_for_half_day=2)
- date = getdate()
- make_shift_assignment(shift_type.name, employee, date)
-
- timestamp = datetime.combine(date, get_time("08:00:00"))
- log_in = make_checkin(employee, timestamp)
- self.assertEqual(log_in.shift, shift_type.name)
-
- timestamp = datetime.combine(date, get_time("09:30:00"))
- log_out = make_checkin(employee, timestamp)
- self.assertEqual(log_out.shift, shift_type.name)
-
- shift_type.process_auto_attendance()
-
- attendance = frappe.db.get_value(
- "Attendance", {"shift": shift_type.name}, ["status", "working_hours"], as_dict=True
- )
- self.assertEqual(attendance.status, "Half Day")
- self.assertEqual(attendance.working_hours, 1.5)
-
- def test_working_hours_threshold_for_absent(self):
- from erpnext.hr.doctype.employee_checkin.test_employee_checkin import make_checkin
-
- employee = make_employee("test_employee_checkin@example.com", company="_Test Company")
- shift_type = setup_shift_type(shift_type="Absent Test", working_hours_threshold_for_absent=2)
- date = getdate()
- make_shift_assignment(shift_type.name, employee, date)
-
- timestamp = datetime.combine(date, get_time("08:00:00"))
- log_in = make_checkin(employee, timestamp)
- self.assertEqual(log_in.shift, shift_type.name)
-
- timestamp = datetime.combine(date, get_time("09:30:00"))
- log_out = make_checkin(employee, timestamp)
- self.assertEqual(log_out.shift, shift_type.name)
-
- shift_type.process_auto_attendance()
-
- attendance = frappe.db.get_value(
- "Attendance", {"shift": shift_type.name}, ["status", "working_hours"], as_dict=True
- )
- self.assertEqual(attendance.status, "Absent")
- self.assertEqual(attendance.working_hours, 1.5)
-
- def test_working_hours_threshold_for_absent_and_half_day_1(self):
- # considers half day over absent
- from erpnext.hr.doctype.employee_checkin.test_employee_checkin import make_checkin
-
- employee = make_employee("test_employee_checkin@example.com", company="_Test Company")
- shift_type = setup_shift_type(
- shift_type="Half Day + Absent Test",
- working_hours_threshold_for_half_day=1,
- working_hours_threshold_for_absent=2,
- )
- date = getdate()
- make_shift_assignment(shift_type.name, employee, date)
-
- timestamp = datetime.combine(date, get_time("08:00:00"))
- log_in = make_checkin(employee, timestamp)
- self.assertEqual(log_in.shift, shift_type.name)
-
- timestamp = datetime.combine(date, get_time("08:45:00"))
- log_out = make_checkin(employee, timestamp)
- self.assertEqual(log_out.shift, shift_type.name)
-
- shift_type.process_auto_attendance()
-
- attendance = frappe.db.get_value(
- "Attendance", {"shift": shift_type.name}, ["status", "working_hours"], as_dict=True
- )
- self.assertEqual(attendance.status, "Half Day")
- self.assertEqual(attendance.working_hours, 0.75)
-
- def test_working_hours_threshold_for_absent_and_half_day_2(self):
- # considers absent over half day
- from erpnext.hr.doctype.employee_checkin.test_employee_checkin import make_checkin
-
- employee = make_employee("test_employee_checkin@example.com", company="_Test Company")
- shift_type = setup_shift_type(
- shift_type="Half Day + Absent Test",
- working_hours_threshold_for_half_day=1,
- working_hours_threshold_for_absent=2,
- )
- date = getdate()
- make_shift_assignment(shift_type.name, employee, date)
-
- timestamp = datetime.combine(date, get_time("08:00:00"))
- log_in = make_checkin(employee, timestamp)
- self.assertEqual(log_in.shift, shift_type.name)
-
- timestamp = datetime.combine(date, get_time("09:30:00"))
- log_out = make_checkin(employee, timestamp)
- self.assertEqual(log_out.shift, shift_type.name)
-
- shift_type.process_auto_attendance()
-
- attendance = frappe.db.get_value("Attendance", {"shift": shift_type.name}, "status")
- self.assertEqual(attendance, "Absent")
-
- def test_mark_absent_for_dates_with_no_attendance(self):
- employee = make_employee("test_employee_checkin@example.com", company="_Test Company")
- shift_type = setup_shift_type(shift_type="Test Absent with no Attendance")
-
- # absentees are auto-marked one day after to wait for any manual attendance records
- date = add_days(getdate(), -1)
- make_shift_assignment(shift_type.name, employee, date)
-
- shift_type.process_auto_attendance()
-
- attendance = frappe.db.get_value(
- "Attendance", {"attendance_date": date, "employee": employee}, "status"
- )
- self.assertEqual(attendance, "Absent")
-
- @set_holiday_list("Salary Slip Test Holiday List", "_Test Company")
- def test_skip_marking_absent_on_a_holiday(self):
- employee = make_employee("test_employee_checkin@example.com", company="_Test Company")
- shift_type = setup_shift_type(shift_type="Test Absent with no Attendance")
- shift_type.holiday_list = None
- shift_type.save()
-
- # should not mark any attendance if no shift assignment is created
- shift_type.process_auto_attendance()
- attendance = frappe.db.get_value("Attendance", {"employee": employee}, "status")
- self.assertIsNone(attendance)
-
- first_sunday = get_first_sunday(self.holiday_list, for_date=getdate())
- make_shift_assignment(shift_type.name, employee, first_sunday)
-
- shift_type.process_auto_attendance()
-
- attendance = frappe.db.get_value(
- "Attendance", {"attendance_date": first_sunday, "employee": employee}, "status"
- )
- self.assertIsNone(attendance)
-
- def test_get_start_and_end_dates(self):
- date = getdate()
-
- doj = add_days(date, -30)
- relieving_date = add_days(date, -5)
- employee = make_employee(
- "test_employee_dates@example.com",
- company="_Test Company",
- date_of_joining=doj,
- relieving_date=relieving_date,
- )
- shift_type = setup_shift_type(
- shift_type="Test Absent with no Attendance", process_attendance_after=add_days(doj, 2)
- )
-
- make_shift_assignment(shift_type.name, employee, add_days(date, -25))
-
- shift_type.process_auto_attendance()
-
- # should not mark absent before shift assignment/process attendance after date
- attendance = frappe.db.get_value(
- "Attendance", {"attendance_date": doj, "employee": employee}, "name"
- )
- self.assertIsNone(attendance)
-
- # mark absent on Relieving Date
- attendance = frappe.db.get_value(
- "Attendance", {"attendance_date": relieving_date, "employee": employee}, "status"
- )
- self.assertEquals(attendance, "Absent")
-
- # should not mark absent after Relieving Date
- attendance = frappe.db.get_value(
- "Attendance", {"attendance_date": add_days(relieving_date, 1), "employee": employee}, "name"
- )
- self.assertIsNone(attendance)
-
- def test_skip_auto_attendance_for_duplicate_record(self):
- # Skip auto attendance in case of duplicate attendance record
- from erpnext.hr.doctype.attendance.attendance import mark_attendance
- from erpnext.hr.doctype.employee_checkin.test_employee_checkin import make_checkin
-
- employee = make_employee("test_employee_checkin@example.com", company="_Test Company")
-
- shift_type = setup_shift_type()
- date = getdate()
-
- # mark attendance
- mark_attendance(employee, date, "Present")
- make_shift_assignment(shift_type.name, employee, date)
-
- timestamp = datetime.combine(date, get_time("08:00:00"))
- log_in = make_checkin(employee, timestamp)
- self.assertEqual(log_in.shift, shift_type.name)
-
- timestamp = datetime.combine(date, get_time("12:00:00"))
- log_out = make_checkin(employee, timestamp)
- self.assertEqual(log_out.shift, shift_type.name)
-
- # auto attendance should skip marking
- shift_type.process_auto_attendance()
-
- log_in.reload()
- log_out.reload()
- self.assertEqual(log_in.skip_auto_attendance, 1)
- self.assertEqual(log_out.skip_auto_attendance, 1)
-
- def test_skip_auto_attendance_for_overlapping_shift(self):
- # Skip auto attendance in case of overlapping shift attendance record
- # this case won't occur in case of shift assignment, since it will not allow overlapping shifts to be assigned
- # can happen if manual attendance records are created
- from erpnext.hr.doctype.attendance.attendance import mark_attendance
- from erpnext.hr.doctype.employee_checkin.test_employee_checkin import make_checkin
-
- employee = make_employee("test_employee_checkin@example.com", company="_Test Company")
- shift_1 = setup_shift_type(shift_type="Shift 1", start_time="08:00:00", end_time="10:00:00")
- shift_2 = setup_shift_type(shift_type="Shift 2", start_time="09:30:00", end_time="11:00:00")
-
- date = getdate()
-
- # mark attendance
- mark_attendance(employee, date, "Present", shift=shift_1.name)
- make_shift_assignment(shift_2.name, employee, date)
-
- timestamp = datetime.combine(date, get_time("09:30:00"))
- log_in = make_checkin(employee, timestamp)
- self.assertEqual(log_in.shift, shift_2.name)
-
- timestamp = datetime.combine(date, get_time("11:00:00"))
- log_out = make_checkin(employee, timestamp)
- self.assertEqual(log_out.shift, shift_2.name)
-
- # auto attendance should be skipped for shift 2
- # since it is already marked for overlapping shift 1
- shift_2.process_auto_attendance()
-
- log_in.reload()
- log_out.reload()
- self.assertEqual(log_in.skip_auto_attendance, 1)
- self.assertEqual(log_out.skip_auto_attendance, 1)
-
-
-def setup_shift_type(**args):
- args = frappe._dict(args)
- date = getdate()
-
- shift_type = frappe.get_doc(
- {
- "doctype": "Shift Type",
- "__newname": args.shift_type or "_Test Shift",
- "start_time": "08:00:00",
- "end_time": "12:00:00",
- "enable_auto_attendance": 1,
- "determine_check_in_and_check_out": "Alternating entries as IN and OUT during the same shift",
- "working_hours_calculation_based_on": "First Check-in and Last Check-out",
- "begin_check_in_before_shift_start_time": 60,
- "allow_check_out_after_shift_end_time": 60,
- "process_attendance_after": add_days(date, -2),
- "last_sync_of_checkin": now_datetime() + timedelta(days=1),
- }
- )
-
- holiday_list = "Employee Checkin Test Holiday List"
- if not frappe.db.exists("Holiday List", "Employee Checkin Test Holiday List"):
- holiday_list = frappe.get_doc(
- {
- "doctype": "Holiday List",
- "holiday_list_name": "Employee Checkin Test Holiday List",
- "from_date": get_year_start(date),
- "to_date": get_year_ending(date),
- }
- ).insert()
- holiday_list = holiday_list.name
-
- shift_type.holiday_list = holiday_list
- shift_type.update(args)
- shift_type.save()
-
- return shift_type
-
-
-def make_shift_assignment(shift_type, employee, start_date, end_date=None):
- shift_assignment = frappe.get_doc(
- {
- "doctype": "Shift Assignment",
- "shift_type": shift_type,
- "company": "_Test Company",
- "employee": employee,
- "start_date": start_date,
- "end_date": end_date,
- }
- ).insert()
- shift_assignment.submit()
-
- return shift_assignment
+class TestShiftType(unittest.TestCase):
+ pass
diff --git a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.js b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.js
index 6f4bbd54fb..42f7cdb50f 100644
--- a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.js
+++ b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.js
@@ -66,7 +66,8 @@ frappe.query_reports["Monthly Attendance Sheet"] = {
"Default": 0,
}
],
- onload: function() {
+
+ "onload": function() {
return frappe.call({
method: "erpnext.hr.report.monthly_attendance_sheet.monthly_attendance_sheet.get_attendance_years",
callback: function(r) {
@@ -77,25 +78,5 @@ frappe.query_reports["Monthly Attendance Sheet"] = {
year_filter.set_input(year_filter.df.default);
}
});
- },
- formatter: function(value, row, column, data, default_formatter) {
- value = default_formatter(value, row, column, data);
- const summarized_view = frappe.query_report.get_filter_value('summarized_view');
- const group_by = frappe.query_report.get_filter_value('group_by');
-
- if (!summarized_view) {
- if ((group_by && column.colIndex > 3) || (!group_by && column.colIndex > 2)) {
- if (value == 'P' || value == 'WFH')
- value = "" + value + "";
- else if (value == 'A')
- value = "" + value + "";
- else if (value == 'HD')
- value = "" + value + "";
- else if (value == 'L')
- value = "" + value + "";
- }
- }
-
- return value;
}
}
diff --git a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py
index efd2d382d5..8ea49899f2 100644
--- a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py
+++ b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py
@@ -3,618 +3,365 @@
from calendar import monthrange
-from itertools import groupby
-from typing import Dict, List, Optional, Tuple
import frappe
-from frappe import _
-from frappe.query_builder.functions import Count, Extract, Sum
+from frappe import _, msgprint
from frappe.utils import cint, cstr, getdate
-Filters = frappe._dict
-
status_map = {
- "Present": "P",
"Absent": "A",
"Half Day": "HD",
- "Work From Home": "WFH",
+ "Holiday": "H",
+ "Weekly Off": "WO",
"On Leave": "L",
- "Holiday": "H",
- "Weekly Off": "WO",
+ "Present": "P",
+ "Work From Home": "WFH",
}
day_abbr = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
-def execute(filters: Optional[Filters] = None) -> Tuple:
- filters = frappe._dict(filters or {})
+def execute(filters=None):
+ if not filters:
+ filters = {}
- if not (filters.month and filters.year):
- frappe.throw(_("Please select month and year."))
+ if filters.hide_year_field == 1:
+ filters.year = 2020
- attendance_map = get_attendance_map(filters)
- if not attendance_map:
- frappe.msgprint(_("No attendance records found."), alert=True, indicator="orange")
- return [], [], None, None
-
- columns = get_columns(filters)
- data = get_data(filters, attendance_map)
-
- if not data:
- frappe.msgprint(
- _("No attendance records found for this criteria."), alert=True, indicator="orange"
- )
+ conditions, filters = get_conditions(filters)
+ columns, days = get_columns(filters)
+ att_map = get_attendance_list(conditions, filters)
+ if not att_map:
return columns, [], None, None
- message = get_message() if not filters.summarized_view else ""
- chart = get_chart_data(attendance_map, filters)
+ if filters.group_by:
+ emp_map, group_by_parameters = get_employee_details(filters.group_by, filters.company)
+ holiday_list = []
+ for parameter in group_by_parameters:
+ h_list = [
+ emp_map[parameter][d]["holiday_list"]
+ for d in emp_map[parameter]
+ if emp_map[parameter][d]["holiday_list"]
+ ]
+ holiday_list += h_list
+ else:
+ emp_map = get_employee_details(filters.group_by, filters.company)
+ holiday_list = [emp_map[d]["holiday_list"] for d in emp_map if emp_map[d]["holiday_list"]]
- return columns, data, message, chart
+ default_holiday_list = frappe.get_cached_value(
+ "Company", filters.get("company"), "default_holiday_list"
+ )
+ holiday_list.append(default_holiday_list)
+ holiday_list = list(set(holiday_list))
+ holiday_map = get_holiday(holiday_list, filters["month"])
+
+ data = []
+
+ leave_types = frappe.db.get_list("Leave Type")
+ leave_list = None
+ if filters.summarized_view:
+ leave_list = [d.name + ":Float:120" for d in leave_types]
+ columns.extend(leave_list)
+ columns.extend([_("Total Late Entries") + ":Float:120", _("Total Early Exits") + ":Float:120"])
+
+ if filters.group_by:
+ emp_att_map = {}
+ for parameter in group_by_parameters:
+ emp_map_set = set([key for key in emp_map[parameter].keys()])
+ att_map_set = set([key for key in att_map.keys()])
+ if att_map_set & emp_map_set:
+ parameter_row = ["" + parameter + ""] + [
+ "" for day in range(filters["total_days_in_month"] + 2)
+ ]
+ data.append(parameter_row)
+ record, emp_att_data = add_data(
+ emp_map[parameter],
+ att_map,
+ filters,
+ holiday_map,
+ conditions,
+ default_holiday_list,
+ leave_types=leave_types,
+ )
+ emp_att_map.update(emp_att_data)
+ data += record
+ else:
+ record, emp_att_map = add_data(
+ emp_map,
+ att_map,
+ filters,
+ holiday_map,
+ conditions,
+ default_holiday_list,
+ leave_types=leave_types,
+ )
+ data += record
+
+ chart_data = get_chart_data(emp_att_map, days)
+
+ return columns, data, None, chart_data
-def get_message() -> str:
- message = ""
- colors = ["green", "red", "orange", "green", "#318AD8", "", ""]
+def get_chart_data(emp_att_map, days):
+ labels = []
+ datasets = [
+ {"name": "Absent", "values": []},
+ {"name": "Present", "values": []},
+ {"name": "Leave", "values": []},
+ ]
+ for idx, day in enumerate(days, start=0):
+ p = day.replace("::65", "")
+ labels.append(day.replace("::65", ""))
+ total_absent_on_day = 0
+ total_leave_on_day = 0
+ total_present_on_day = 0
+ total_holiday = 0
+ for emp in emp_att_map.keys():
+ if emp_att_map[emp][idx]:
+ if emp_att_map[emp][idx] == "A":
+ total_absent_on_day += 1
+ if emp_att_map[emp][idx] in ["P", "WFH"]:
+ total_present_on_day += 1
+ if emp_att_map[emp][idx] == "HD":
+ total_present_on_day += 0.5
+ total_leave_on_day += 0.5
+ if emp_att_map[emp][idx] == "L":
+ total_leave_on_day += 1
- count = 0
- for status, abbr in status_map.items():
- message += f"""
-
- {status} - {abbr}
-
- """
- count += 1
+ datasets[0]["values"].append(total_absent_on_day)
+ datasets[1]["values"].append(total_present_on_day)
+ datasets[2]["values"].append(total_leave_on_day)
- return message
+ chart = {"data": {"labels": labels, "datasets": datasets}}
+
+ chart["type"] = "line"
+
+ return chart
-def get_columns(filters: Filters) -> List[Dict]:
+def add_data(
+ employee_map, att_map, filters, holiday_map, conditions, default_holiday_list, leave_types=None
+):
+
+ record = []
+ emp_att_map = {}
+ for emp in employee_map:
+ emp_det = employee_map.get(emp)
+ if not emp_det or emp not in att_map:
+ continue
+
+ row = []
+ if filters.group_by:
+ row += [" "]
+ row += [emp, emp_det.employee_name]
+
+ total_p = total_a = total_l = total_h = total_um = 0.0
+ emp_status_map = []
+ for day in range(filters["total_days_in_month"]):
+ status = None
+ status = att_map.get(emp).get(day + 1)
+
+ if status is None and holiday_map:
+ emp_holiday_list = emp_det.holiday_list if emp_det.holiday_list else default_holiday_list
+
+ if emp_holiday_list in holiday_map:
+ for idx, ele in enumerate(holiday_map[emp_holiday_list]):
+ if day + 1 == holiday_map[emp_holiday_list][idx][0]:
+ if holiday_map[emp_holiday_list][idx][1]:
+ status = "Weekly Off"
+ else:
+ status = "Holiday"
+ total_h += 1
+
+ abbr = status_map.get(status, "")
+ emp_status_map.append(abbr)
+
+ if filters.summarized_view:
+ if status == "Present" or status == "Work From Home":
+ total_p += 1
+ elif status == "Absent":
+ total_a += 1
+ elif status == "On Leave":
+ total_l += 1
+ elif status == "Half Day":
+ total_p += 0.5
+ total_a += 0.5
+ total_l += 0.5
+ elif not status:
+ total_um += 1
+
+ if not filters.summarized_view:
+ row += emp_status_map
+
+ if filters.summarized_view:
+ row += [total_p, total_l, total_a, total_h, total_um]
+
+ if not filters.get("employee"):
+ filters.update({"employee": emp})
+ conditions += " and employee = %(employee)s"
+ elif not filters.get("employee") == emp:
+ filters.update({"employee": emp})
+
+ if filters.summarized_view:
+ leave_details = frappe.db.sql(
+ """select leave_type, status, count(*) as count from `tabAttendance`\
+ where leave_type is not NULL %s group by leave_type, status"""
+ % conditions,
+ filters,
+ as_dict=1,
+ )
+
+ time_default_counts = frappe.db.sql(
+ """select (select count(*) from `tabAttendance` where \
+ late_entry = 1 %s) as late_entry_count, (select count(*) from tabAttendance where \
+ early_exit = 1 %s) as early_exit_count"""
+ % (conditions, conditions),
+ filters,
+ )
+
+ leaves = {}
+ for d in leave_details:
+ if d.status == "Half Day":
+ d.count = d.count * 0.5
+ if d.leave_type in leaves:
+ leaves[d.leave_type] += d.count
+ else:
+ leaves[d.leave_type] = d.count
+
+ for d in leave_types:
+ if d.name in leaves:
+ row.append(leaves[d.name])
+ else:
+ row.append("0.0")
+
+ row.extend([time_default_counts[0][0], time_default_counts[0][1]])
+ emp_att_map[emp] = emp_status_map
+ record.append(row)
+
+ return record, emp_att_map
+
+
+def get_columns(filters):
+
columns = []
if filters.group_by:
- columns.append(
- {
- "label": _(filters.group_by),
- "fieldname": frappe.scrub(filters.group_by),
- "fieldtype": "Link",
- "options": "Branch",
- "width": 120,
- }
- )
+ columns = [_(filters.group_by) + ":Link/Branch:120"]
- columns.extend(
- [
- {
- "label": _("Employee"),
- "fieldname": "employee",
- "fieldtype": "Link",
- "options": "Employee",
- "width": 135,
- },
- {"label": _("Employee Name"), "fieldname": "employee_name", "fieldtype": "Data", "width": 120},
- ]
- )
+ columns += [_("Employee") + ":Link/Employee:120", _("Employee Name") + ":Data/:120"]
+ days = []
+ for day in range(filters["total_days_in_month"]):
+ date = str(filters.year) + "-" + str(filters.month) + "-" + str(day + 1)
+ day_name = day_abbr[getdate(date).weekday()]
+ days.append(cstr(day + 1) + " " + day_name + "::65")
+ if not filters.summarized_view:
+ columns += days
if filters.summarized_view:
- columns.extend(
- [
- {
- "label": _("Total Present"),
- "fieldname": "total_present",
- "fieldtype": "Float",
- "width": 110,
- },
- {"label": _("Total Leaves"), "fieldname": "total_leaves", "fieldtype": "Float", "width": 110},
- {"label": _("Total Absent"), "fieldname": "total_absent", "fieldtype": "Float", "width": 110},
- {
- "label": _("Total Holidays"),
- "fieldname": "total_holidays",
- "fieldtype": "Float",
- "width": 120,
- },
- {
- "label": _("Unmarked Days"),
- "fieldname": "unmarked_days",
- "fieldtype": "Float",
- "width": 130,
- },
- ]
- )
- columns.extend(get_columns_for_leave_types())
- columns.extend(
- [
- {
- "label": _("Total Late Entries"),
- "fieldname": "total_late_entries",
- "fieldtype": "Float",
- "width": 140,
- },
- {
- "label": _("Total Early Exits"),
- "fieldname": "total_early_exits",
- "fieldtype": "Float",
- "width": 140,
- },
- ]
- )
- else:
- columns.append({"label": _("Shift"), "fieldname": "shift", "fieldtype": "Data", "width": 120})
- columns.extend(get_columns_for_days(filters))
-
- return columns
+ columns += [
+ _("Total Present") + ":Float:120",
+ _("Total Leaves") + ":Float:120",
+ _("Total Absent") + ":Float:120",
+ _("Total Holidays") + ":Float:120",
+ _("Unmarked Days") + ":Float:120",
+ ]
+ return columns, days
-def get_columns_for_leave_types() -> List[Dict]:
- leave_types = frappe.db.get_all("Leave Type", pluck="name")
- types = []
- for entry in leave_types:
- types.append(
- {"label": entry, "fieldname": frappe.scrub(entry), "fieldtype": "Float", "width": 120}
- )
-
- return types
-
-
-def get_columns_for_days(filters: Filters) -> List[Dict]:
- total_days = get_total_days_in_month(filters)
- days = []
-
- for day in range(1, total_days + 1):
- # forms the dates from selected year and month from filters
- date = "{}-{}-{}".format(cstr(filters.year), cstr(filters.month), cstr(day))
- # gets abbr from weekday number
- weekday = day_abbr[getdate(date).weekday()]
- # sets days as 1 Mon, 2 Tue, 3 Wed
- label = "{} {}".format(cstr(day), weekday)
- days.append({"label": label, "fieldtype": "Data", "fieldname": day, "width": 65})
-
- return days
-
-
-def get_total_days_in_month(filters: Filters) -> int:
- return monthrange(cint(filters.year), cint(filters.month))[1]
-
-
-def get_data(filters: Filters, attendance_map: Dict) -> List[Dict]:
- employee_details, group_by_param_values = get_employee_related_details(
- filters.group_by, filters.company
+def get_attendance_list(conditions, filters):
+ attendance_list = frappe.db.sql(
+ """select employee, day(attendance_date) as day_of_month,
+ status from tabAttendance where docstatus = 1 %s order by employee, attendance_date"""
+ % conditions,
+ filters,
+ as_dict=1,
)
- holiday_map = get_holiday_map(filters)
- data = []
- if filters.group_by:
- group_by_column = frappe.scrub(filters.group_by)
-
- for value in group_by_param_values:
- if not value:
- continue
-
- records = get_rows(employee_details[value], filters, holiday_map, attendance_map)
-
- if records:
- data.append({group_by_column: frappe.bold(value)})
- data.extend(records)
- else:
- data = get_rows(employee_details, filters, holiday_map, attendance_map)
-
- return data
-
-
-def get_attendance_map(filters: Filters) -> Dict:
- """Returns a dictionary of employee wise attendance map as per shifts for all the days of the month like
- {
- 'employee1': {
- 'Morning Shift': {1: 'Present', 2: 'Absent', ...}
- 'Evening Shift': {1: 'Absent', 2: 'Present', ...}
- },
- 'employee2': {
- 'Afternoon Shift': {1: 'Present', 2: 'Absent', ...}
- 'Night Shift': {1: 'Absent', 2: 'Absent', ...}
- }
- }
- """
- Attendance = frappe.qb.DocType("Attendance")
- query = (
- frappe.qb.from_(Attendance)
- .select(
- Attendance.employee,
- Extract("day", Attendance.attendance_date).as_("day_of_month"),
- Attendance.status,
- Attendance.shift,
- )
- .where(
- (Attendance.docstatus == 1)
- & (Attendance.company == filters.company)
- & (Extract("month", Attendance.attendance_date) == filters.month)
- & (Extract("year", Attendance.attendance_date) == filters.year)
- )
- )
- if filters.employee:
- query = query.where(Attendance.employee == filters.employee)
- query = query.orderby(Attendance.employee, Attendance.attendance_date)
-
- attendance_list = query.run(as_dict=1)
- attendance_map = {}
+ if not attendance_list:
+ msgprint(_("No attendance record found"), alert=True, indicator="orange")
+ att_map = {}
for d in attendance_list:
- attendance_map.setdefault(d.employee, frappe._dict()).setdefault(d.shift, frappe._dict())
- attendance_map[d.employee][d.shift][d.day_of_month] = d.status
+ att_map.setdefault(d.employee, frappe._dict()).setdefault(d.day_of_month, "")
+ att_map[d.employee][d.day_of_month] = d.status
- return attendance_map
+ return att_map
-def get_employee_related_details(group_by: str, company: str) -> Tuple[Dict, List]:
- """Returns
- 1. nested dict for employee details
- 2. list of values for the group by filter
- """
- Employee = frappe.qb.DocType("Employee")
- query = (
- frappe.qb.from_(Employee)
- .select(
- Employee.name,
- Employee.employee_name,
- Employee.designation,
- Employee.grade,
- Employee.department,
- Employee.branch,
- Employee.company,
- Employee.holiday_list,
- )
- .where(Employee.company == company)
+def get_conditions(filters):
+ if not (filters.get("month") and filters.get("year")):
+ msgprint(_("Please select month and year"), raise_exception=1)
+
+ filters["total_days_in_month"] = monthrange(cint(filters.year), cint(filters.month))[1]
+
+ conditions = " and month(attendance_date) = %(month)s and year(attendance_date) = %(year)s"
+
+ if filters.get("company"):
+ conditions += " and company = %(company)s"
+ if filters.get("employee"):
+ conditions += " and employee = %(employee)s"
+
+ return conditions, filters
+
+
+def get_employee_details(group_by, company):
+ emp_map = {}
+ query = """select name, employee_name, designation, department, branch, company,
+ holiday_list from `tabEmployee` where company = %s """ % frappe.db.escape(
+ company
)
if group_by:
group_by = group_by.lower()
- query = query.orderby(group_by)
+ query += " order by " + group_by + " ASC"
- employee_details = query.run(as_dict=True)
-
- group_by_param_values = []
- emp_map = {}
+ employee_details = frappe.db.sql(query, as_dict=1)
+ group_by_parameters = []
if group_by:
- for parameter, employees in groupby(employee_details, key=lambda d: d[group_by]):
- group_by_param_values.append(parameter)
- emp_map.setdefault(parameter, frappe._dict())
- for emp in employees:
- emp_map[parameter][emp.name] = emp
+ group_by_parameters = list(
+ set(detail.get(group_by, "") for detail in employee_details if detail.get(group_by, ""))
+ )
+ for parameter in group_by_parameters:
+ emp_map[parameter] = {}
+
+ for d in employee_details:
+ if group_by and len(group_by_parameters):
+ if d.get(group_by, None):
+
+ emp_map[d.get(group_by)][d.name] = d
+ else:
+ emp_map[d.name] = d
+
+ if not group_by:
+ return emp_map
else:
- for emp in employee_details:
- emp_map[emp.name] = emp
-
- return emp_map, group_by_param_values
+ return emp_map, group_by_parameters
-def get_holiday_map(filters: Filters) -> Dict[str, List[Dict]]:
- """
- Returns a dict of holidays falling in the filter month and year
- with list name as key and list of holidays as values like
- {
- 'Holiday List 1': [
- {'day_of_month': '0' , 'weekly_off': 1},
- {'day_of_month': '1', 'weekly_off': 0}
- ],
- 'Holiday List 2': [
- {'day_of_month': '0' , 'weekly_off': 1},
- {'day_of_month': '1', 'weekly_off': 0}
- ]
- }
- """
- # add default holiday list too
- holiday_lists = frappe.db.get_all("Holiday List", pluck="name")
- default_holiday_list = frappe.get_cached_value("Company", filters.company, "default_holiday_list")
- holiday_lists.append(default_holiday_list)
-
+def get_holiday(holiday_list, month):
holiday_map = frappe._dict()
- Holiday = frappe.qb.DocType("Holiday")
-
- for d in holiday_lists:
- if not d:
- continue
-
- holidays = (
- frappe.qb.from_(Holiday)
- .select(Extract("day", Holiday.holiday_date).as_("day_of_month"), Holiday.weekly_off)
- .where(
- (Holiday.parent == d)
- & (Extract("month", Holiday.holiday_date) == filters.month)
- & (Extract("year", Holiday.holiday_date) == filters.year)
+ for d in holiday_list:
+ if d:
+ holiday_map.setdefault(
+ d,
+ frappe.db.sql(
+ """select day(holiday_date), weekly_off from `tabHoliday`
+ where parent=%s and month(holiday_date)=%s""",
+ (d, month),
+ ),
)
- ).run(as_dict=True)
-
- holiday_map.setdefault(d, holidays)
return holiday_map
-def get_rows(
- employee_details: Dict, filters: Filters, holiday_map: Dict, attendance_map: Dict
-) -> List[Dict]:
- records = []
- default_holiday_list = frappe.get_cached_value("Company", filters.company, "default_holiday_list")
-
- for employee, details in employee_details.items():
- emp_holiday_list = details.holiday_list or default_holiday_list
- holidays = holiday_map.get(emp_holiday_list)
-
- if filters.summarized_view:
- attendance = get_attendance_status_for_summarized_view(employee, filters, holidays)
- if not attendance:
- continue
-
- leave_summary = get_leave_summary(employee, filters)
- entry_exits_summary = get_entry_exits_summary(employee, filters)
-
- row = {"employee": employee, "employee_name": details.employee_name}
- set_defaults_for_summarized_view(filters, row)
- row.update(attendance)
- row.update(leave_summary)
- row.update(entry_exits_summary)
-
- records.append(row)
- else:
- employee_attendance = attendance_map.get(employee)
- if not employee_attendance:
- continue
-
- attendance_for_employee = get_attendance_status_for_detailed_view(
- employee, filters, employee_attendance, holidays
- )
- # set employee details in the first row
- attendance_for_employee[0].update(
- {"employee": employee, "employee_name": details.employee_name}
- )
-
- records.extend(attendance_for_employee)
-
- return records
-
-
-def set_defaults_for_summarized_view(filters, row):
- for entry in get_columns(filters):
- if entry.get("fieldtype") == "Float":
- row[entry.get("fieldname")] = 0.0
-
-
-def get_attendance_status_for_summarized_view(
- employee: str, filters: Filters, holidays: List
-) -> Dict:
- """Returns dict of attendance status for employee like
- {'total_present': 1.5, 'total_leaves': 0.5, 'total_absent': 13.5, 'total_holidays': 8, 'unmarked_days': 5}
- """
- summary, attendance_days = get_attendance_summary_and_days(employee, filters)
- if not any(summary.values()):
- return {}
-
- total_days = get_total_days_in_month(filters)
- total_holidays = total_unmarked_days = 0
-
- for day in range(1, total_days + 1):
- if day in attendance_days:
- continue
-
- status = get_holiday_status(day, holidays)
- if status in ["Weekly Off", "Holiday"]:
- total_holidays += 1
- elif not status:
- total_unmarked_days += 1
-
- return {
- "total_present": summary.total_present + summary.total_half_days,
- "total_leaves": summary.total_leaves + summary.total_half_days,
- "total_absent": summary.total_absent + summary.total_half_days,
- "total_holidays": total_holidays,
- "unmarked_days": total_unmarked_days,
- }
-
-
-def get_attendance_summary_and_days(employee: str, filters: Filters) -> Tuple[Dict, List]:
- Attendance = frappe.qb.DocType("Attendance")
-
- present_case = (
- frappe.qb.terms.Case()
- .when(((Attendance.status == "Present") | (Attendance.status == "Work From Home")), 1)
- .else_(0)
- )
- sum_present = Sum(present_case).as_("total_present")
-
- absent_case = frappe.qb.terms.Case().when(Attendance.status == "Absent", 1).else_(0)
- sum_absent = Sum(absent_case).as_("total_absent")
-
- leave_case = frappe.qb.terms.Case().when(Attendance.status == "On Leave", 1).else_(0)
- sum_leave = Sum(leave_case).as_("total_leaves")
-
- half_day_case = frappe.qb.terms.Case().when(Attendance.status == "Half Day", 0.5).else_(0)
- sum_half_day = Sum(half_day_case).as_("total_half_days")
-
- summary = (
- frappe.qb.from_(Attendance)
- .select(
- sum_present,
- sum_absent,
- sum_leave,
- sum_half_day,
- )
- .where(
- (Attendance.docstatus == 1)
- & (Attendance.employee == employee)
- & (Attendance.company == filters.company)
- & (Extract("month", Attendance.attendance_date) == filters.month)
- & (Extract("year", Attendance.attendance_date) == filters.year)
- )
- ).run(as_dict=True)
-
- days = (
- frappe.qb.from_(Attendance)
- .select(Extract("day", Attendance.attendance_date).as_("day_of_month"))
- .distinct()
- .where(
- (Attendance.docstatus == 1)
- & (Attendance.employee == employee)
- & (Attendance.company == filters.company)
- & (Extract("month", Attendance.attendance_date) == filters.month)
- & (Extract("year", Attendance.attendance_date) == filters.year)
- )
- ).run(pluck=True)
-
- return summary[0], days
-
-
-def get_attendance_status_for_detailed_view(
- employee: str, filters: Filters, employee_attendance: Dict, holidays: List
-) -> List[Dict]:
- """Returns list of shift-wise attendance status for employee
- [
- {'shift': 'Morning Shift', 1: 'A', 2: 'P', 3: 'A'....},
- {'shift': 'Evening Shift', 1: 'P', 2: 'A', 3: 'P'....}
- ]
- """
- total_days = get_total_days_in_month(filters)
- attendance_values = []
-
- for shift, status_dict in employee_attendance.items():
- row = {"shift": shift}
-
- for day in range(1, total_days + 1):
- status = status_dict.get(day)
- if status is None and holidays:
- status = get_holiday_status(day, holidays)
-
- abbr = status_map.get(status, "")
- row[day] = abbr
-
- attendance_values.append(row)
-
- return attendance_values
-
-
-def get_holiday_status(day: int, holidays: List) -> str:
- status = None
- for holiday in holidays:
- if day == holiday.get("day_of_month"):
- if holiday.get("weekly_off"):
- status = "Weekly Off"
- else:
- status = "Holiday"
- break
- return status
-
-
-def get_leave_summary(employee: str, filters: Filters) -> Dict[str, float]:
- """Returns a dict of leave type and corresponding leaves taken by employee like:
- {'leave_without_pay': 1.0, 'sick_leave': 2.0}
- """
- Attendance = frappe.qb.DocType("Attendance")
- day_case = frappe.qb.terms.Case().when(Attendance.status == "Half Day", 0.5).else_(1)
- sum_leave_days = Sum(day_case).as_("leave_days")
-
- leave_details = (
- frappe.qb.from_(Attendance)
- .select(Attendance.leave_type, sum_leave_days)
- .where(
- (Attendance.employee == employee)
- & (Attendance.docstatus == 1)
- & (Attendance.company == filters.company)
- & ((Attendance.leave_type.isnotnull()) | (Attendance.leave_type != ""))
- & (Extract("month", Attendance.attendance_date) == filters.month)
- & (Extract("year", Attendance.attendance_date) == filters.year)
- )
- .groupby(Attendance.leave_type)
- ).run(as_dict=True)
-
- leaves = {}
- for d in leave_details:
- leave_type = frappe.scrub(d.leave_type)
- leaves[leave_type] = d.leave_days
-
- return leaves
-
-
-def get_entry_exits_summary(employee: str, filters: Filters) -> Dict[str, float]:
- """Returns total late entries and total early exits for employee like:
- {'total_late_entries': 5, 'total_early_exits': 2}
- """
- Attendance = frappe.qb.DocType("Attendance")
-
- late_entry_case = frappe.qb.terms.Case().when(Attendance.late_entry == "1", "1")
- count_late_entries = Count(late_entry_case).as_("total_late_entries")
-
- early_exit_case = frappe.qb.terms.Case().when(Attendance.early_exit == "1", "1")
- count_early_exits = Count(early_exit_case).as_("total_early_exits")
-
- entry_exits = (
- frappe.qb.from_(Attendance)
- .select(count_late_entries, count_early_exits)
- .where(
- (Attendance.docstatus == 1)
- & (Attendance.employee == employee)
- & (Attendance.company == filters.company)
- & (Extract("month", Attendance.attendance_date) == filters.month)
- & (Extract("year", Attendance.attendance_date) == filters.year)
- )
- ).run(as_dict=True)
-
- return entry_exits[0]
-
-
@frappe.whitelist()
-def get_attendance_years() -> str:
- """Returns all the years for which attendance records exist"""
- Attendance = frappe.qb.DocType("Attendance")
- year_list = (
- frappe.qb.from_(Attendance)
- .select(Extract("year", Attendance.attendance_date).as_("year"))
- .distinct()
- ).run(as_dict=True)
-
- if year_list:
- year_list.sort(key=lambda d: d.year, reverse=True)
- else:
+def get_attendance_years():
+ year_list = frappe.db.sql_list(
+ """select distinct YEAR(attendance_date) from tabAttendance ORDER BY YEAR(attendance_date) DESC"""
+ )
+ if not year_list:
year_list = [getdate().year]
- return "\n".join(cstr(entry.year) for entry in year_list)
-
-
-def get_chart_data(attendance_map: Dict, filters: Filters) -> Dict:
- days = get_columns_for_days(filters)
- labels = []
- absent = []
- present = []
- leave = []
-
- for day in days:
- labels.append(day["label"])
- total_absent_on_day = total_leaves_on_day = total_present_on_day = 0
-
- for employee, attendance_dict in attendance_map.items():
- for shift, attendance in attendance_dict.items():
- attendance_on_day = attendance.get(day["fieldname"])
-
- if attendance_on_day == "Absent":
- total_absent_on_day += 1
- elif attendance_on_day in ["Present", "Work From Home"]:
- total_present_on_day += 1
- elif attendance_on_day == "Half Day":
- total_present_on_day += 0.5
- total_leaves_on_day += 0.5
- elif attendance_on_day == "On Leave":
- total_leaves_on_day += 1
-
- absent.append(total_absent_on_day)
- present.append(total_present_on_day)
- leave.append(total_leaves_on_day)
-
- return {
- "data": {
- "labels": labels,
- "datasets": [
- {"name": "Absent", "values": absent},
- {"name": "Present", "values": present},
- {"name": "Leave", "values": leave},
- ],
- },
- "type": "line",
- "colors": ["red", "green", "blue"],
- }
+ return "\n".join(str(year) for year in year_list)
diff --git a/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py b/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py
index cde7dd3fff..91da08eee5 100644
--- a/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py
+++ b/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py
@@ -1,32 +1,18 @@
import frappe
from dateutil.relativedelta import relativedelta
from frappe.tests.utils import FrappeTestCase
-from frappe.utils import get_year_ending, get_year_start, getdate, now_datetime
+from frappe.utils import now_datetime
from erpnext.hr.doctype.attendance.attendance import mark_attendance
from erpnext.hr.doctype.employee.test_employee import make_employee
-from erpnext.hr.doctype.holiday_list.test_holiday_list import set_holiday_list
-from erpnext.hr.doctype.leave_application.test_leave_application import make_allocation_record
from erpnext.hr.report.monthly_attendance_sheet.monthly_attendance_sheet import execute
-from erpnext.payroll.doctype.salary_slip.test_salary_slip import (
- make_holiday_list,
- make_leave_application,
-)
-
-test_dependencies = ["Shift Type"]
class TestMonthlyAttendanceSheet(FrappeTestCase):
def setUp(self):
- self.employee = make_employee("test_employee@example.com", company="_Test Company")
- frappe.db.delete("Attendance")
+ self.employee = make_employee("test_employee@example.com")
+ frappe.db.delete("Attendance", {"employee": self.employee})
- date = getdate()
- from_date = get_year_start(date)
- to_date = get_year_ending(date)
- make_holiday_list(from_date=from_date, to_date=to_date)
-
- @set_holiday_list("Salary Slip Test Holiday List", "_Test Company")
def test_monthly_attendance_sheet_report(self):
now = now_datetime()
previous_month = now.month - 1
@@ -47,203 +33,14 @@ class TestMonthlyAttendanceSheet(FrappeTestCase):
}
)
report = execute(filters=filters)
-
- record = report[1][0]
+ employees = report[1][0]
datasets = report[3]["data"]["datasets"]
absent = datasets[0]["values"]
present = datasets[1]["values"]
leaves = datasets[2]["values"]
- # ensure correct attendance is reflected on the report
- self.assertEqual(self.employee, record.get("employee"))
+ # ensure correct attendance is reflect on the report
+ self.assertIn(self.employee, employees)
self.assertEqual(absent[0], 1)
self.assertEqual(present[1], 1)
self.assertEqual(leaves[2], 1)
-
- @set_holiday_list("Salary Slip Test Holiday List", "_Test Company")
- def test_monthly_attendance_sheet_with_detailed_view(self):
- now = now_datetime()
- previous_month = now.month - 1
- previous_month_first = now.replace(day=1).replace(month=previous_month).date()
-
- company = frappe.db.get_value("Employee", self.employee, "company")
-
- # attendance with shift
- mark_attendance(self.employee, previous_month_first, "Absent", "Day Shift")
- mark_attendance(
- self.employee, previous_month_first + relativedelta(days=1), "Present", "Day Shift"
- )
-
- # attendance without shift
- mark_attendance(self.employee, previous_month_first + relativedelta(days=2), "On Leave")
- mark_attendance(self.employee, previous_month_first + relativedelta(days=3), "Present")
-
- filters = frappe._dict(
- {
- "month": previous_month,
- "year": now.year,
- "company": company,
- }
- )
- report = execute(filters=filters)
-
- day_shift_row = report[1][0]
- row_without_shift = report[1][1]
-
- self.assertEqual(day_shift_row["shift"], "Day Shift")
- self.assertEqual(day_shift_row[1], "A") # absent on the 1st day of the month
- self.assertEqual(day_shift_row[2], "P") # present on the 2nd day
-
- self.assertEqual(row_without_shift["shift"], None)
- self.assertEqual(row_without_shift[3], "L") # on leave on the 3rd day
- self.assertEqual(row_without_shift[4], "P") # present on the 4th day
-
- @set_holiday_list("Salary Slip Test Holiday List", "_Test Company")
- def test_monthly_attendance_sheet_with_summarized_view(self):
- now = now_datetime()
- previous_month = now.month - 1
- previous_month_first = now.replace(day=1).replace(month=previous_month).date()
-
- company = frappe.db.get_value("Employee", self.employee, "company")
-
- # attendance with shift
- mark_attendance(self.employee, previous_month_first, "Absent", "Day Shift")
- mark_attendance(
- self.employee, previous_month_first + relativedelta(days=1), "Present", "Day Shift"
- )
- mark_attendance(
- self.employee, previous_month_first + relativedelta(days=2), "Half Day"
- ) # half day
-
- mark_attendance(
- self.employee, previous_month_first + relativedelta(days=3), "Present"
- ) # attendance without shift
- mark_attendance(
- self.employee, previous_month_first + relativedelta(days=4), "Present", late_entry=1
- ) # late entry
- mark_attendance(
- self.employee, previous_month_first + relativedelta(days=5), "Present", early_exit=1
- ) # early exit
-
- leave_application = get_leave_application(self.employee)
-
- filters = frappe._dict(
- {"month": previous_month, "year": now.year, "company": company, "summarized_view": 1}
- )
- report = execute(filters=filters)
-
- row = report[1][0]
- self.assertEqual(row["employee"], self.employee)
-
- # 4 present + half day absent 0.5
- self.assertEqual(row["total_present"], 4.5)
- # 1 present + half day absent 0.5
- self.assertEqual(row["total_absent"], 1.5)
- # leave days + half day leave 0.5
- self.assertEqual(row["total_leaves"], leave_application.total_leave_days + 0.5)
-
- self.assertEqual(row["_test_leave_type"], leave_application.total_leave_days)
- self.assertEqual(row["total_late_entries"], 1)
- self.assertEqual(row["total_early_exits"], 1)
-
- @set_holiday_list("Salary Slip Test Holiday List", "_Test Company")
- def test_attendance_with_group_by_filter(self):
- now = now_datetime()
- previous_month = now.month - 1
- previous_month_first = now.replace(day=1).replace(month=previous_month).date()
-
- company = frappe.db.get_value("Employee", self.employee, "company")
-
- # attendance with shift
- mark_attendance(self.employee, previous_month_first, "Absent", "Day Shift")
- mark_attendance(
- self.employee, previous_month_first + relativedelta(days=1), "Present", "Day Shift"
- )
-
- # attendance without shift
- mark_attendance(self.employee, previous_month_first + relativedelta(days=2), "On Leave")
- mark_attendance(self.employee, previous_month_first + relativedelta(days=3), "Present")
-
- filters = frappe._dict(
- {"month": previous_month, "year": now.year, "company": company, "group_by": "Department"}
- )
- report = execute(filters=filters)
-
- department = frappe.db.get_value("Employee", self.employee, "department")
- department_row = report[1][0]
- self.assertIn(department, department_row["department"])
-
- day_shift_row = report[1][1]
- row_without_shift = report[1][2]
-
- self.assertEqual(day_shift_row["shift"], "Day Shift")
- self.assertEqual(day_shift_row[1], "A") # absent on the 1st day of the month
- self.assertEqual(day_shift_row[2], "P") # present on the 2nd day
-
- self.assertEqual(row_without_shift["shift"], None)
- self.assertEqual(row_without_shift[3], "L") # on leave on the 3rd day
- self.assertEqual(row_without_shift[4], "P") # present on the 4th day
-
- def test_attendance_with_employee_filter(self):
- now = now_datetime()
- previous_month = now.month - 1
- previous_month_first = now.replace(day=1).replace(month=previous_month).date()
-
- company = frappe.db.get_value("Employee", self.employee, "company")
-
- # mark different attendance status on first 3 days of previous month
- mark_attendance(self.employee, previous_month_first, "Absent")
- mark_attendance(self.employee, previous_month_first + relativedelta(days=1), "Present")
- mark_attendance(self.employee, previous_month_first + relativedelta(days=2), "On Leave")
-
- filters = frappe._dict(
- {"month": previous_month, "year": now.year, "company": company, "employee": self.employee}
- )
- report = execute(filters=filters)
-
- record = report[1][0]
- datasets = report[3]["data"]["datasets"]
- absent = datasets[0]["values"]
- present = datasets[1]["values"]
- leaves = datasets[2]["values"]
-
- # ensure correct attendance is reflected on the report
- self.assertEqual(self.employee, record.get("employee"))
- self.assertEqual(absent[0], 1)
- self.assertEqual(present[1], 1)
- self.assertEqual(leaves[2], 1)
-
- @set_holiday_list("Salary Slip Test Holiday List", "_Test Company")
- def test_validations(self):
- # validation error for filters without month and year
- self.assertRaises(frappe.ValidationError, execute_report_with_invalid_filters)
-
- # execute report without attendance record
- now = now_datetime()
- previous_month = now.month - 1
-
- company = frappe.db.get_value("Employee", self.employee, "company")
- filters = frappe._dict(
- {"month": previous_month, "year": now.year, "company": company, "group_by": "Department"}
- )
- report = execute(filters=filters)
- self.assertEqual(report, ([], [], None, None))
-
-
-def get_leave_application(employee):
- now = now_datetime()
- previous_month = now.month - 1
-
- date = getdate()
- year_start = getdate(get_year_start(date))
- year_end = getdate(get_year_ending(date))
- make_allocation_record(employee=employee, from_date=year_start, to_date=year_end)
-
- from_date = now.replace(day=7).replace(month=previous_month).date()
- to_date = now.replace(day=8).replace(month=previous_month).date()
- return make_leave_application(employee, from_date, to_date, "_Test Leave Type")
-
-
-def execute_report_with_invalid_filters():
- filters = frappe._dict({"company": "_Test Company", "group_by": "Department"})
- execute(filters=filters)