diff --git a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py index 299b092eae..a98afe41cc 100644 --- a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py +++ b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py @@ -7,10 +7,9 @@ from itertools import groupby from typing import Dict, List, Optional, Tuple import frappe -from frappe import _, msgprint -from frappe.utils import cint, cstr, getdate - +from frappe import _ from frappe.query_builder.functions import Count, Extract, Sum +from frappe.utils import cint, cstr, getdate Filters = frappe._dict @@ -32,10 +31,16 @@ def execute(filters: Optional[Filters] = None) -> Tuple: if not (filters.month and filters.year): frappe.throw(_('Please select month and year.')) + attendance_map = get_attendance_map(filters) + if not attendance_map: + frappe.msgprint(_('No attendance records found.'), alert=True, indicator='orange') + return [], [], None, None + columns = get_columns(filters) - data, attendance_map = get_data(filters) + data = get_data(filters, attendance_map) if not data: + frappe.msgprint(_('No attendance records found for this criteria.'), alert=True, indicator='orange') return columns, [], None, None message = get_message() if not filters.summarized_view else '' @@ -69,24 +74,25 @@ def get_columns(filters: Filters) -> List[Dict]: ) columns.extend([ - {'label': _('Employee'), 'fieldname': 'employee', 'fieldtype': 'Link', 'options': 'Employee', 'width': 120}, + {'label': _('Employee'), 'fieldname': 'employee', 'fieldtype': 'Link', 'options': 'Employee', 'width': 135}, {'label': _('Employee Name'), 'fieldname': 'employee_name', 'fieldtype': 'Data', 'width': 120} ]) if filters.summarized_view: columns.extend([ - {'label': _('Total Present'), 'fieldname': 'total_present', 'fieldtype': 'Float', 'width': 120}, - {'label': _('Total Leaves'), 'fieldname': 'total_leaves', 'fieldtype': 'Float', 'width': 120}, - {'label': _('Total Absent'), 'fieldname': 'total_absent', 'fieldtype': 'Float', 'width': 120}, + {'label': _('Total Present'), 'fieldname': 'total_present', 'fieldtype': 'Float', 'width': 110}, + {'label': _('Total Leaves'), 'fieldname': 'total_leaves', 'fieldtype': 'Float', 'width': 110}, + {'label': _('Total Absent'), 'fieldname': 'total_absent', 'fieldtype': 'Float', 'width': 110}, {'label': _('Total Holidays'), 'fieldname': 'total_holidays', 'fieldtype': 'Float', 'width': 120}, - {'label': _('Unmarked Days'), 'fieldname': 'unmarked_days', 'fieldtype': 'Float', 'width': 120} + {'label': _('Unmarked Days'), 'fieldname': 'unmarked_days', 'fieldtype': 'Float', 'width': 130} ]) columns.extend(get_columns_for_leave_types()) columns.extend([ - {'label': _('Total Late Entries'), 'fieldname': 'total_late_entries', 'fieldtype': 'Float', 'width': 120}, - {'label': _('Total Early Exits'), 'fieldname': 'total_early_exits', 'fieldtype': 'Float', 'width': 120} + {'label': _('Total Late Entries'), 'fieldname': 'total_late_entries', 'fieldtype': 'Float', 'width': 140}, + {'label': _('Total Early Exits'), 'fieldname': 'total_early_exits', 'fieldtype': 'Float', 'width': 140} ]) else: + columns.append({'label': _('Shift'), 'fieldname': 'shift', 'fieldtype': 'Data', 'width': 120}) columns.extend(get_columns_for_days(filters)) return columns @@ -130,16 +136,9 @@ def get_total_days_in_month(filters: Filters) -> int: return monthrange(cint(filters.year), cint(filters.month))[1] -def get_data(filters: Filters) -> Tuple[List, Dict]: - attendance_map = get_attendance_map(filters) - - if not attendance_map: - frappe.msgprint(_('No attendance records found.'), alert=True, indicator='orange') - return [], {} - +def get_data(filters: Filters, attendance_map: Dict) -> List[Dict]: employee_details, group_by_param_values = get_employee_related_details(filters.group_by, filters.company) holiday_map = get_holiday_map(filters) - data = [] if filters.group_by: @@ -149,12 +148,7 @@ def get_data(filters: Filters) -> Tuple[List, Dict]: if not value: continue - records = get_rows( - employee_details[value], - attendance_map, - filters, - holiday_map - ) + records = get_rows(employee_details[value], filters, holiday_map, attendance_map) if records: data.append({ @@ -162,30 +156,21 @@ def get_data(filters: Filters) -> Tuple[List, Dict]: }) data.extend(records) else: - data = get_rows( - employee_details, - attendance_map, - filters, - holiday_map - ) + data = get_rows(employee_details, filters, holiday_map, attendance_map) - if not data: - frappe.msgprint(_('No attendance records found for this criteria.'), alert=True, indicator='orange') - return [], {} - - return data, attendance_map + return data -def get_attendance_map(filters: Filters) -> Dict[str, Dict[int, str]]: - """Returns a dictionary of employee wise attendance map for all the days of the month like +def get_attendance_map(filters: Filters) -> Dict: + """Returns a dictionary of employee wise attendance map as per shifts for all the days of the month like { 'employee1': { - 1: 'Present', - 2: 'Absent' + 'Morning Shift': {1: 'Present', 2: 'Absent', ...} + 'Evening Shift': {1: 'Absent', 2: 'Present', ...} }, 'employee2': { - 1: 'Absent', - 2: 'Present' + 'Afternoon Shift': {1: 'Present', 2: 'Absent', ...} + 'Night Shift': {1: 'Absent', 2: 'Absent', ...} } } """ @@ -195,7 +180,8 @@ def get_attendance_map(filters: Filters) -> Dict[str, Dict[int, str]]: .select( Attendance.employee, Extract('day', Attendance.attendance_date).as_('day_of_month'), - Attendance.status + Attendance.status, + Attendance.shift ).where( (Attendance.docstatus == 1) & (Attendance.company == filters.company) @@ -205,18 +191,14 @@ def get_attendance_map(filters: Filters) -> Dict[str, Dict[int, str]]: ) if filters.employee: query = query.where(Attendance.employee == filters.employee) - query = query.orderby(Attendance.employee, Attendance.attendance_date) attendance_list = query.run(as_dict=1) - - if not attendance_list: - frappe.msgprint(_('No attendance records found'), alert=True, indicator='orange') - attendance_map = {} + for d in attendance_list: - attendance_map.setdefault(d.employee, frappe._dict()).setdefault(d.day_of_month, '') - attendance_map[d.employee][d.day_of_month] = d.status + attendance_map.setdefault(d.employee, frappe._dict()).setdefault(d.shift, frappe._dict()) + attendance_map[d.employee][d.shift][d.day_of_month] = d.status return attendance_map @@ -304,86 +286,150 @@ def get_holiday_map(filters: Filters) -> Dict[str, List[Dict]]: return holiday_map -def get_rows(employee_details: Dict, attendance_map: Dict, filters: Filters, holiday_map: Dict) -> List[Dict]: +def get_rows(employee_details: Dict, filters: Filters, holiday_map: Dict, attendance_map: Dict) -> List[Dict]: records = [] default_holiday_list = frappe.get_cached_value('Company', filters.company, 'default_holiday_list') - employees_with_attendance = list(attendance_map.keys()) for employee, details in employee_details.items(): - if employee not in employees_with_attendance: - continue - - row = { - 'employee': employee, - 'employee_name': details.employee_name - } - - if filters.summarized_view: - # set defaults for summarized view - for entry in get_columns(filters): - if entry.get('fieldtype') == 'Float': - row[entry.get('fieldname')] = 0.0 - emp_holiday_list = details.holiday_list or default_holiday_list holidays = holiday_map[emp_holiday_list] - attendance_for_employee = get_attendance_status_for_employee(employee, filters, attendance_map, holidays) - row.update(attendance_for_employee) - if filters.summarized_view: + attendance = get_attendance_status_for_summarized_view(employee, filters, holidays) + if not attendance: + continue + leave_summary = get_leave_summary(employee, filters) entry_exits_summary = get_entry_exits_summary(employee, filters) + row = {'employee': employee, 'employee_name': details.employee_name} + set_defaults_for_summarized_view(filters, row) + row.update(attendance) row.update(leave_summary) row.update(entry_exits_summary) - records.append(row) + records.append(row) + else: + employee_attendance = attendance_map.get(employee) + if not employee_attendance: + continue + + attendance_for_employee = get_attendance_status_for_detailed_view(employee, filters, employee_attendance, holidays) + # set employee details in the first row + attendance_for_employee[0].update({ + 'employee': employee, + 'employee_name': details.employee_name + }) + + records.extend(attendance_for_employee) return records -def get_attendance_status_for_employee(employee: str, filters: Filters, attendance_map: Dict, holidays: List) -> Dict: - """Returns dict of attendance status for employee - - for summarized view: {'total_present': 1.5, 'total_leaves': 0.5, 'total_absent': 13.5, 'total_holidays': 8, 'unmarked_days': 5} - - for detailed view (day wise): {1: 'A', 2: 'P', 3: 'A'....} +def set_defaults_for_summarized_view(filters, row): + for entry in get_columns(filters): + if entry.get('fieldtype') == 'Float': + row[entry.get('fieldname')] = 0.0 + + +def get_attendance_status_for_summarized_view(employee: str, filters: Filters, holidays: List) -> Dict: + """Returns dict of attendance status for employee like + {'total_present': 1.5, 'total_leaves': 0.5, 'total_absent': 13.5, 'total_holidays': 8, 'unmarked_days': 5} """ - emp_attendance = {} + summary, attendance_days = get_attendance_summary_and_days(employee, filters) + if not any(summary.values()): + return {} total_days = get_total_days_in_month(filters) - totals = {'total_present': 0, 'total_leaves': 0, 'total_absent': 0, 'total_holidays': 0, 'unmarked_days': 0} + total_holidays = total_unmarked_days = 0 for day in range(1, total_days + 1): - status = None - employee_attendance = attendance_map.get(employee) - if employee_attendance: - status = employee_attendance.get(day) + if day in attendance_days: + continue - if status is None and holidays: - status = get_holiday_status(day, holidays) + status = get_holiday_status(day, holidays) + if status in ['Weekly Off', 'Holiday']: + total_holidays += 1 + elif not status: + total_unmarked_days += 1 + + return { + 'total_present': summary.total_present + summary.total_half_days, + 'total_leaves': summary.total_leaves + summary.total_half_days, + 'total_absent': summary.total_absent + summary.total_half_days, + 'total_holidays': total_holidays, + 'unmarked_days': total_unmarked_days + } + + +def get_attendance_summary_and_days(employee: str, filters: Filters) -> Tuple[Dict, List]: + Attendance = frappe.qb.DocType('Attendance') + + present_case = frappe.qb.terms.Case().when(((Attendance.status == 'Present') | (Attendance.status == 'Work From Home')), 1).else_(0) + sum_present = Sum(present_case).as_('total_present') + + absent_case = frappe.qb.terms.Case().when(Attendance.status == 'Absent', 1).else_(0) + sum_absent = Sum(absent_case).as_('total_absent') + + leave_case = frappe.qb.terms.Case().when(Attendance.status == 'On Leave', 1).else_(0) + sum_leave = Sum(leave_case).as_('total_leaves') + + half_day_case = frappe.qb.terms.Case().when(Attendance.status == 'Half Day', 0.5).else_(0) + sum_half_day = Sum(half_day_case).as_('total_half_days') + + summary = ( + frappe.qb.from_(Attendance) + .select( + sum_present, sum_absent, sum_leave, sum_half_day, + ).where( + (Attendance.docstatus == 1) + & (Attendance.employee == employee) + & (Attendance.company == filters.company) + & (Extract('month', Attendance.attendance_date) == filters.month) + & (Extract('year', Attendance.attendance_date) == filters.year) + ) + ).run(as_dict=True) + + days = ( + frappe.qb.from_(Attendance) + .select(Extract('day', Attendance.attendance_date).as_('day_of_month')) + .distinct() + .where( + (Attendance.docstatus == 1) + & (Attendance.employee == employee) + & (Attendance.company == filters.company) + & (Extract('month', Attendance.attendance_date) == filters.month) + & (Extract('year', Attendance.attendance_date) == filters.year) + ) + ).run(pluck=True) + + return summary[0], days + + +def get_attendance_status_for_detailed_view(employee: str, filters: Filters, employee_attendance: Dict, holidays: List) -> List[Dict]: + """Returns list of shift-wise attendance status for employee + [ + {'shift': 'Morning Shift', 1: 'A', 2: 'P', 3: 'A'....}, + {'shift': 'Evening Shift', 1: 'P', 2: 'A', 3: 'P'....} + ] + """ + total_days = get_total_days_in_month(filters) + attendance_values = [] + + for shift, status_dict in employee_attendance.items(): + row = {'shift': shift} + + for day in range(1, total_days + 1): + status = status_dict.get(day) + if status is None and holidays: + status = get_holiday_status(day, holidays) - if filters.summarized_view: - if status in ['Present', 'Work From Home']: - totals['total_present'] += 1 - elif status in ['Weekly Off', 'Holiday']: - totals['total_holidays'] += 1 - elif status == 'Absent': - totals['total_absent'] += 1 - elif status == 'On Leave': - totals['total_leaves'] += 1 - elif status == 'Half Day': - totals['total_present'] += 0.5 - totals['total_absent'] += 0.5 - totals['total_leaves'] += 0.5 - elif not status: - totals['unmarked_days'] += 1 - else: abbr = status_map.get(status, '') - emp_attendance[day] = abbr + row[day] = abbr - if filters.summarized_view: - emp_attendance.update(totals) + attendance_values.append(row) - return emp_attendance + return attendance_values def get_holiday_status(day: int, holidays: List) -> str: @@ -411,6 +457,7 @@ def get_leave_summary(employee: str, filters: Filters) -> Dict[str, float]: .select(Attendance.leave_type, sum_leave_days) .where( (Attendance.employee == employee) + & (Attendance.docstatus == 1) & (Attendance.company == filters.company) & ((Attendance.leave_type.isnotnull()) | (Attendance.leave_type != '')) & (Extract('month', Attendance.attendance_date) == filters.month) @@ -442,7 +489,8 @@ def get_entry_exits_summary(employee: str, filters: Filters) -> Dict[str, float] frappe.qb.from_(Attendance) .select(count_late_entries, count_early_exits) .where( - (Attendance.employee == employee) + (Attendance.docstatus == 1) + & (Attendance.employee == employee) & (Attendance.company == filters.company) & (Extract('month', Attendance.attendance_date) == filters.month) & (Extract('year', Attendance.attendance_date) == filters.year) @@ -481,20 +529,19 @@ def get_chart_data(attendance_map: Dict, filters: Filters) -> Dict: labels.append(day['label']) total_absent_on_day = total_leaves_on_day = total_present_on_day = 0 - for employee, attendance in attendance_map.items(): - attendance_on_day = attendance.get(day['fieldname']) - if not attendance_on_day: - continue + for employee, attendance_dict in attendance_map.items(): + for shift, attendance in attendance_dict.items(): + attendance_on_day = attendance.get(day['fieldname']) - if attendance_on_day == 'Absent': - total_absent_on_day += 1 - elif attendance_on_day in ['Present', 'Work From Home']: - total_present_on_day += 1 - elif attendance_on_day == 'Half Day': - total_present_on_day += 0.5 - total_leaves_on_day += 0.5 - elif attendance_on_day == 'On Leave': - total_leaves_on_day += 1 + if attendance_on_day == 'Absent': + total_absent_on_day += 1 + elif attendance_on_day in ['Present', 'Work From Home']: + total_present_on_day += 1 + elif attendance_on_day == 'Half Day': + total_present_on_day += 0.5 + total_leaves_on_day += 0.5 + elif attendance_on_day == 'On Leave': + total_leaves_on_day += 1 absent.append(total_absent_on_day) present.append(total_present_on_day) @@ -511,6 +558,4 @@ def get_chart_data(attendance_map: Dict, filters: Filters) -> Dict: }, 'type': 'line', 'colors': ['red', 'green', 'blue'], - } - - return chart \ No newline at end of file + } \ No newline at end of file