refactor: convert heatmap queries to QB (#33581)
Uses new `UnixTimestamp` function, don't backport.
This commit is contained in:
parent
7ee151880a
commit
fa4af2acce
@ -7,6 +7,8 @@ from email_reply_parser import EmailReplyParser
|
||||
from frappe import _
|
||||
from frappe.desk.reportview import get_match_cond
|
||||
from frappe.model.document import Document
|
||||
from frappe.query_builder import Interval
|
||||
from frappe.query_builder.functions import Count, CurDate, Date, UnixTimestamp
|
||||
from frappe.utils import add_days, flt, get_datetime, get_time, get_url, nowtime, today
|
||||
|
||||
from erpnext import get_default_company
|
||||
@ -297,17 +299,19 @@ class Project(Document):
|
||||
user.welcome_email_sent = 1
|
||||
|
||||
|
||||
def get_timeline_data(doctype, name):
|
||||
def get_timeline_data(doctype: str, name: str) -> dict[int, int]:
|
||||
"""Return timeline for attendance"""
|
||||
|
||||
timesheet_detail = frappe.qb.DocType("Timesheet Detail")
|
||||
|
||||
return dict(
|
||||
frappe.db.sql(
|
||||
"""select unix_timestamp(from_time), count(*)
|
||||
from `tabTimesheet Detail` where project=%s
|
||||
and from_time > date_sub(curdate(), interval 1 year)
|
||||
and docstatus < 2
|
||||
group by date(from_time)""",
|
||||
name,
|
||||
)
|
||||
frappe.qb.from_(timesheet_detail)
|
||||
.select(UnixTimestamp(timesheet_detail.from_time), Count("*"))
|
||||
.where(timesheet_detail.project == name)
|
||||
.where(timesheet_detail.from_time > CurDate() - Interval(years=1))
|
||||
.where(timesheet_detail.docstatus < 2)
|
||||
.groupby(Date(timesheet_detail.from_time))
|
||||
.run()
|
||||
)
|
||||
|
||||
|
||||
|
@ -2,8 +2,13 @@
|
||||
# License: GNU General Public License v3. See license.txt
|
||||
|
||||
|
||||
from collections import defaultdict
|
||||
from itertools import chain
|
||||
|
||||
import frappe
|
||||
from frappe import _
|
||||
from frappe.query_builder import Interval
|
||||
from frappe.query_builder.functions import Count, CurDate, UnixTimestamp
|
||||
from frappe.utils import flt
|
||||
from frappe.utils.nestedset import NestedSet, get_root_of
|
||||
|
||||
@ -77,61 +82,31 @@ def on_doctype_update():
|
||||
frappe.db.add_index("Sales Person", ["lft", "rgt"])
|
||||
|
||||
|
||||
def get_timeline_data(doctype, name):
|
||||
def get_timeline_data(doctype: str, name: str) -> dict[int, int]:
|
||||
def _fetch_activity(doctype: str, date_field: str):
|
||||
sales_team = frappe.qb.DocType("Sales Team")
|
||||
transaction = frappe.qb.DocType(doctype)
|
||||
|
||||
out = {}
|
||||
|
||||
out.update(
|
||||
dict(
|
||||
frappe.db.sql(
|
||||
"""select
|
||||
unix_timestamp(dt.transaction_date), count(st.parenttype)
|
||||
from
|
||||
`tabSales Order` dt, `tabSales Team` st
|
||||
where
|
||||
st.sales_person = %s and st.parent = dt.name and dt.transaction_date > date_sub(curdate(), interval 1 year)
|
||||
group by dt.transaction_date """,
|
||||
name,
|
||||
)
|
||||
return dict(
|
||||
frappe.qb.from_(transaction)
|
||||
.join(sales_team)
|
||||
.on(transaction.name == sales_team.parent)
|
||||
.select(UnixTimestamp(transaction[date_field]), Count("*"))
|
||||
.where(sales_team.sales_person == name)
|
||||
.where(transaction[date_field] > CurDate() - Interval(years=1))
|
||||
.groupby(transaction[date_field])
|
||||
.run()
|
||||
)
|
||||
)
|
||||
|
||||
sales_invoice = dict(
|
||||
frappe.db.sql(
|
||||
"""select
|
||||
unix_timestamp(dt.posting_date), count(st.parenttype)
|
||||
from
|
||||
`tabSales Invoice` dt, `tabSales Team` st
|
||||
where
|
||||
st.sales_person = %s and st.parent = dt.name and dt.posting_date > date_sub(curdate(), interval 1 year)
|
||||
group by dt.posting_date """,
|
||||
name,
|
||||
)
|
||||
)
|
||||
sales_order_activity = _fetch_activity("Sales Order", "transaction_date")
|
||||
sales_invoice_activity = _fetch_activity("Sales Invoice", "posting_date")
|
||||
delivery_note_activity = _fetch_activity("Delivery Note", "posting_date")
|
||||
|
||||
for key in sales_invoice:
|
||||
if out.get(key):
|
||||
out[key] += sales_invoice[key]
|
||||
else:
|
||||
out[key] = sales_invoice[key]
|
||||
merged_activities = defaultdict(int)
|
||||
|
||||
delivery_note = dict(
|
||||
frappe.db.sql(
|
||||
"""select
|
||||
unix_timestamp(dt.posting_date), count(st.parenttype)
|
||||
from
|
||||
`tabDelivery Note` dt, `tabSales Team` st
|
||||
where
|
||||
st.sales_person = %s and st.parent = dt.name and dt.posting_date > date_sub(curdate(), interval 1 year)
|
||||
group by dt.posting_date """,
|
||||
name,
|
||||
)
|
||||
)
|
||||
for ts, count in chain(
|
||||
sales_order_activity.items(), sales_invoice_activity.items(), delivery_note_activity.items()
|
||||
):
|
||||
merged_activities[ts] += count
|
||||
|
||||
for key in delivery_note:
|
||||
if out.get(key):
|
||||
out[key] += delivery_note[key]
|
||||
else:
|
||||
out[key] = delivery_note[key]
|
||||
|
||||
return out
|
||||
return merged_activities
|
||||
|
@ -8,6 +8,8 @@ from typing import Dict, List, Optional
|
||||
import frappe
|
||||
from frappe import _
|
||||
from frappe.model.document import Document
|
||||
from frappe.query_builder import Interval
|
||||
from frappe.query_builder.functions import Count, CurDate, UnixTimestamp
|
||||
from frappe.utils import (
|
||||
cint,
|
||||
cstr,
|
||||
@ -997,18 +999,19 @@ def make_item_price(item, price_list_name, item_price):
|
||||
).insert()
|
||||
|
||||
|
||||
def get_timeline_data(doctype, name):
|
||||
def get_timeline_data(doctype: str, name: str) -> dict[int, int]:
|
||||
"""get timeline data based on Stock Ledger Entry. This is displayed as heatmap on the item page."""
|
||||
|
||||
items = frappe.db.sql(
|
||||
"""select unix_timestamp(posting_date), count(*)
|
||||
from `tabStock Ledger Entry`
|
||||
where item_code=%s and posting_date > date_sub(curdate(), interval 1 year)
|
||||
group by posting_date""",
|
||||
name,
|
||||
)
|
||||
sle = frappe.qb.DocType("Stock Ledger Entry")
|
||||
|
||||
return dict(items)
|
||||
return dict(
|
||||
frappe.qb.from_(sle)
|
||||
.select(UnixTimestamp(sle.posting_date), Count("*"))
|
||||
.where(sle.item_code == name)
|
||||
.where(sle.posting_date > CurDate() - Interval(years=1))
|
||||
.groupby(sle.posting_date)
|
||||
.run()
|
||||
)
|
||||
|
||||
|
||||
def validate_end_of_life(item_code, end_of_life=None, disabled=None):
|
||||
|
Loading…
x
Reference in New Issue
Block a user