feat: show shift-wise attendance in monthly attendance sheet

This commit is contained in:
Rucha Mahabal 2022-03-29 12:36:58 +05:30
parent 865204a541
commit 41cfcdba44

View File

@ -7,10 +7,9 @@ from itertools import groupby
from typing import Dict, List, Optional, Tuple
import frappe
from frappe import _, msgprint
from frappe.utils import cint, cstr, getdate
from frappe import _
from frappe.query_builder.functions import Count, Extract, Sum
from frappe.utils import cint, cstr, getdate
Filters = frappe._dict
@ -32,10 +31,16 @@ def execute(filters: Optional[Filters] = None) -> Tuple:
if not (filters.month and filters.year):
frappe.throw(_('Please select month and year.'))
attendance_map = get_attendance_map(filters)
if not attendance_map:
frappe.msgprint(_('No attendance records found.'), alert=True, indicator='orange')
return [], [], None, None
columns = get_columns(filters)
data, attendance_map = get_data(filters)
data = get_data(filters, attendance_map)
if not data:
frappe.msgprint(_('No attendance records found for this criteria.'), alert=True, indicator='orange')
return columns, [], None, None
message = get_message() if not filters.summarized_view else ''
@ -69,24 +74,25 @@ def get_columns(filters: Filters) -> List[Dict]:
)
columns.extend([
{'label': _('Employee'), 'fieldname': 'employee', 'fieldtype': 'Link', 'options': 'Employee', 'width': 120},
{'label': _('Employee'), 'fieldname': 'employee', 'fieldtype': 'Link', 'options': 'Employee', 'width': 135},
{'label': _('Employee Name'), 'fieldname': 'employee_name', 'fieldtype': 'Data', 'width': 120}
])
if filters.summarized_view:
columns.extend([
{'label': _('Total Present'), 'fieldname': 'total_present', 'fieldtype': 'Float', 'width': 120},
{'label': _('Total Leaves'), 'fieldname': 'total_leaves', 'fieldtype': 'Float', 'width': 120},
{'label': _('Total Absent'), 'fieldname': 'total_absent', 'fieldtype': 'Float', 'width': 120},
{'label': _('Total Present'), 'fieldname': 'total_present', 'fieldtype': 'Float', 'width': 110},
{'label': _('Total Leaves'), 'fieldname': 'total_leaves', 'fieldtype': 'Float', 'width': 110},
{'label': _('Total Absent'), 'fieldname': 'total_absent', 'fieldtype': 'Float', 'width': 110},
{'label': _('Total Holidays'), 'fieldname': 'total_holidays', 'fieldtype': 'Float', 'width': 120},
{'label': _('Unmarked Days'), 'fieldname': 'unmarked_days', 'fieldtype': 'Float', 'width': 120}
{'label': _('Unmarked Days'), 'fieldname': 'unmarked_days', 'fieldtype': 'Float', 'width': 130}
])
columns.extend(get_columns_for_leave_types())
columns.extend([
{'label': _('Total Late Entries'), 'fieldname': 'total_late_entries', 'fieldtype': 'Float', 'width': 120},
{'label': _('Total Early Exits'), 'fieldname': 'total_early_exits', 'fieldtype': 'Float', 'width': 120}
{'label': _('Total Late Entries'), 'fieldname': 'total_late_entries', 'fieldtype': 'Float', 'width': 140},
{'label': _('Total Early Exits'), 'fieldname': 'total_early_exits', 'fieldtype': 'Float', 'width': 140}
])
else:
columns.append({'label': _('Shift'), 'fieldname': 'shift', 'fieldtype': 'Data', 'width': 120})
columns.extend(get_columns_for_days(filters))
return columns
@ -130,16 +136,9 @@ def get_total_days_in_month(filters: Filters) -> int:
return monthrange(cint(filters.year), cint(filters.month))[1]
def get_data(filters: Filters) -> Tuple[List, Dict]:
attendance_map = get_attendance_map(filters)
if not attendance_map:
frappe.msgprint(_('No attendance records found.'), alert=True, indicator='orange')
return [], {}
def get_data(filters: Filters, attendance_map: Dict) -> List[Dict]:
employee_details, group_by_param_values = get_employee_related_details(filters.group_by, filters.company)
holiday_map = get_holiday_map(filters)
data = []
if filters.group_by:
@ -149,12 +148,7 @@ def get_data(filters: Filters) -> Tuple[List, Dict]:
if not value:
continue
records = get_rows(
employee_details[value],
attendance_map,
filters,
holiday_map
)
records = get_rows(employee_details[value], filters, holiday_map, attendance_map)
if records:
data.append({
@ -162,30 +156,21 @@ def get_data(filters: Filters) -> Tuple[List, Dict]:
})
data.extend(records)
else:
data = get_rows(
employee_details,
attendance_map,
filters,
holiday_map
)
data = get_rows(employee_details, filters, holiday_map, attendance_map)
if not data:
frappe.msgprint(_('No attendance records found for this criteria.'), alert=True, indicator='orange')
return [], {}
return data, attendance_map
return data
def get_attendance_map(filters: Filters) -> Dict[str, Dict[int, str]]:
"""Returns a dictionary of employee wise attendance map for all the days of the month like
def get_attendance_map(filters: Filters) -> Dict:
"""Returns a dictionary of employee wise attendance map as per shifts for all the days of the month like
{
'employee1': {
1: 'Present',
2: 'Absent'
'Morning Shift': {1: 'Present', 2: 'Absent', ...}
'Evening Shift': {1: 'Absent', 2: 'Present', ...}
},
'employee2': {
1: 'Absent',
2: 'Present'
'Afternoon Shift': {1: 'Present', 2: 'Absent', ...}
'Night Shift': {1: 'Absent', 2: 'Absent', ...}
}
}
"""
@ -195,7 +180,8 @@ def get_attendance_map(filters: Filters) -> Dict[str, Dict[int, str]]:
.select(
Attendance.employee,
Extract('day', Attendance.attendance_date).as_('day_of_month'),
Attendance.status
Attendance.status,
Attendance.shift
).where(
(Attendance.docstatus == 1)
& (Attendance.company == filters.company)
@ -205,18 +191,14 @@ def get_attendance_map(filters: Filters) -> Dict[str, Dict[int, str]]:
)
if filters.employee:
query = query.where(Attendance.employee == filters.employee)
query = query.orderby(Attendance.employee, Attendance.attendance_date)
attendance_list = query.run(as_dict=1)
if not attendance_list:
frappe.msgprint(_('No attendance records found'), alert=True, indicator='orange')
attendance_map = {}
for d in attendance_list:
attendance_map.setdefault(d.employee, frappe._dict()).setdefault(d.day_of_month, '')
attendance_map[d.employee][d.day_of_month] = d.status
attendance_map.setdefault(d.employee, frappe._dict()).setdefault(d.shift, frappe._dict())
attendance_map[d.employee][d.shift][d.day_of_month] = d.status
return attendance_map
@ -304,86 +286,150 @@ def get_holiday_map(filters: Filters) -> Dict[str, List[Dict]]:
return holiday_map
def get_rows(employee_details: Dict, attendance_map: Dict, filters: Filters, holiday_map: Dict) -> List[Dict]:
def get_rows(employee_details: Dict, filters: Filters, holiday_map: Dict, attendance_map: Dict) -> List[Dict]:
records = []
default_holiday_list = frappe.get_cached_value('Company', filters.company, 'default_holiday_list')
employees_with_attendance = list(attendance_map.keys())
for employee, details in employee_details.items():
if employee not in employees_with_attendance:
continue
row = {
'employee': employee,
'employee_name': details.employee_name
}
if filters.summarized_view:
# set defaults for summarized view
for entry in get_columns(filters):
if entry.get('fieldtype') == 'Float':
row[entry.get('fieldname')] = 0.0
emp_holiday_list = details.holiday_list or default_holiday_list
holidays = holiday_map[emp_holiday_list]
attendance_for_employee = get_attendance_status_for_employee(employee, filters, attendance_map, holidays)
row.update(attendance_for_employee)
if filters.summarized_view:
attendance = get_attendance_status_for_summarized_view(employee, filters, holidays)
if not attendance:
continue
leave_summary = get_leave_summary(employee, filters)
entry_exits_summary = get_entry_exits_summary(employee, filters)
row = {'employee': employee, 'employee_name': details.employee_name}
set_defaults_for_summarized_view(filters, row)
row.update(attendance)
row.update(leave_summary)
row.update(entry_exits_summary)
records.append(row)
else:
employee_attendance = attendance_map.get(employee)
if not employee_attendance:
continue
attendance_for_employee = get_attendance_status_for_detailed_view(employee, filters, employee_attendance, holidays)
# set employee details in the first row
attendance_for_employee[0].update({
'employee': employee,
'employee_name': details.employee_name
})
records.extend(attendance_for_employee)
return records
def get_attendance_status_for_employee(employee: str, filters: Filters, attendance_map: Dict, holidays: List) -> Dict:
"""Returns dict of attendance status for employee
- for summarized view: {'total_present': 1.5, 'total_leaves': 0.5, 'total_absent': 13.5, 'total_holidays': 8, 'unmarked_days': 5}
- for detailed view (day wise): {1: 'A', 2: 'P', 3: 'A'....}
def set_defaults_for_summarized_view(filters, row):
for entry in get_columns(filters):
if entry.get('fieldtype') == 'Float':
row[entry.get('fieldname')] = 0.0
def get_attendance_status_for_summarized_view(employee: str, filters: Filters, holidays: List) -> Dict:
"""Returns dict of attendance status for employee like
{'total_present': 1.5, 'total_leaves': 0.5, 'total_absent': 13.5, 'total_holidays': 8, 'unmarked_days': 5}
"""
emp_attendance = {}
summary, attendance_days = get_attendance_summary_and_days(employee, filters)
if not any(summary.values()):
return {}
total_days = get_total_days_in_month(filters)
totals = {'total_present': 0, 'total_leaves': 0, 'total_absent': 0, 'total_holidays': 0, 'unmarked_days': 0}
total_holidays = total_unmarked_days = 0
for day in range(1, total_days + 1):
status = None
employee_attendance = attendance_map.get(employee)
if employee_attendance:
status = employee_attendance.get(day)
if day in attendance_days:
continue
status = get_holiday_status(day, holidays)
if status in ['Weekly Off', 'Holiday']:
total_holidays += 1
elif not status:
total_unmarked_days += 1
return {
'total_present': summary.total_present + summary.total_half_days,
'total_leaves': summary.total_leaves + summary.total_half_days,
'total_absent': summary.total_absent + summary.total_half_days,
'total_holidays': total_holidays,
'unmarked_days': total_unmarked_days
}
def get_attendance_summary_and_days(employee: str, filters: Filters) -> Tuple[Dict, List]:
Attendance = frappe.qb.DocType('Attendance')
present_case = frappe.qb.terms.Case().when(((Attendance.status == 'Present') | (Attendance.status == 'Work From Home')), 1).else_(0)
sum_present = Sum(present_case).as_('total_present')
absent_case = frappe.qb.terms.Case().when(Attendance.status == 'Absent', 1).else_(0)
sum_absent = Sum(absent_case).as_('total_absent')
leave_case = frappe.qb.terms.Case().when(Attendance.status == 'On Leave', 1).else_(0)
sum_leave = Sum(leave_case).as_('total_leaves')
half_day_case = frappe.qb.terms.Case().when(Attendance.status == 'Half Day', 0.5).else_(0)
sum_half_day = Sum(half_day_case).as_('total_half_days')
summary = (
frappe.qb.from_(Attendance)
.select(
sum_present, sum_absent, sum_leave, sum_half_day,
).where(
(Attendance.docstatus == 1)
& (Attendance.employee == employee)
& (Attendance.company == filters.company)
& (Extract('month', Attendance.attendance_date) == filters.month)
& (Extract('year', Attendance.attendance_date) == filters.year)
)
).run(as_dict=True)
days = (
frappe.qb.from_(Attendance)
.select(Extract('day', Attendance.attendance_date).as_('day_of_month'))
.distinct()
.where(
(Attendance.docstatus == 1)
& (Attendance.employee == employee)
& (Attendance.company == filters.company)
& (Extract('month', Attendance.attendance_date) == filters.month)
& (Extract('year', Attendance.attendance_date) == filters.year)
)
).run(pluck=True)
return summary[0], days
def get_attendance_status_for_detailed_view(employee: str, filters: Filters, employee_attendance: Dict, holidays: List) -> List[Dict]:
"""Returns list of shift-wise attendance status for employee
[
{'shift': 'Morning Shift', 1: 'A', 2: 'P', 3: 'A'....},
{'shift': 'Evening Shift', 1: 'P', 2: 'A', 3: 'P'....}
]
"""
total_days = get_total_days_in_month(filters)
attendance_values = []
for shift, status_dict in employee_attendance.items():
row = {'shift': shift}
for day in range(1, total_days + 1):
status = status_dict.get(day)
if status is None and holidays:
status = get_holiday_status(day, holidays)
if filters.summarized_view:
if status in ['Present', 'Work From Home']:
totals['total_present'] += 1
elif status in ['Weekly Off', 'Holiday']:
totals['total_holidays'] += 1
elif status == 'Absent':
totals['total_absent'] += 1
elif status == 'On Leave':
totals['total_leaves'] += 1
elif status == 'Half Day':
totals['total_present'] += 0.5
totals['total_absent'] += 0.5
totals['total_leaves'] += 0.5
elif not status:
totals['unmarked_days'] += 1
else:
abbr = status_map.get(status, '')
emp_attendance[day] = abbr
row[day] = abbr
if filters.summarized_view:
emp_attendance.update(totals)
attendance_values.append(row)
return emp_attendance
return attendance_values
def get_holiday_status(day: int, holidays: List) -> str:
@ -411,6 +457,7 @@ def get_leave_summary(employee: str, filters: Filters) -> Dict[str, float]:
.select(Attendance.leave_type, sum_leave_days)
.where(
(Attendance.employee == employee)
& (Attendance.docstatus == 1)
& (Attendance.company == filters.company)
& ((Attendance.leave_type.isnotnull()) | (Attendance.leave_type != ''))
& (Extract('month', Attendance.attendance_date) == filters.month)
@ -442,7 +489,8 @@ def get_entry_exits_summary(employee: str, filters: Filters) -> Dict[str, float]
frappe.qb.from_(Attendance)
.select(count_late_entries, count_early_exits)
.where(
(Attendance.employee == employee)
(Attendance.docstatus == 1)
& (Attendance.employee == employee)
& (Attendance.company == filters.company)
& (Extract('month', Attendance.attendance_date) == filters.month)
& (Extract('year', Attendance.attendance_date) == filters.year)
@ -481,10 +529,9 @@ def get_chart_data(attendance_map: Dict, filters: Filters) -> Dict:
labels.append(day['label'])
total_absent_on_day = total_leaves_on_day = total_present_on_day = 0
for employee, attendance in attendance_map.items():
for employee, attendance_dict in attendance_map.items():
for shift, attendance in attendance_dict.items():
attendance_on_day = attendance.get(day['fieldname'])
if not attendance_on_day:
continue
if attendance_on_day == 'Absent':
total_absent_on_day += 1
@ -512,5 +559,3 @@ def get_chart_data(attendance_map: Dict, filters: Filters) -> Dict:
'type': 'line',
'colors': ['red', 'green', 'blue'],
}
return chart