refactor: Monthly Attendance Sheet
- split into smaller functions - add type hints - get rid of unnecessary db calls and loops - add docstrings for functions
This commit is contained in:
parent
f2ee36579b
commit
e79d292233
@ -66,8 +66,7 @@ frappe.query_reports["Monthly Attendance Sheet"] = {
|
||||
"Default": 0,
|
||||
}
|
||||
],
|
||||
|
||||
"onload": function() {
|
||||
onload: function() {
|
||||
return frappe.call({
|
||||
method: "erpnext.hr.report.monthly_attendance_sheet.monthly_attendance_sheet.get_attendance_years",
|
||||
callback: function(r) {
|
||||
|
@ -3,365 +3,496 @@
|
||||
|
||||
|
||||
from calendar import monthrange
|
||||
from itertools import groupby
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
import frappe
|
||||
from frappe import _, msgprint
|
||||
from frappe.utils import cint, cstr, getdate
|
||||
|
||||
from frappe.query_builder.functions import Count, Extract, Sum
|
||||
|
||||
Filters = frappe._dict
|
||||
|
||||
status_map = {
|
||||
"Absent": "A",
|
||||
"Half Day": "HD",
|
||||
"Holiday": "<b>H</b>",
|
||||
"Weekly Off": "<b>WO</b>",
|
||||
"On Leave": "L",
|
||||
"Present": "P",
|
||||
"Work From Home": "WFH",
|
||||
'Absent': 'A',
|
||||
'Half Day': 'HD',
|
||||
'Holiday': 'H',
|
||||
'Weekly Off': 'WO',
|
||||
'On Leave': 'L',
|
||||
'Present': 'P',
|
||||
'Work From Home': 'WFH'
|
||||
}
|
||||
|
||||
day_abbr = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
|
||||
day_abbr = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
|
||||
|
||||
def execute(filters: Optional[Filters] = None) -> Tuple:
|
||||
filters = frappe._dict(filters or {})
|
||||
|
||||
def execute(filters=None):
|
||||
if not filters:
|
||||
filters = {}
|
||||
if not (filters.month and filters.year):
|
||||
frappe.throw(_('Please select month and year.'))
|
||||
|
||||
if filters.hide_year_field == 1:
|
||||
filters.year = 2020
|
||||
columns = get_columns(filters)
|
||||
data, attendance_map = get_data(filters)
|
||||
|
||||
conditions, filters = get_conditions(filters)
|
||||
columns, days = get_columns(filters)
|
||||
att_map = get_attendance_list(conditions, filters)
|
||||
if not att_map:
|
||||
if not data:
|
||||
return columns, [], None, None
|
||||
|
||||
if filters.group_by:
|
||||
emp_map, group_by_parameters = get_employee_details(filters.group_by, filters.company)
|
||||
holiday_list = []
|
||||
for parameter in group_by_parameters:
|
||||
h_list = [
|
||||
emp_map[parameter][d]["holiday_list"]
|
||||
for d in emp_map[parameter]
|
||||
if emp_map[parameter][d]["holiday_list"]
|
||||
]
|
||||
holiday_list += h_list
|
||||
else:
|
||||
emp_map = get_employee_details(filters.group_by, filters.company)
|
||||
holiday_list = [emp_map[d]["holiday_list"] for d in emp_map if emp_map[d]["holiday_list"]]
|
||||
chart = get_chart_data(attendance_map, filters)
|
||||
|
||||
default_holiday_list = frappe.get_cached_value(
|
||||
"Company", filters.get("company"), "default_holiday_list"
|
||||
)
|
||||
holiday_list.append(default_holiday_list)
|
||||
holiday_list = list(set(holiday_list))
|
||||
holiday_map = get_holiday(holiday_list, filters["month"])
|
||||
|
||||
data = []
|
||||
|
||||
leave_types = frappe.db.get_list("Leave Type")
|
||||
leave_list = None
|
||||
if filters.summarized_view:
|
||||
leave_list = [d.name + ":Float:120" for d in leave_types]
|
||||
columns.extend(leave_list)
|
||||
columns.extend([_("Total Late Entries") + ":Float:120", _("Total Early Exits") + ":Float:120"])
|
||||
|
||||
if filters.group_by:
|
||||
emp_att_map = {}
|
||||
for parameter in group_by_parameters:
|
||||
emp_map_set = set([key for key in emp_map[parameter].keys()])
|
||||
att_map_set = set([key for key in att_map.keys()])
|
||||
if att_map_set & emp_map_set:
|
||||
parameter_row = ["<b>" + parameter + "</b>"] + [
|
||||
"" for day in range(filters["total_days_in_month"] + 2)
|
||||
]
|
||||
data.append(parameter_row)
|
||||
record, emp_att_data = add_data(
|
||||
emp_map[parameter],
|
||||
att_map,
|
||||
filters,
|
||||
holiday_map,
|
||||
conditions,
|
||||
default_holiday_list,
|
||||
leave_types=leave_types,
|
||||
)
|
||||
emp_att_map.update(emp_att_data)
|
||||
data += record
|
||||
else:
|
||||
record, emp_att_map = add_data(
|
||||
emp_map,
|
||||
att_map,
|
||||
filters,
|
||||
holiday_map,
|
||||
conditions,
|
||||
default_holiday_list,
|
||||
leave_types=leave_types,
|
||||
)
|
||||
data += record
|
||||
|
||||
chart_data = get_chart_data(emp_att_map, days)
|
||||
|
||||
return columns, data, None, chart_data
|
||||
return columns, data, None, chart
|
||||
|
||||
|
||||
def get_chart_data(emp_att_map, days):
|
||||
labels = []
|
||||
datasets = [
|
||||
{"name": "Absent", "values": []},
|
||||
{"name": "Present", "values": []},
|
||||
{"name": "Leave", "values": []},
|
||||
]
|
||||
for idx, day in enumerate(days, start=0):
|
||||
p = day.replace("::65", "")
|
||||
labels.append(day.replace("::65", ""))
|
||||
total_absent_on_day = 0
|
||||
total_leave_on_day = 0
|
||||
total_present_on_day = 0
|
||||
total_holiday = 0
|
||||
for emp in emp_att_map.keys():
|
||||
if emp_att_map[emp][idx]:
|
||||
if emp_att_map[emp][idx] == "A":
|
||||
total_absent_on_day += 1
|
||||
if emp_att_map[emp][idx] in ["P", "WFH"]:
|
||||
total_present_on_day += 1
|
||||
if emp_att_map[emp][idx] == "HD":
|
||||
total_present_on_day += 0.5
|
||||
total_leave_on_day += 0.5
|
||||
if emp_att_map[emp][idx] == "L":
|
||||
total_leave_on_day += 1
|
||||
|
||||
datasets[0]["values"].append(total_absent_on_day)
|
||||
datasets[1]["values"].append(total_present_on_day)
|
||||
datasets[2]["values"].append(total_leave_on_day)
|
||||
|
||||
chart = {"data": {"labels": labels, "datasets": datasets}}
|
||||
|
||||
chart["type"] = "line"
|
||||
|
||||
return chart
|
||||
|
||||
|
||||
def add_data(
|
||||
employee_map, att_map, filters, holiday_map, conditions, default_holiday_list, leave_types=None
|
||||
):
|
||||
|
||||
record = []
|
||||
emp_att_map = {}
|
||||
for emp in employee_map:
|
||||
emp_det = employee_map.get(emp)
|
||||
if not emp_det or emp not in att_map:
|
||||
continue
|
||||
|
||||
row = []
|
||||
if filters.group_by:
|
||||
row += [" "]
|
||||
row += [emp, emp_det.employee_name]
|
||||
|
||||
total_p = total_a = total_l = total_h = total_um = 0.0
|
||||
emp_status_map = []
|
||||
for day in range(filters["total_days_in_month"]):
|
||||
status = None
|
||||
status = att_map.get(emp).get(day + 1)
|
||||
|
||||
if status is None and holiday_map:
|
||||
emp_holiday_list = emp_det.holiday_list if emp_det.holiday_list else default_holiday_list
|
||||
|
||||
if emp_holiday_list in holiday_map:
|
||||
for idx, ele in enumerate(holiday_map[emp_holiday_list]):
|
||||
if day + 1 == holiday_map[emp_holiday_list][idx][0]:
|
||||
if holiday_map[emp_holiday_list][idx][1]:
|
||||
status = "Weekly Off"
|
||||
else:
|
||||
status = "Holiday"
|
||||
total_h += 1
|
||||
|
||||
abbr = status_map.get(status, "")
|
||||
emp_status_map.append(abbr)
|
||||
|
||||
if filters.summarized_view:
|
||||
if status == "Present" or status == "Work From Home":
|
||||
total_p += 1
|
||||
elif status == "Absent":
|
||||
total_a += 1
|
||||
elif status == "On Leave":
|
||||
total_l += 1
|
||||
elif status == "Half Day":
|
||||
total_p += 0.5
|
||||
total_a += 0.5
|
||||
total_l += 0.5
|
||||
elif not status:
|
||||
total_um += 1
|
||||
|
||||
if not filters.summarized_view:
|
||||
row += emp_status_map
|
||||
|
||||
if filters.summarized_view:
|
||||
row += [total_p, total_l, total_a, total_h, total_um]
|
||||
|
||||
if not filters.get("employee"):
|
||||
filters.update({"employee": emp})
|
||||
conditions += " and employee = %(employee)s"
|
||||
elif not filters.get("employee") == emp:
|
||||
filters.update({"employee": emp})
|
||||
|
||||
if filters.summarized_view:
|
||||
leave_details = frappe.db.sql(
|
||||
"""select leave_type, status, count(*) as count from `tabAttendance`\
|
||||
where leave_type is not NULL %s group by leave_type, status"""
|
||||
% conditions,
|
||||
filters,
|
||||
as_dict=1,
|
||||
)
|
||||
|
||||
time_default_counts = frappe.db.sql(
|
||||
"""select (select count(*) from `tabAttendance` where \
|
||||
late_entry = 1 %s) as late_entry_count, (select count(*) from tabAttendance where \
|
||||
early_exit = 1 %s) as early_exit_count"""
|
||||
% (conditions, conditions),
|
||||
filters,
|
||||
)
|
||||
|
||||
leaves = {}
|
||||
for d in leave_details:
|
||||
if d.status == "Half Day":
|
||||
d.count = d.count * 0.5
|
||||
if d.leave_type in leaves:
|
||||
leaves[d.leave_type] += d.count
|
||||
else:
|
||||
leaves[d.leave_type] = d.count
|
||||
|
||||
for d in leave_types:
|
||||
if d.name in leaves:
|
||||
row.append(leaves[d.name])
|
||||
else:
|
||||
row.append("0.0")
|
||||
|
||||
row.extend([time_default_counts[0][0], time_default_counts[0][1]])
|
||||
emp_att_map[emp] = emp_status_map
|
||||
record.append(row)
|
||||
|
||||
return record, emp_att_map
|
||||
|
||||
|
||||
def get_columns(filters):
|
||||
|
||||
def get_columns(filters: Filters) -> List[Dict]:
|
||||
columns = []
|
||||
|
||||
if filters.group_by:
|
||||
columns = [_(filters.group_by) + ":Link/Branch:120"]
|
||||
columns.append(
|
||||
{'label': _(filters.group_by), 'fieldname': frappe.scrub(filters.group_by), 'fieldtype': 'Link', 'options': 'Branch', 'width': 120}
|
||||
)
|
||||
|
||||
columns += [_("Employee") + ":Link/Employee:120", _("Employee Name") + ":Data/:120"]
|
||||
days = []
|
||||
for day in range(filters["total_days_in_month"]):
|
||||
date = str(filters.year) + "-" + str(filters.month) + "-" + str(day + 1)
|
||||
day_name = day_abbr[getdate(date).weekday()]
|
||||
days.append(cstr(day + 1) + " " + day_name + "::65")
|
||||
if not filters.summarized_view:
|
||||
columns += days
|
||||
columns.extend([
|
||||
{'label': _('Employee'), 'fieldname': 'employee', 'fieldtype': 'Link', 'options': 'Employee', 'width': 120},
|
||||
{'label': _('Employee Name'), 'fieldname': 'employee_name', 'fieldtype': 'Data', 'width': 120}
|
||||
])
|
||||
|
||||
if filters.summarized_view:
|
||||
columns += [
|
||||
_("Total Present") + ":Float:120",
|
||||
_("Total Leaves") + ":Float:120",
|
||||
_("Total Absent") + ":Float:120",
|
||||
_("Total Holidays") + ":Float:120",
|
||||
_("Unmarked Days") + ":Float:120",
|
||||
]
|
||||
return columns, days
|
||||
columns.extend([
|
||||
{'label': _('Total Present'), 'fieldname': 'total_present', 'fieldtype': 'Float', 'width': 120},
|
||||
{'label': _('Total Leaves'), 'fieldname': 'total_leaves', 'fieldtype': 'Float', 'width': 120},
|
||||
{'label': _('Total Absent'), 'fieldname': 'total_absent', 'fieldtype': 'Float', 'width': 120},
|
||||
{'label': _('Total Holidays'), 'fieldname': 'total_holidays', 'fieldtype': 'Float', 'width': 120},
|
||||
{'label': _('Unmarked Days'), 'fieldname': 'unmarked_days', 'fieldtype': 'Float', 'width': 120}
|
||||
])
|
||||
columns.extend(get_columns_for_leave_types())
|
||||
columns.extend([
|
||||
{'label': _('Total Late Entries'), 'fieldname': 'total_late_entries', 'fieldtype': 'Float', 'width': 120},
|
||||
{'label': _('Total Early Exits'), 'fieldname': 'total_early_exits', 'fieldtype': 'Float', 'width': 120}
|
||||
])
|
||||
else:
|
||||
columns.extend(get_columns_for_days(filters))
|
||||
|
||||
return columns
|
||||
|
||||
|
||||
def get_attendance_list(conditions, filters):
|
||||
attendance_list = frappe.db.sql(
|
||||
"""select employee, day(attendance_date) as day_of_month,
|
||||
status from tabAttendance where docstatus = 1 %s order by employee, attendance_date"""
|
||||
% conditions,
|
||||
filters,
|
||||
as_dict=1,
|
||||
def get_columns_for_leave_types() -> List[Dict]:
|
||||
leave_types = frappe.db.get_all('Leave Type', pluck='name')
|
||||
types = []
|
||||
for entry in leave_types:
|
||||
types.append({'label': entry, 'fieldname': frappe.scrub(entry), 'fieldtype': 'Float', 'width': 120})
|
||||
|
||||
return types
|
||||
|
||||
|
||||
def get_columns_for_days(filters: Filters) -> List[Dict]:
|
||||
total_days = get_total_days_in_month(filters)
|
||||
days = []
|
||||
|
||||
for day in range(1, total_days+1):
|
||||
# forms the dates from selected year and month from filters
|
||||
date = '{}-{}-{}'.format(
|
||||
cstr(filters.year),
|
||||
cstr(filters.month),
|
||||
cstr(day)
|
||||
)
|
||||
# gets abbr from weekday number
|
||||
weekday = day_abbr[getdate(date).weekday()]
|
||||
# sets days as 1 Mon, 2 Tue, 3 Wed
|
||||
label = '{} {}'.format(cstr(day), weekday)
|
||||
days.append({
|
||||
'label': label,
|
||||
'fieldtype': 'Data',
|
||||
'fieldname': day,
|
||||
'width': 65
|
||||
})
|
||||
|
||||
return days
|
||||
|
||||
|
||||
def get_total_days_in_month(filters: Filters) -> int:
|
||||
return monthrange(cint(filters.year), cint(filters.month))[1]
|
||||
|
||||
|
||||
def get_data(filters: Filters) -> Tuple[List, Dict]:
|
||||
attendance_map = get_attendance_map(filters)
|
||||
|
||||
if not attendance_map:
|
||||
frappe.msgprint(_('No attendance records found.'), alert=True, indicator='orange')
|
||||
return [], {}
|
||||
|
||||
employee_details, group_by_param_values = get_employee_related_details(filters.group_by, filters.company)
|
||||
holiday_map = get_holiday_map(filters)
|
||||
|
||||
data = []
|
||||
|
||||
if filters.group_by:
|
||||
group_by_column = frappe.scrub(filters.group_by)
|
||||
|
||||
for value in group_by_param_values:
|
||||
if not value:
|
||||
continue
|
||||
|
||||
records = get_rows(
|
||||
employee_details[value],
|
||||
attendance_map,
|
||||
filters,
|
||||
holiday_map
|
||||
)
|
||||
|
||||
if records:
|
||||
data.append({
|
||||
group_by_column: frappe.bold(value)
|
||||
})
|
||||
data.extend(records)
|
||||
else:
|
||||
data = get_rows(
|
||||
employee_details,
|
||||
attendance_map,
|
||||
filters,
|
||||
holiday_map
|
||||
)
|
||||
|
||||
if not data:
|
||||
frappe.msgprint(_('No attendance records found for this criteria.'), alert=True, indicator='orange')
|
||||
return [], {}
|
||||
|
||||
return data, attendance_map
|
||||
|
||||
|
||||
def get_attendance_map(filters: Filters) -> Dict[str, Dict[int, str]]:
|
||||
"""Returns a dictionary of employee wise attendance map for all the days of the month like
|
||||
{
|
||||
'employee1': {
|
||||
1: 'Present',
|
||||
2: 'Absent'
|
||||
},
|
||||
'employee2': {
|
||||
1: 'Absent',
|
||||
2: 'Present'
|
||||
}
|
||||
}
|
||||
"""
|
||||
Attendance = frappe.qb.DocType('Attendance')
|
||||
query = (
|
||||
frappe.qb.from_(Attendance)
|
||||
.select(
|
||||
Attendance.employee,
|
||||
Extract('day', Attendance.attendance_date).as_('day_of_month'),
|
||||
Attendance.status
|
||||
).where(
|
||||
(Attendance.docstatus == 1)
|
||||
& (Attendance.company == filters.company)
|
||||
& (Extract('month', Attendance.attendance_date) == filters.month)
|
||||
& (Extract('year', Attendance.attendance_date) == filters.year)
|
||||
)
|
||||
)
|
||||
if filters.employee:
|
||||
query = query.where(Attendance.employee == filters.employee)
|
||||
|
||||
query = query.orderby(Attendance.employee, Attendance.attendance_date)
|
||||
|
||||
attendance_list = query.run(as_dict=1)
|
||||
|
||||
if not attendance_list:
|
||||
msgprint(_("No attendance record found"), alert=True, indicator="orange")
|
||||
frappe.msgprint(_('No attendance records found'), alert=True, indicator='orange')
|
||||
|
||||
att_map = {}
|
||||
attendance_map = {}
|
||||
for d in attendance_list:
|
||||
att_map.setdefault(d.employee, frappe._dict()).setdefault(d.day_of_month, "")
|
||||
att_map[d.employee][d.day_of_month] = d.status
|
||||
attendance_map.setdefault(d.employee, frappe._dict()).setdefault(d.day_of_month, '')
|
||||
attendance_map[d.employee][d.day_of_month] = d.status
|
||||
|
||||
return att_map
|
||||
return attendance_map
|
||||
|
||||
|
||||
def get_conditions(filters):
|
||||
if not (filters.get("month") and filters.get("year")):
|
||||
msgprint(_("Please select month and year"), raise_exception=1)
|
||||
|
||||
filters["total_days_in_month"] = monthrange(cint(filters.year), cint(filters.month))[1]
|
||||
|
||||
conditions = " and month(attendance_date) = %(month)s and year(attendance_date) = %(year)s"
|
||||
|
||||
if filters.get("company"):
|
||||
conditions += " and company = %(company)s"
|
||||
if filters.get("employee"):
|
||||
conditions += " and employee = %(employee)s"
|
||||
|
||||
return conditions, filters
|
||||
|
||||
|
||||
def get_employee_details(group_by, company):
|
||||
emp_map = {}
|
||||
query = """select name, employee_name, designation, department, branch, company,
|
||||
holiday_list from `tabEmployee` where company = %s """ % frappe.db.escape(
|
||||
company
|
||||
def get_employee_related_details(group_by: str, company: str) -> Tuple[Dict, List]:
|
||||
"""Returns
|
||||
1. nested dict for employee details
|
||||
2. list of values for the group by filter
|
||||
eg: if group by filter is set to "Department" then returns a list like ['HR', 'Support', 'Engineering']
|
||||
"""
|
||||
Employee = frappe.qb.DocType('Employee')
|
||||
query = (
|
||||
frappe.qb.from_(Employee)
|
||||
.select(
|
||||
Employee.name, Employee.employee_name, Employee.designation,
|
||||
Employee.grade, Employee.department, Employee.branch,
|
||||
Employee.company, Employee.holiday_list
|
||||
).where(Employee.company == company)
|
||||
)
|
||||
|
||||
if group_by:
|
||||
group_by = group_by.lower()
|
||||
query += " order by " + group_by + " ASC"
|
||||
query = query.orderby(group_by)
|
||||
|
||||
employee_details = frappe.db.sql(query, as_dict=1)
|
||||
employee_details = query.run(as_dict=True)
|
||||
|
||||
group_by_param_values = []
|
||||
emp_map = {}
|
||||
|
||||
group_by_parameters = []
|
||||
if group_by:
|
||||
for parameter, employees in groupby(employee_details, key=lambda d: d[group_by]):
|
||||
group_by_param_values.append(parameter)
|
||||
emp_map.setdefault(parameter, frappe._dict())
|
||||
|
||||
group_by_parameters = list(
|
||||
set(detail.get(group_by, "") for detail in employee_details if detail.get(group_by, ""))
|
||||
)
|
||||
for parameter in group_by_parameters:
|
||||
emp_map[parameter] = {}
|
||||
|
||||
for d in employee_details:
|
||||
if group_by and len(group_by_parameters):
|
||||
if d.get(group_by, None):
|
||||
|
||||
emp_map[d.get(group_by)][d.name] = d
|
||||
else:
|
||||
emp_map[d.name] = d
|
||||
|
||||
if not group_by:
|
||||
return emp_map
|
||||
for emp in employees:
|
||||
emp_map[parameter][emp.name] = emp
|
||||
else:
|
||||
return emp_map, group_by_parameters
|
||||
for emp in employee_details:
|
||||
emp_map[emp.name] = emp
|
||||
|
||||
return emp_map, group_by_param_values
|
||||
|
||||
|
||||
def get_holiday(holiday_list, month):
|
||||
def get_holiday_map(filters: Filters) -> Dict[str, List[Dict]]:
|
||||
"""
|
||||
Returns a dict of holidays falling in the filter month and year
|
||||
with list name as key and list of holidays as values like
|
||||
{
|
||||
'Holiday List 1': [
|
||||
{'day_of_month': '0' , 'weekly_off': 1},
|
||||
{'day_of_month': '1', 'weekly_off': 0}
|
||||
],
|
||||
'Holiday List 2': [
|
||||
{'day_of_month': '0' , 'weekly_off': 1},
|
||||
{'day_of_month': '1', 'weekly_off': 0}
|
||||
]
|
||||
}
|
||||
"""
|
||||
# add default holiday list too
|
||||
holiday_lists = frappe.db.get_all('Holiday List', pluck='name')
|
||||
default_holiday_list = frappe.get_cached_value('Company', filters.company, 'default_holiday_list')
|
||||
holiday_lists.append(default_holiday_list)
|
||||
|
||||
holiday_map = frappe._dict()
|
||||
for d in holiday_list:
|
||||
if d:
|
||||
holiday_map.setdefault(
|
||||
d,
|
||||
frappe.db.sql(
|
||||
"""select day(holiday_date), weekly_off from `tabHoliday`
|
||||
where parent=%s and month(holiday_date)=%s""",
|
||||
(d, month),
|
||||
),
|
||||
Holiday = frappe.qb.DocType('Holiday')
|
||||
|
||||
for d in holiday_lists:
|
||||
if not d:
|
||||
continue
|
||||
|
||||
holidays = (
|
||||
frappe.qb.from_(Holiday)
|
||||
.select(
|
||||
Extract('day', Holiday.holiday_date).as_('day_of_month'),
|
||||
Holiday.weekly_off
|
||||
).where(
|
||||
(Holiday.parent == d)
|
||||
& (Extract('month', Holiday.holiday_date) == filters.month)
|
||||
& (Extract('year', Holiday.holiday_date) == filters.year)
|
||||
)
|
||||
).run(as_dict=True)
|
||||
|
||||
holiday_map.setdefault(d, holidays)
|
||||
|
||||
return holiday_map
|
||||
|
||||
|
||||
def get_rows(employee_details: Dict, attendance_map: Dict, filters: Filters, holiday_map: Dict) -> List[Dict]:
|
||||
records = []
|
||||
default_holiday_list = frappe.get_cached_value('Company', filters.company, 'default_holiday_list')
|
||||
employees_with_attendance = list(attendance_map.keys())
|
||||
|
||||
for employee, details in employee_details.items():
|
||||
if employee not in employees_with_attendance:
|
||||
continue
|
||||
|
||||
row = {
|
||||
'employee': employee,
|
||||
'employee_name': details.employee_name
|
||||
}
|
||||
|
||||
if filters.summarized_view:
|
||||
# set defaults for summarized view
|
||||
for entry in get_columns(filters):
|
||||
if entry.get('fieldtype') == 'Float':
|
||||
row[entry.get('fieldname')] = 0.0
|
||||
|
||||
emp_holiday_list = details.holiday_list or default_holiday_list
|
||||
holidays = holiday_map[emp_holiday_list]
|
||||
|
||||
attendance_for_employee = get_attendance_status_for_employee(employee, filters, attendance_map, holidays)
|
||||
row.update(attendance_for_employee)
|
||||
|
||||
if filters.summarized_view:
|
||||
leave_summary = get_leave_summary(employee, filters)
|
||||
entry_exits_summary = get_entry_exits_summary(employee, filters)
|
||||
|
||||
row.update(leave_summary)
|
||||
row.update(entry_exits_summary)
|
||||
|
||||
records.append(row)
|
||||
|
||||
return records
|
||||
|
||||
|
||||
def get_attendance_status_for_employee(employee: str, filters: Filters, attendance_map: Dict, holidays: List) -> Dict:
|
||||
"""Returns dict of attendance status for employee
|
||||
- for summarized view: {'total_present': 1.5, 'total_leaves': 0.5, 'total_absent': 13.5, 'total_holidays': 8, 'unmarked_days': 5}
|
||||
- for detailed view (day wise): {1: 'A', 2: 'P', 3: 'A'....}
|
||||
"""
|
||||
emp_attendance = {}
|
||||
|
||||
total_days = get_total_days_in_month(filters)
|
||||
totals = {'total_present': 0, 'total_leaves': 0, 'total_absent': 0, 'total_holidays': 0, 'unmarked_days': 0}
|
||||
|
||||
for day in range(1, total_days + 1):
|
||||
status = None
|
||||
employee_attendance = attendance_map.get(employee)
|
||||
if employee_attendance:
|
||||
status = employee_attendance.get(day)
|
||||
|
||||
if status is None and holidays:
|
||||
status = get_holiday_status(day, holidays)
|
||||
|
||||
if filters.summarized_view:
|
||||
if status in ['Present', 'Work From Home']:
|
||||
totals['total_present'] += 1
|
||||
elif status in ['Weekly Off', 'Holiday']:
|
||||
totals['total_holidays'] += 1
|
||||
elif status == 'Absent':
|
||||
totals['total_absent'] += 1
|
||||
elif status == 'On Leave':
|
||||
totals['total_leaves'] += 1
|
||||
elif status == 'Half Day':
|
||||
totals['total_present'] += 0.5
|
||||
totals['total_absent'] += 0.5
|
||||
totals['total_leaves'] += 0.5
|
||||
elif not status:
|
||||
totals['unmarked_days'] += 1
|
||||
else:
|
||||
abbr = status_map.get(status, '')
|
||||
emp_attendance[day] = abbr
|
||||
|
||||
if filters.summarized_view:
|
||||
emp_attendance.update(totals)
|
||||
|
||||
return emp_attendance
|
||||
|
||||
|
||||
def get_holiday_status(day: int, holidays: List) -> str:
|
||||
status = None
|
||||
for holiday in holidays:
|
||||
if day == holiday.get('day_of_month'):
|
||||
if holiday.get('weekly_off'):
|
||||
status = 'Weekly Off'
|
||||
else:
|
||||
status = 'Holiday'
|
||||
break
|
||||
return status
|
||||
|
||||
|
||||
def get_leave_summary(employee: str, filters: Filters) -> Dict[str, float]:
|
||||
"""Returns a dict of leave type and corresponding leaves taken by employee like:
|
||||
{'leave_without_pay': 1.0, 'sick_leave': 2.0}
|
||||
"""
|
||||
Attendance = frappe.qb.DocType('Attendance')
|
||||
day_case = frappe.qb.terms.Case().when(Attendance.status == 'Half Day', 0.5).else_(1)
|
||||
sum_leave_days = Sum(day_case).as_('leave_days')
|
||||
|
||||
leave_details = (
|
||||
frappe.qb.from_(Attendance)
|
||||
.select(Attendance.leave_type, sum_leave_days)
|
||||
.where(
|
||||
(Attendance.employee == employee)
|
||||
& (Attendance.company == filters.company)
|
||||
& ((Attendance.leave_type.isnotnull()) | (Attendance.leave_type != ''))
|
||||
& (Extract('month', Attendance.attendance_date) == filters.month)
|
||||
& (Extract('year', Attendance.attendance_date) == filters.year)
|
||||
).groupby(Attendance.leave_type)
|
||||
).run(as_dict=True)
|
||||
|
||||
leaves = {}
|
||||
for d in leave_details:
|
||||
leave_type = frappe.scrub(d.leave_type)
|
||||
leaves[leave_type] = d.leave_days
|
||||
|
||||
return leaves
|
||||
|
||||
|
||||
def get_entry_exits_summary(employee: str, filters: Filters) -> Dict[str, float]:
|
||||
"""Returns total late entries and total early exits for employee like:
|
||||
{'total_late_entries': 5, 'total_early_exits': 2}
|
||||
"""
|
||||
Attendance = frappe.qb.DocType('Attendance')
|
||||
|
||||
late_entry_case = frappe.qb.terms.Case().when(Attendance.late_entry == '1', '1')
|
||||
count_late_entries = Count(late_entry_case).as_('total_late_entries')
|
||||
|
||||
early_exit_case = frappe.qb.terms.Case().when(Attendance.early_exit == '1', '1')
|
||||
count_early_exits = Count(early_exit_case).as_('total_early_exits')
|
||||
|
||||
entry_exits = (
|
||||
frappe.qb.from_(Attendance)
|
||||
.select(count_late_entries, count_early_exits)
|
||||
.where(
|
||||
(Attendance.employee == employee)
|
||||
& (Attendance.company == filters.company)
|
||||
& (Extract('month', Attendance.attendance_date) == filters.month)
|
||||
& (Extract('year', Attendance.attendance_date) == filters.year)
|
||||
)
|
||||
).run(as_dict=True)
|
||||
|
||||
return entry_exits[0]
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
def get_attendance_years():
|
||||
year_list = frappe.db.sql_list(
|
||||
"""select distinct YEAR(attendance_date) from tabAttendance ORDER BY YEAR(attendance_date) DESC"""
|
||||
)
|
||||
if not year_list:
|
||||
def get_attendance_years() -> str:
|
||||
"""Returns all the years for which attendance records exist"""
|
||||
Attendance = frappe.qb.DocType('Attendance')
|
||||
year_list = (
|
||||
frappe.qb.from_(Attendance)
|
||||
.select(Extract('year', Attendance.attendance_date).as_('year'))
|
||||
.distinct()
|
||||
).run(as_dict=True)
|
||||
|
||||
if year_list:
|
||||
year_list.sort(key=lambda d: d.year, reverse=True)
|
||||
else:
|
||||
year_list = [getdate().year]
|
||||
|
||||
return "\n".join(str(year) for year in year_list)
|
||||
return "\n".join(cstr(entry.year) for entry in year_list)
|
||||
|
||||
|
||||
def get_chart_data(attendance_map: Dict, filters: Filters) -> Dict:
|
||||
days = get_columns_for_days(filters)
|
||||
labels = []
|
||||
absent = []
|
||||
present = []
|
||||
leave = []
|
||||
|
||||
for day in days:
|
||||
labels.append(day['label'])
|
||||
total_absent_on_day = total_leaves_on_day = total_present_on_day = 0
|
||||
|
||||
for employee, attendance in attendance_map.items():
|
||||
attendance_on_day = attendance.get(day['fieldname'])
|
||||
if not attendance_on_day:
|
||||
continue
|
||||
|
||||
if attendance_on_day == 'Absent':
|
||||
total_absent_on_day += 1
|
||||
elif attendance_on_day in ['Present', 'Work From Home']:
|
||||
total_present_on_day += 1
|
||||
elif attendance_on_day == 'Half Day':
|
||||
total_present_on_day += 0.5
|
||||
total_leaves_on_day += 0.5
|
||||
elif attendance_on_day == 'On Leave':
|
||||
total_leaves_on_day += 1
|
||||
|
||||
absent.append(total_absent_on_day)
|
||||
present.append(total_present_on_day)
|
||||
leave.append(total_leaves_on_day)
|
||||
|
||||
return {
|
||||
'data': {
|
||||
'labels': labels,
|
||||
'datasets': [
|
||||
{'name': 'Absent', 'values': absent},
|
||||
{'name': 'Present', 'values': present},
|
||||
{'name': 'Leave', 'values': leave},
|
||||
]
|
||||
},
|
||||
'type': 'line'
|
||||
}
|
||||
|
||||
return chart
|
@ -33,14 +33,15 @@ class TestMonthlyAttendanceSheet(FrappeTestCase):
|
||||
}
|
||||
)
|
||||
report = execute(filters=filters)
|
||||
employees = report[1][0]
|
||||
datasets = report[3]["data"]["datasets"]
|
||||
absent = datasets[0]["values"]
|
||||
present = datasets[1]["values"]
|
||||
leaves = datasets[2]["values"]
|
||||
|
||||
record = report[1][0]
|
||||
datasets = report[3]['data']['datasets']
|
||||
absent = datasets[0]['values']
|
||||
present = datasets[1]['values']
|
||||
leaves = datasets[2]['values']
|
||||
|
||||
# ensure correct attendance is reflect on the report
|
||||
self.assertIn(self.employee, employees)
|
||||
self.assertEqual(self.employee, record.get('employee'))
|
||||
self.assertEqual(absent[0], 1)
|
||||
self.assertEqual(present[1], 1)
|
||||
self.assertEqual(leaves[2], 1)
|
||||
|
Loading…
x
Reference in New Issue
Block a user