refactor: mark absent for employees with no attendance

- break down into smaller functions

- make it work with multiple shifts

- this will mark employee as absent per shift, meaning employee can be present for one shift and absent for another on the same day
This commit is contained in:
Rucha Mahabal 2022-02-23 09:50:11 +05:30
parent 742c8f0790
commit 4ef2911953

View File

@ -3,21 +3,23 @@
import itertools
from datetime import timedelta
from datetime import datetime, timedelta
import frappe
from frappe.model.document import Document
from frappe.utils import cint, get_datetime, getdate
from frappe.utils import cint, get_datetime, get_time, getdate
from erpnext.buying.doctype.supplier_scorecard.supplier_scorecard import daterange
from erpnext.hr.doctype.attendance.attendance import mark_attendance
from erpnext.hr.doctype.employee.employee import get_holiday_list_for_employee
from erpnext.hr.doctype.employee_checkin.employee_checkin import (
calculate_working_hours,
mark_attendance_and_link_log,
)
from erpnext.hr.doctype.holiday_list.holiday_list import is_holiday
from erpnext.hr.doctype.shift_assignment.shift_assignment import (
get_actual_start_end_datetime_of_shift,
get_employee_shift,
get_shift_details
)
@ -90,46 +92,60 @@ class ShiftType(Document):
"""Marks Absents for the given employee on working days in this shift which have no attendance marked.
The Absent is marked starting from 'process_attendance_after' or employee creation date.
"""
date_of_joining, relieving_date, employee_creation = frappe.db.get_value(
"Employee", employee, ["date_of_joining", "relieving_date", "creation"]
)
if not date_of_joining:
date_of_joining = employee_creation.date()
start_date = max(getdate(self.process_attendance_after), date_of_joining)
actual_shift_datetime = get_actual_start_end_datetime_of_shift(employee, get_datetime(self.last_sync_of_checkin), True)
last_shift_time = actual_shift_datetime.actual_start if actual_shift_datetime else get_datetime(self.last_sync_of_checkin)
prev_shift = get_employee_shift(employee, last_shift_time - timedelta(days=1), True, 'reverse')
if prev_shift:
end_date = (
min(prev_shift.start_datetime.date(), relieving_date)
if relieving_date
else prev_shift.start_datetime.date()
)
else:
start_date, end_date = self.get_start_and_end_dates(employee)
# no shift assignment found, no need to process absent attendance records
if end_date is None:
return
holiday_list_name = self.holiday_list
if not holiday_list_name:
holiday_list_name = get_holiday_list_for_employee(employee, False)
dates = get_filtered_date_list(employee, start_date, end_date, holiday_list=holiday_list_name)
for date in dates:
shift_details = get_employee_shift(employee, get_datetime(date), True)
start_time = get_time(self.start_time)
for date in daterange(getdate(start_date), getdate(end_date)):
if is_holiday(holiday_list_name, date):
# skip marking absent on a holiday
continue
timestamp = datetime.combine(date, start_time)
shift_details = get_employee_shift(employee, timestamp, True)
if shift_details and shift_details.shift_type.name == self.name:
mark_attendance(employee, date, "Absent", self.name)
def get_assigned_employee(self, from_date=None, consider_default_shift=False):
filters = {"start_date": (">", from_date), "shift_type": self.name, "docstatus": "1"}
if not from_date:
del filters["start_date"]
def get_start_and_end_dates(self, employee):
date_of_joining, relieving_date, employee_creation = frappe.db.get_value("Employee", employee,
["date_of_joining", "relieving_date", "creation"])
assigned_employees = frappe.get_all("Shift Assignment", "employee", filters, as_list=True)
assigned_employees = [x[0] for x in assigned_employees]
if not date_of_joining:
date_of_joining = employee_creation.date()
start_date = max(getdate(self.process_attendance_after), date_of_joining)
end_date = None
shift_details = get_shift_details(self.name, get_datetime(self.last_sync_of_checkin))
last_shift_time = shift_details.actual_start if shift_details else get_datetime(self.last_sync_of_checkin)
prev_shift = get_employee_shift(employee, last_shift_time - timedelta(days=1), True, 'reverse')
if prev_shift:
end_date = min(prev_shift.start_datetime.date(), relieving_date) if relieving_date else prev_shift.start_datetime.date()
return start_date, end_date
def get_assigned_employee(self, from_date=None, consider_default_shift=False):
filters = {'shift_type': self.name, 'docstatus': '1'}
if from_date:
filters['start_date'] = ('>', from_date)
assigned_employees = frappe.get_all('Shift Assignment', filters=filters, pluck='employee')
if consider_default_shift:
filters = {"default_shift": self.name, "status": ["!=", "Inactive"]}
default_shift_employees = frappe.get_all("Employee", "name", filters, as_list=True)
default_shift_employees = [x[0] for x in default_shift_employees]
return list(set(assigned_employees + default_shift_employees))
filters = {'default_shift': self.name, 'status': ['!=', 'Inactive']}
default_shift_employees = frappe.get_all('Employee', filters=filters, pluck='name')
return list(set(assigned_employees+default_shift_employees))
return assigned_employees
@ -138,42 +154,3 @@ def process_auto_attendance_for_all_shifts():
for shift in shift_list:
doc = frappe.get_doc("Shift Type", shift[0])
doc.process_auto_attendance()
def get_filtered_date_list(
employee, start_date, end_date, filter_attendance=True, holiday_list=None
):
"""Returns a list of dates after removing the dates with attendance and holidays"""
base_dates_query = """select adddate(%(start_date)s, t2.i*100 + t1.i*10 + t0.i) selected_date from
(select 0 i union select 1 union select 2 union select 3 union select 4 union select 5 union select 6 union select 7 union select 8 union select 9) t0,
(select 0 i union select 1 union select 2 union select 3 union select 4 union select 5 union select 6 union select 7 union select 8 union select 9) t1,
(select 0 i union select 1 union select 2 union select 3 union select 4 union select 5 union select 6 union select 7 union select 8 union select 9) t2"""
condition_query = ""
if filter_attendance:
condition_query += """ and a.selected_date not in (
select attendance_date from `tabAttendance`
where docstatus = 1 and employee = %(employee)s
and attendance_date between %(start_date)s and %(end_date)s)"""
if holiday_list:
condition_query += """ and a.selected_date not in (
select holiday_date from `tabHoliday` where parenttype = 'Holiday List' and
parentfield = 'holidays' and parent = %(holiday_list)s
and holiday_date between %(start_date)s and %(end_date)s)"""
dates = frappe.db.sql(
"""select * from
({base_dates_query}) as a
where a.selected_date <= %(end_date)s {condition_query}
""".format(
base_dates_query=base_dates_query, condition_query=condition_query
),
{
"employee": employee,
"start_date": start_date,
"end_date": end_date,
"holiday_list": holiday_list,
},
as_list=True,
)
return [getdate(date[0]) for date in dates]