Merge branch 'develop' into payment_entry_taxes_unallocated_amount

Commit d86cdb4368, authored by Saqib on 2021-07-14 13:55:09 +05:30 and committed by GitHub.
11 changed files with 596 additions and 130 deletions


@@ -975,8 +975,17 @@ class TestPurchaseInvoice(unittest.TestCase):
acc_settings.save()
def test_gain_loss_with_advance_entry(self):
unlink_enabled = frappe.db.get_value("Accounts Settings", "Accounts Settings", "unlink_payment_on_cancel_of_invoice")
frappe.db.set_value("Accounts Settings", "Accounts Settings", "unlink_payment_on_cancel_of_invoice", 1)
unlink_enabled = frappe.db.get_value(
"Accounts Settings", "Accounts Settings",
"unlink_payment_on_cancel_of_invoice")
frappe.db.set_value(
"Accounts Settings", "Accounts Settings",
"unlink_payment_on_cancel_of_invoice", 1)
original_account = frappe.db.get_value("Company", "_Test Company", "exchange_gain_loss_account")
frappe.db.set_value("Company", "_Test Company", "exchange_gain_loss_account", "Exchange Gain/Loss - _TC")
pay = frappe.get_doc({
'doctype': 'Payment Entry',
'company': '_Test Company',
@@ -1016,7 +1025,8 @@ class TestPurchaseInvoice(unittest.TestCase):
gl_entries = frappe.db.sql("""
select account, sum(debit - credit) as balance from `tabGL Entry`
where voucher_no=%s
group by account order by account asc""", (pi.name), as_dict=1)
group by account
order by account asc""", (pi.name), as_dict=1)
for i, gle in enumerate(gl_entries):
self.assertEqual(expected_gle[i][0], gle.account)
@@ -1076,6 +1086,7 @@ class TestPurchaseInvoice(unittest.TestCase):
pay.cancel()
frappe.db.set_value("Accounts Settings", "Accounts Settings", "unlink_payment_on_cancel_of_invoice", unlink_enabled)
frappe.db.set_value("Company", "_Test Company", "exchange_gain_loss_account", original_account)
def test_purchase_invoice_advance_taxes(self):
from erpnext.buying.doctype.purchase_order.test_purchase_order import create_purchase_order


@@ -13,7 +13,7 @@ from erpnext.accounts.utils import get_account_currency
from erpnext.stock.doctype.delivery_note.delivery_note import update_billed_amount_based_on_so
from erpnext.projects.doctype.timesheet.timesheet import get_projectwise_timesheet_data
from erpnext.assets.doctype.asset.depreciation \
import get_disposal_account_and_cost_center, get_gl_entries_on_asset_disposal
import get_disposal_account_and_cost_center, get_gl_entries_on_asset_disposal, get_gl_entries_on_asset_regain
from erpnext.stock.doctype.batch.batch import set_batch_nos
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos, get_delivery_note_serial_no
from erpnext.setup.doctype.company.company import update_company_current_month_sales
@@ -149,7 +149,7 @@ class SalesInvoice(SellingController):
if self.update_stock:
frappe.throw(_("'Update Stock' cannot be checked for fixed asset sale"))
elif asset.status in ("Scrapped", "Cancelled", "Sold"):
elif asset.status in ("Scrapped", "Cancelled") or (asset.status == "Sold" and not self.is_return):
frappe.throw(_("Row #{0}: Asset {1} cannot be submitted, it is already {2}").format(d.idx, d.asset, asset.status))
def validate_item_cost_centers(self):
@@ -918,22 +918,33 @@ class SalesInvoice(SellingController):
for item in self.get("items"):
if flt(item.base_net_amount, item.precision("base_net_amount")):
if item.is_fixed_asset:
asset = frappe.get_doc("Asset", item.asset)
if item.get('asset'):
asset = frappe.get_doc("Asset", item.asset)
else:
frappe.throw(_(
"Row #{0}: You must select an Asset for Item {1}.").format(item.idx, item.item_name),
title=_("Missing Asset")
)
if (len(asset.finance_books) > 1 and not item.finance_book
and asset.finance_books[0].finance_book):
frappe.throw(_("Select finance book for the item {0} at row {1}")
.format(item.item_code, item.idx))
fixed_asset_gl_entries = get_gl_entries_on_asset_disposal(asset,
item.base_net_amount, item.finance_book)
if self.is_return:
fixed_asset_gl_entries = get_gl_entries_on_asset_regain(asset,
item.base_net_amount, item.finance_book)
asset.db_set("disposal_date", None)
else:
fixed_asset_gl_entries = get_gl_entries_on_asset_disposal(asset,
item.base_net_amount, item.finance_book)
asset.db_set("disposal_date", self.posting_date)
for gle in fixed_asset_gl_entries:
gle["against"] = self.customer
gl_entries.append(self.get_gl_dict(gle, item=item))
asset.db_set("disposal_date", self.posting_date)
asset.set_status("Sold" if self.docstatus==1 else None)
self.set_asset_status(asset)
else:
# Do not book income for transfer within same company
if not self.is_internal_transfer():
@@ -959,6 +970,12 @@ class SalesInvoice(SellingController):
erpnext.is_perpetual_inventory_enabled(self.company):
gl_entries += super(SalesInvoice, self).get_gl_entries()
def set_asset_status(self, asset):
if self.is_return:
asset.set_status()
else:
asset.set_status("Sold" if self.docstatus==1 else None)
def make_loyalty_point_redemption_gle(self, gl_entries):
if cint(self.redeem_loyalty_points):
gl_entries.append(


@@ -10,6 +10,7 @@ from frappe.model.dynamic_links import get_dynamic_link_map
from erpnext.stock.doctype.stock_entry.test_stock_entry import make_stock_entry, get_qty_after_transaction
from erpnext.accounts.doctype.purchase_invoice.test_purchase_invoice import unlink_payment_on_cancel_of_invoice
from erpnext.accounts.doctype.pos_profile.test_pos_profile import make_pos_profile
from erpnext.assets.doctype.asset.test_asset import create_asset, create_asset_data
from erpnext.exceptions import InvalidAccountCurrency, InvalidCurrency
from erpnext.stock.doctype.serial_no.serial_no import SerialNoWarehouseError
from frappe.model.naming import make_autoname
@@ -1069,6 +1070,36 @@ class TestSalesInvoice(unittest.TestCase):
self.assertFalse(si1.outstanding_amount)
self.assertEqual(frappe.db.get_value("Sales Invoice", si.name, "outstanding_amount"), 1500)
def test_gle_made_when_asset_is_returned(self):
create_asset_data()
asset = create_asset(item_code="Macbook Pro")
si = create_sales_invoice(item_code="Macbook Pro", asset=asset.name, qty=1, rate=90000)
return_si = create_sales_invoice(is_return=1, return_against=si.name, item_code="Macbook Pro", asset=asset.name, qty=-1, rate=90000)
disposal_account = frappe.get_cached_value("Company", "_Test Company", "disposal_account")
# Asset value is 100,000 but it was sold for 90,000, so there should be a loss of 10,000
loss_for_si = frappe.get_all(
"GL Entry",
filters = {
"voucher_no": si.name,
"account": disposal_account
},
fields = ["credit", "debit"]
)[0]
loss_for_return_si = frappe.get_all(
"GL Entry",
filters = {
"voucher_no": return_si.name,
"account": disposal_account
},
fields = ["credit", "debit"]
)[0]
self.assertEqual(loss_for_si['credit'], loss_for_return_si['debit'])
self.assertEqual(loss_for_si['debit'], loss_for_return_si['credit'])
def test_discount_on_net_total(self):
si = frappe.copy_doc(test_records[2])
@@ -2164,6 +2195,7 @@ def create_sales_invoice(**args):
"rate": args.rate if args.get("rate") is not None else 100,
"income_account": args.income_account or "Sales - _TC",
"expense_account": args.expense_account or "Cost of Goods Sold - _TC",
"asset": args.asset or None,
"cost_center": args.cost_center or "_Test Cost Center - _TC",
"serial_no": args.serial_no,
"conversion_factor": 1


@@ -743,7 +743,6 @@
"fieldname": "asset",
"fieldtype": "Link",
"label": "Asset",
"no_copy": 1,
"options": "Asset"
},
{
@@ -826,7 +825,7 @@
"idx": 1,
"istable": 1,
"links": [],
"modified": "2021-02-23 01:05:22.123527",
"modified": "2021-06-21 23:03:11.599901",
"modified_by": "Administrator",
"module": "Accounts",
"name": "Sales Invoice Item",


@@ -176,22 +176,34 @@ def restore_asset(asset_name):
asset.set_status()
@frappe.whitelist()
def get_gl_entries_on_asset_regain(asset, selling_amount=0, finance_book=None):
fixed_asset_account, asset, depreciation_cost_center, accumulated_depr_account, accumulated_depr_amount, disposal_account, value_after_depreciation = \
get_asset_details(asset, finance_book)
gl_entries = [
{
"account": fixed_asset_account,
"debit_in_account_currency": asset.gross_purchase_amount,
"debit": asset.gross_purchase_amount,
"cost_center": depreciation_cost_center
},
{
"account": accumulated_depr_account,
"credit_in_account_currency": accumulated_depr_amount,
"credit": accumulated_depr_amount,
"cost_center": depreciation_cost_center
}
]
profit_amount = abs(flt(value_after_depreciation)) - abs(flt(selling_amount))
if profit_amount:
get_profit_gl_entries(profit_amount, gl_entries, disposal_account, depreciation_cost_center)
return gl_entries
def get_gl_entries_on_asset_disposal(asset, selling_amount=0, finance_book=None):
fixed_asset_account, accumulated_depr_account, depr_expense_account = get_depreciation_accounts(asset)
disposal_account, depreciation_cost_center = get_disposal_account_and_cost_center(asset.company)
depreciation_cost_center = asset.cost_center or depreciation_cost_center
idx = 1
if finance_book:
for d in asset.finance_books:
if d.finance_book == finance_book:
idx = d.idx
break
value_after_depreciation = (asset.finance_books[idx - 1].value_after_depreciation
if asset.calculate_depreciation else asset.value_after_depreciation)
accumulated_depr_amount = flt(asset.gross_purchase_amount) - flt(value_after_depreciation)
fixed_asset_account, asset, depreciation_cost_center, accumulated_depr_account, accumulated_depr_amount, disposal_account, value_after_depreciation = \
get_asset_details(asset, finance_book)
gl_entries = [
{
@@ -210,16 +222,37 @@ def get_gl_entries_on_asset_disposal(asset, selling_amount=0, finance_book=None)
profit_amount = flt(selling_amount) - flt(value_after_depreciation)
if profit_amount:
debit_or_credit = "debit" if profit_amount < 0 else "credit"
gl_entries.append({
"account": disposal_account,
"cost_center": depreciation_cost_center,
debit_or_credit: abs(profit_amount),
debit_or_credit + "_in_account_currency": abs(profit_amount)
})
get_profit_gl_entries(profit_amount, gl_entries, disposal_account, depreciation_cost_center)
return gl_entries
def get_asset_details(asset, finance_book=None):
fixed_asset_account, accumulated_depr_account, depr_expense_account = get_depreciation_accounts(asset)
disposal_account, depreciation_cost_center = get_disposal_account_and_cost_center(asset.company)
depreciation_cost_center = asset.cost_center or depreciation_cost_center
idx = 1
if finance_book:
for d in asset.finance_books:
if d.finance_book == finance_book:
idx = d.idx
break
value_after_depreciation = (asset.finance_books[idx - 1].value_after_depreciation
if asset.calculate_depreciation else asset.value_after_depreciation)
accumulated_depr_amount = flt(asset.gross_purchase_amount) - flt(value_after_depreciation)
return fixed_asset_account, asset, depreciation_cost_center, accumulated_depr_account, accumulated_depr_amount, disposal_account, value_after_depreciation
def get_profit_gl_entries(profit_amount, gl_entries, disposal_account, depreciation_cost_center):
debit_or_credit = "debit" if profit_amount < 0 else "credit"
gl_entries.append({
"account": disposal_account,
"cost_center": depreciation_cost_center,
debit_or_credit: abs(profit_amount),
debit_or_credit + "_in_account_currency": abs(profit_amount)
})
@frappe.whitelist()
def get_disposal_account_and_cost_center(company):
disposal_account, depreciation_cost_center = frappe.get_cached_value('Company', company,

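A note on the refactor above: get_gl_entries_on_asset_disposal books profit_amount = selling_amount - value_after_depreciation, get_gl_entries_on_asset_regain books the mirrored amount, and get_profit_gl_entries centralises the sign convention (a negative profit, i.e. a loss, debits the disposal account; a positive one credits it). A minimal standalone sketch of that convention, using the 100,000 / 90,000 figures from test_gle_made_when_asset_is_returned above; the account name is a placeholder and no Frappe site is needed:

# Illustrative only: reproduces the debit/credit selection used by
# get_profit_gl_entries for a disposal and for its return (regain).
def profit_gl_row(profit_amount, disposal_account="Disposal Account - _TC"):  # placeholder account name
    side = "debit" if profit_amount < 0 else "credit"  # loss -> debit, gain -> credit
    return {"account": disposal_account, side: abs(profit_amount)}

value_after_depreciation = 100000  # book value of the asset, per the test comment
selling_amount = 90000             # rate on the Sales Invoice

# disposal (sale): 90,000 - 100,000 = -10,000, a loss, so the disposal account is debited
sale_row = profit_gl_row(selling_amount - value_after_depreciation)

# regain (credit note): |100,000| - |90,000| = +10,000, so the disposal account is credited
return_row = profit_gl_row(abs(value_after_depreciation) - abs(selling_amount))

assert sale_row["debit"] == return_row["credit"] == 10000  # the return reverses the loss

This mirrored pair of rows is exactly what test_gle_made_when_asset_is_returned asserts against the GL Entry table.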

@@ -245,7 +245,10 @@ doc_events = {
"erpnext.portal.utils.set_default_role"]
},
"Communication": {
"on_update": "erpnext.support.doctype.service_level_agreement.service_level_agreement.update_hold_time"
"on_update": [
"erpnext.support.doctype.service_level_agreement.service_level_agreement.update_hold_time",
"erpnext.support.doctype.issue.issue.set_first_response_time"
]
},
("Sales Taxes and Charges Template", 'Price List'): {
"on_update": "erpnext.shopping_cart.doctype.shopping_cart_settings.shopping_cart_settings.validate_cart_settings"

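For readers less familiar with Frappe hooks: each path registered under doc_events resolves to a callable that Frappe invokes as handler(doc, method) when the event fires, so after this change every Communication update also runs the first-response handler added in issue.py (shown further below). A minimal sketch of that calling contract, with a trimmed stand-in for the real handler and a plain dict standing in for the Communication document (both purely illustrative):

# Sketch of the doc_events calling convention; not part of the diff.
def set_first_response_time(communication, method):
    # trimmed stand-in for erpnext.support.doctype.issue.issue.set_first_response_time;
    # the real handler only acts when the communication references an Issue
    if communication.get("reference_doctype") == "Issue":
        print("would compute first response time for", communication.get("reference_name"))

doc = {"reference_doctype": "Issue", "reference_name": "ISS-2021-00001"}  # illustrative values
set_first_response_time(doc, "on_update")  # Frappe passes the event name as the second argument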

@@ -12,10 +12,14 @@ from frappe.desk.notifications import clear_notifications
class TransactionDeletionRecord(Document):
def validate(self):
frappe.only_for('System Manager')
self.validate_doctypes_to_be_ignored()
def validate_doctypes_to_be_ignored(self):
doctypes_to_be_ignored_list = get_doctypes_to_be_ignored()
for doctype in self.doctypes_to_be_ignored:
if doctype.doctype_name not in doctypes_to_be_ignored_list:
frappe.throw(_("DocTypes should not be added manually to the 'Excluded DocTypes' table. You are only allowed to remove entries from it. "), title=_("Not Allowed"))
frappe.throw(_("DocTypes should not be added manually to the 'Excluded DocTypes' table. You are only allowed to remove entries from it. "),
title=_("Not Allowed"))
def before_submit(self):
if not self.doctypes_to_be_ignored:
@@ -23,54 +27,9 @@ class TransactionDeletionRecord(Document):
self.delete_bins()
self.delete_lead_addresses()
company_obj = frappe.get_doc('Company', self.company)
# reset company values
company_obj.total_monthly_sales = 0
company_obj.sales_monthly_history = None
company_obj.save()
# Clear notification counts
self.reset_company_values()
clear_notifications()
singles = frappe.get_all('DocType', filters = {'issingle': 1}, pluck = 'name')
tables = frappe.get_all('DocType', filters = {'istable': 1}, pluck = 'name')
doctypes_to_be_ignored_list = singles
for doctype in self.doctypes_to_be_ignored:
doctypes_to_be_ignored_list.append(doctype.doctype_name)
docfields = frappe.get_all('DocField',
filters = {
'fieldtype': 'Link',
'options': 'Company',
'parent': ['not in', doctypes_to_be_ignored_list]},
fields=['parent', 'fieldname'])
for docfield in docfields:
if docfield['parent'] != self.doctype:
no_of_docs = frappe.db.count(docfield['parent'], {
docfield['fieldname'] : self.company
})
if no_of_docs > 0:
self.delete_version_log(docfield['parent'], docfield['fieldname'])
self.delete_communications(docfield['parent'], docfield['fieldname'])
# populate DocTypes table
if docfield['parent'] not in tables:
self.append('doctypes', {
'doctype_name' : docfield['parent'],
'no_of_docs' : no_of_docs
})
# delete the docs linked with the specified company
frappe.db.delete(docfield['parent'], {
docfield['fieldname'] : self.company
})
naming_series = frappe.db.get_value('DocType', docfield['parent'], 'autoname')
if naming_series:
if '#' in naming_series:
self.update_naming_series(naming_series, docfield['parent'])
self.delete_company_transactions()
def populate_doctypes_to_be_ignored_table(self):
doctypes_to_be_ignored_list = get_doctypes_to_be_ignored()
@@ -79,6 +38,111 @@ class TransactionDeletionRecord(Document):
'doctype_name' : doctype
})
def delete_bins(self):
frappe.db.sql("""delete from tabBin where warehouse in
(select name from tabWarehouse where company=%s)""", self.company)
def delete_lead_addresses(self):
"""Delete addresses to which leads are linked"""
leads = frappe.get_all('Lead', filters={'company': self.company})
leads = ["'%s'" % row.get("name") for row in leads]
addresses = []
if leads:
addresses = frappe.db.sql_list("""select parent from `tabDynamic Link` where link_name
in ({leads})""".format(leads=",".join(leads)))
if addresses:
addresses = ["%s" % frappe.db.escape(addr) for addr in addresses]
frappe.db.sql("""delete from tabAddress where name in ({addresses}) and
name not in (select distinct dl1.parent from `tabDynamic Link` dl1
inner join `tabDynamic Link` dl2 on dl1.parent=dl2.parent
and dl1.link_doctype<>dl2.link_doctype)""".format(addresses=",".join(addresses)))
frappe.db.sql("""delete from `tabDynamic Link` where link_doctype='Lead'
and parenttype='Address' and link_name in ({leads})""".format(leads=",".join(leads)))
frappe.db.sql("""update tabCustomer set lead_name=NULL where lead_name in ({leads})""".format(leads=",".join(leads)))
def reset_company_values(self):
company_obj = frappe.get_doc('Company', self.company)
company_obj.total_monthly_sales = 0
company_obj.sales_monthly_history = None
company_obj.save()
def delete_company_transactions(self):
doctypes_to_be_ignored_list = self.get_doctypes_to_be_ignored_list()
docfields = self.get_doctypes_with_company_field(doctypes_to_be_ignored_list)
tables = self.get_all_child_doctypes()
for docfield in docfields:
if docfield['parent'] != self.doctype:
no_of_docs = self.get_number_of_docs_linked_with_specified_company(docfield['parent'], docfield['fieldname'])
if no_of_docs > 0:
self.delete_version_log(docfield['parent'], docfield['fieldname'])
self.delete_communications(docfield['parent'], docfield['fieldname'])
self.populate_doctypes_table(tables, docfield['parent'], no_of_docs)
self.delete_child_tables(docfield['parent'], docfield['fieldname'])
self.delete_docs_linked_with_specified_company(docfield['parent'], docfield['fieldname'])
naming_series = frappe.db.get_value('DocType', docfield['parent'], 'autoname')
if naming_series:
if '#' in naming_series:
self.update_naming_series(naming_series, docfield['parent'])
def get_doctypes_to_be_ignored_list(self):
singles = frappe.get_all('DocType', filters = {'issingle': 1}, pluck = 'name')
doctypes_to_be_ignored_list = singles
for doctype in self.doctypes_to_be_ignored:
doctypes_to_be_ignored_list.append(doctype.doctype_name)
return doctypes_to_be_ignored_list
def get_doctypes_with_company_field(self, doctypes_to_be_ignored_list):
docfields = frappe.get_all('DocField',
filters = {
'fieldtype': 'Link',
'options': 'Company',
'parent': ['not in', doctypes_to_be_ignored_list]},
fields=['parent', 'fieldname'])
return docfields
def get_all_child_doctypes(self):
return frappe.get_all('DocType', filters = {'istable': 1}, pluck = 'name')
def get_number_of_docs_linked_with_specified_company(self, doctype, company_fieldname):
return frappe.db.count(doctype, {company_fieldname : self.company})
def populate_doctypes_table(self, tables, doctype, no_of_docs):
if doctype not in tables:
self.append('doctypes', {
'doctype_name' : doctype,
'no_of_docs' : no_of_docs
})
def delete_child_tables(self, doctype, company_fieldname):
parent_docs_to_be_deleted = frappe.get_all(doctype, {
company_fieldname : self.company
}, pluck = 'name')
child_tables = frappe.get_all('DocField', filters = {
'fieldtype': 'Table',
'parent': doctype
}, pluck = 'options')
for table in child_tables:
frappe.db.delete(table, {
'parent': ['in', parent_docs_to_be_deleted]
})
def delete_docs_linked_with_specified_company(self, doctype, company_fieldname):
frappe.db.delete(doctype, {
company_fieldname : self.company
})
def update_naming_series(self, naming_series, doctype_name):
if '.' in naming_series:
prefix, hashes = naming_series.rsplit('.', 1)
@@ -107,32 +171,6 @@ class TransactionDeletionRecord(Document):
frappe.delete_doc('Communication', communication_names, ignore_permissions=True)
def delete_bins(self):
frappe.db.sql("""delete from tabBin where warehouse in
(select name from tabWarehouse where company=%s)""", self.company)
def delete_lead_addresses(self):
"""Delete addresses to which leads are linked"""
leads = frappe.get_all('Lead', filters={'company': self.company})
leads = ["'%s'" % row.get("name") for row in leads]
addresses = []
if leads:
addresses = frappe.db.sql_list("""select parent from `tabDynamic Link` where link_name
in ({leads})""".format(leads=",".join(leads)))
if addresses:
addresses = ["%s" % frappe.db.escape(addr) for addr in addresses]
frappe.db.sql("""delete from tabAddress where name in ({addresses}) and
name not in (select distinct dl1.parent from `tabDynamic Link` dl1
inner join `tabDynamic Link` dl2 on dl1.parent=dl2.parent
and dl1.link_doctype<>dl2.link_doctype)""".format(addresses=",".join(addresses)))
frappe.db.sql("""delete from `tabDynamic Link` where link_doctype='Lead'
and parenttype='Address' and link_name in ({leads})""".format(leads=",".join(leads)))
frappe.db.sql("""update tabCustomer set lead_name=NULL where lead_name in ({leads})""".format(leads=",".join(leads)))
@frappe.whitelist()
def get_doctypes_to_be_ignored():
doctypes_to_be_ignored_list = ['Account', 'Cost Center', 'Warehouse', 'Budget',


@@ -587,8 +587,8 @@ def make_item_variant():
test_records = frappe.get_test_records('Item')
def create_item(item_code, is_stock_item=1, valuation_rate=0, warehouse="_Test Warehouse - _TC",
is_customer_provided_item=None, customer=None, is_purchase_item=None, opening_stock=0,
company="_Test Company"):
is_customer_provided_item=None, customer=None, is_purchase_item=None, opening_stock=0, is_fixed_asset=0,
asset_category=None, company="_Test Company"):
if not frappe.db.exists("Item", item_code):
item = frappe.new_doc("Item")
item.item_code = item_code
@@ -596,6 +596,8 @@ def create_item(item_code, is_stock_item=1, valuation_rate=0, warehouse="_Test W
item.description = item_code
item.item_group = "All Item Groups"
item.is_stock_item = is_stock_item
item.is_fixed_asset = is_fixed_asset
item.asset_category = asset_category
item.opening_stock = opening_stock
item.valuation_rate = valuation_rate
item.is_purchase_item = is_purchase_item

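As a usage note on the extended helper signature above: the two new keyword arguments let tests create fixed-asset items directly. A hypothetical call, assuming the standard erpnext.stock.doctype.item.test_item module path and a test site where a "Computers" asset category exists (the item code and category here are illustrative, not taken from the diff):

# Hypothetical usage of the new is_fixed_asset / asset_category parameters
from erpnext.stock.doctype.item.test_item import create_item

item = create_item("Test Office Laptop", is_stock_item=0, is_fixed_asset=1,
    asset_category="Computers", company="_Test Company")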

@@ -5,10 +5,10 @@ from __future__ import unicode_literals
import frappe
import json
from frappe import _
from frappe import utils
from frappe.model.document import Document
from frappe.utils import now_datetime
from datetime import datetime, timedelta
from frappe.utils import now_datetime, time_diff_in_seconds, get_datetime, date_diff
from frappe.core.utils import get_parent_doc
from datetime import timedelta
from frappe.model.mapper import get_mapped_doc
from frappe.utils.user import is_website_user
from frappe.email.inbox import link_communication_to_document
@@ -212,7 +212,129 @@ def make_issue_from_communication(communication, ignore_communication_links=Fals
return issue.name
def get_time_in_timedelta(time):
"""
Converts datetime.time(10, 36, 55, 961454) to datetime.timedelta(seconds=38215)
"""
return timedelta(hours=time.hour, minutes=time.minute, seconds=time.second)
def set_first_response_time(communication, method):
if communication.get('reference_doctype') == "Issue":
issue = get_parent_doc(communication)
if is_first_response(issue):
first_response_time = calculate_first_response_time(issue, get_datetime(issue.first_responded_on))
issue.db_set("first_response_time", first_response_time)
def is_first_response(issue):
responses = frappe.get_all('Communication', filters = {'reference_name': issue.name, 'sent_or_received': 'Sent'})
if len(responses) == 1:
return True
return False
def calculate_first_response_time(issue, first_responded_on):
issue_creation_date = issue.creation
issue_creation_time = get_time_in_seconds(issue_creation_date)
first_responded_on_in_seconds = get_time_in_seconds(first_responded_on)
support_hours = frappe.get_cached_doc("Service Level Agreement", issue.service_level_agreement).support_and_resolution
if issue_creation_date.day == first_responded_on.day:
if is_work_day(issue_creation_date, support_hours):
start_time, end_time = get_working_hours(issue_creation_date, support_hours)
# issue creation and response on the same day during working hours
if is_during_working_hours(issue_creation_date, support_hours) and is_during_working_hours(first_responded_on, support_hours):
return get_elapsed_time(issue_creation_date, first_responded_on)
# issue creation is during working hours, but first response was after working hours
elif is_during_working_hours(issue_creation_date, support_hours):
return get_elapsed_time(issue_creation_time, end_time)
# issue creation was before working hours but first response is during working hours
elif is_during_working_hours(first_responded_on, support_hours):
return get_elapsed_time(start_time, first_responded_on_in_seconds)
# both issue creation and first response were after working hours
else:
return 1.0 # this should ideally be zero, but it gets reset when the next response is sent if the value is zero
else:
return 1.0
else:
# response on the next day
if date_diff(first_responded_on, issue_creation_date) == 1:
first_response_time = 0
else:
first_response_time = calculate_initial_frt(issue_creation_date, date_diff(first_responded_on, issue_creation_date)- 1, support_hours)
# time taken on day of issue creation
if is_work_day(issue_creation_date, support_hours):
start_time, end_time = get_working_hours(issue_creation_date, support_hours)
if is_during_working_hours(issue_creation_date, support_hours):
first_response_time += get_elapsed_time(issue_creation_time, end_time)
elif is_before_working_hours(issue_creation_date, support_hours):
first_response_time += get_elapsed_time(start_time, end_time)
# time taken on day of first response
if is_work_day(first_responded_on, support_hours):
start_time, end_time = get_working_hours(first_responded_on, support_hours)
if is_during_working_hours(first_responded_on, support_hours):
first_response_time += get_elapsed_time(start_time, first_responded_on_in_seconds)
elif not is_before_working_hours(first_responded_on, support_hours):
first_response_time += get_elapsed_time(start_time, end_time)
if first_response_time:
return first_response_time
else:
return 1.0
def get_time_in_seconds(date):
return timedelta(hours=date.hour, minutes=date.minute, seconds=date.second)
def get_working_hours(date, support_hours):
if is_work_day(date, support_hours):
weekday = frappe.utils.get_weekday(date)
for day in support_hours:
if day.workday == weekday:
return day.start_time, day.end_time
def is_work_day(date, support_hours):
weekday = frappe.utils.get_weekday(date)
for day in support_hours:
if day.workday == weekday:
return True
return False
def is_during_working_hours(date, support_hours):
start_time, end_time = get_working_hours(date, support_hours)
time = get_time_in_seconds(date)
if time >= start_time and time <= end_time:
return True
return False
def get_elapsed_time(start_time, end_time):
return round(time_diff_in_seconds(end_time, start_time), 2)
def calculate_initial_frt(issue_creation_date, days_in_between, support_hours):
initial_frt = 0
for i in range(days_in_between):
date = issue_creation_date + timedelta(days = (i+1))
if is_work_day(date, support_hours):
start_time, end_time = get_working_hours(date, support_hours)
initial_frt += get_elapsed_time(start_time, end_time)
return initial_frt
def is_before_working_hours(date, support_hours):
start_time, end_time = get_working_hours(date, support_hours)
time = get_time_in_seconds(date)
if time < start_time:
return True
return False
def get_holidays(holiday_list_name):
holiday_list = frappe.get_cached_doc("Holiday List", holiday_list_name)
holidays = [holiday.holiday_date for holiday in holiday_list.holidays]
return holidays
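A worked instance of the consecutive-day branch above may help: with support hours of 10:00 to 18:00, an issue created Monday 12:00 and first answered Tuesday 11:00 accrues the remainder of Monday's working day plus one hour of Tuesday. A minimal sketch using plain datetime arithmetic instead of the frappe.utils helpers; the figures match test_first_response_time_case11 in the test file below:

# Plain-Python check of the consecutive-day case; no Frappe required.
from datetime import datetime, timedelta

start_time, end_time = timedelta(hours=10), timedelta(hours=18)  # working hours used in the tests

def time_in_seconds(dt):
    # same idea as get_time_in_seconds(): time of day expressed as a timedelta
    return timedelta(hours=dt.hour, minutes=dt.minute, seconds=dt.second)

created = datetime(2021, 6, 28, 12, 0)    # Monday, during working hours
responded = datetime(2021, 6, 29, 11, 0)  # Tuesday, during working hours

frt = (end_time - time_in_seconds(created)).total_seconds()       # 12:00 -> 18:00 = 21600
frt += (time_in_seconds(responded) - start_time).total_seconds()  # 10:00 -> 11:00 =  3600

assert frt == 25200.0  # the value asserted by case 11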


@@ -5,16 +5,18 @@ from __future__ import unicode_literals
import frappe
import unittest
from erpnext.support.doctype.service_level_agreement.test_service_level_agreement import create_service_level_agreements_for_issues
from frappe.utils import now_datetime, get_datetime, flt
from frappe.core.doctype.user_permission.test_user_permission import create_user
from frappe.utils import get_datetime, flt
import datetime
from datetime import timedelta
class TestIssue(unittest.TestCase):
class TestSetUp(unittest.TestCase):
def setUp(self):
frappe.db.sql("delete from `tabService Level Agreement`")
frappe.db.set_value("Support Settings", None, "track_service_level_agreement", 1)
create_service_level_agreements_for_issues()
class TestIssue(TestSetUp):
def test_response_time_and_resolution_time_based_on_different_sla(self):
creation = datetime.datetime(2019, 3, 4, 12, 0)
@@ -133,6 +135,223 @@ class TestIssue(unittest.TestCase):
issue.reload()
self.assertEqual(flt(issue.total_hold_time, 2), 2700)
class TestFirstResponseTime(TestSetUp):
# working hours used in all cases: Mon-Fri, 10am to 6pm
# all dates are in the mm-dd-yyyy format
# issue creation and first response are on the same day
def test_first_response_time_case1(self):
"""
Test frt when issue creation and first response are during working hours on the same day.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 11:00"), get_datetime("06-28-2021 12:00"))
self.assertEqual(issue.first_response_time, 3600.0)
def test_first_response_time_case2(self):
"""
Test frt when issue creation was during working hours, but first response is sent after working hours on the same day.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 12:00"), get_datetime("06-28-2021 20:00"))
self.assertEqual(issue.first_response_time, 21600.0)
def test_first_response_time_case3(self):
"""
Test frt when issue creation was before working hours but first response is sent during working hours on the same day.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 6:00"), get_datetime("06-28-2021 12:00"))
self.assertEqual(issue.first_response_time, 7200.0)
def test_first_response_time_case4(self):
"""
Test frt when both issue creation and first response were after working hours on the same day.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 19:00"), get_datetime("06-28-2021 20:00"))
self.assertEqual(issue.first_response_time, 1.0)
def test_first_response_time_case5(self):
"""
Test frt when both issue creation and first response are on the same day, but it's not a work day.
"""
issue = create_issue_and_communication(get_datetime("06-27-2021 10:00"), get_datetime("06-27-2021 11:00"))
self.assertEqual(issue.first_response_time, 1.0)
# issue creation and first response are on consecutive days
def test_first_response_time_case6(self):
"""
Test frt when the issue was created before working hours and the first response is also sent before working hours, but on the next day.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 6:00"), get_datetime("06-29-2021 6:00"))
self.assertEqual(issue.first_response_time, 28800.0)
def test_first_response_time_case7(self):
"""
Test frt when the issue was created before working hours and the first response is sent during working hours, but on the next day.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 6:00"), get_datetime("06-29-2021 11:00"))
self.assertEqual(issue.first_response_time, 32400.0)
def test_first_response_time_case8(self):
"""
Test frt when the issue was created before working hours and the first response is sent after working hours, but on the next day.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 6:00"), get_datetime("06-29-2021 20:00"))
self.assertEqual(issue.first_response_time, 57600.0)
def test_first_response_time_case9(self):
"""
Test frt when the issue was created before working hours and the first response is sent on the next day, which is not a work day.
"""
issue = create_issue_and_communication(get_datetime("06-25-2021 6:00"), get_datetime("06-26-2021 11:00"))
self.assertEqual(issue.first_response_time, 28800.0)
def test_first_response_time_case10(self):
"""
Test frt when the issue was created during working hours and the first response is sent before working hours, but on the next day.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 12:00"), get_datetime("06-29-2021 6:00"))
self.assertEqual(issue.first_response_time, 21600.0)
def test_first_response_time_case11(self):
"""
Test frt when the issue was created during working hours and the first response is also sent during working hours, but on the next day.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 12:00"), get_datetime("06-29-2021 11:00"))
self.assertEqual(issue.first_response_time, 25200.0)
def test_first_response_time_case12(self):
"""
Test frt when the issue was created during working hours and the first response is sent after working hours, but on the next day.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 12:00"), get_datetime("06-29-2021 20:00"))
self.assertEqual(issue.first_response_time, 50400.0)
def test_first_response_time_case13(self):
"""
Test frt when the issue was created during working hours and the first response is sent on the next day, which is not a work day.
"""
issue = create_issue_and_communication(get_datetime("06-25-2021 12:00"), get_datetime("06-26-2021 11:00"))
self.assertEqual(issue.first_response_time, 21600.0)
def test_first_response_time_case14(self):
"""
Test frt when the issue was created after working hours and the first response is sent before working hours, but on the next day.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 20:00"), get_datetime("06-29-2021 6:00"))
self.assertEqual(issue.first_response_time, 1.0)
def test_first_response_time_case15(self):
"""
Test frt when the issue was created after working hours and the first response is sent during working hours, but on the next day.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 20:00"), get_datetime("06-29-2021 11:00"))
self.assertEqual(issue.first_response_time, 3600.0)
def test_first_response_time_case16(self):
"""
Test frt when the issue was created after working hours and the first response is also sent after working hours, but on the next day.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 20:00"), get_datetime("06-29-2021 20:00"))
self.assertEqual(issue.first_response_time, 28800.0)
def test_first_response_time_case17(self):
"""
Test frt when the issue was created after working hours and the first response is sent on the next day, which is not a work day.
"""
issue = create_issue_and_communication(get_datetime("06-25-2021 20:00"), get_datetime("06-26-2021 11:00"))
self.assertEqual(issue.first_response_time, 1.0)
# issue creation and first response are a few days apart
def test_first_response_time_case18(self):
"""
Test frt when the issue was created before working hours and the first response is also sent before working hours, but after a few days.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 6:00"), get_datetime("07-01-2021 6:00"))
self.assertEqual(issue.first_response_time, 86400.0)
def test_first_response_time_case19(self):
"""
Test frt when the issue was created before working hours and the first response is sent during working hours, but after a few days.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 6:00"), get_datetime("07-01-2021 11:00"))
self.assertEqual(issue.first_response_time, 90000.0)
def test_first_response_time_case20(self):
"""
Test frt when the issue was created before working hours and the first response is sent after working hours, but after a few days.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 6:00"), get_datetime("07-01-2021 20:00"))
self.assertEqual(issue.first_response_time, 115200.0)
def test_first_response_time_case21(self):
"""
Test frt when the issue was created before working hours and the first response is sent after a few days, on a holiday.
"""
issue = create_issue_and_communication(get_datetime("06-25-2021 6:00"), get_datetime("06-27-2021 11:00"))
self.assertEqual(issue.first_response_time, 28800.0)
def test_first_response_time_case22(self):
"""
Test frt when the issue was created during working hours and the first response is sent before working hours, but after a few days.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 12:00"), get_datetime("07-01-2021 6:00"))
self.assertEqual(issue.first_response_time, 79200.0)
def test_first_response_time_case23(self):
"""
Test frt when the issue was created during working hours and the first response is also sent during working hours, but after a few days.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 12:00"), get_datetime("07-01-2021 11:00"))
self.assertEqual(issue.first_response_time, 82800.0)
def test_first_response_time_case24(self):
"""
Test frt when the issue was created during working hours and the first response is sent after working hours, but after a few days.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 12:00"), get_datetime("07-01-2021 20:00"))
self.assertEqual(issue.first_response_time, 108000.0)
def test_first_response_time_case25(self):
"""
Test frt when the issue was created during working hours and the first response is sent after a few days, on a holiday.
"""
issue = create_issue_and_communication(get_datetime("06-25-2021 12:00"), get_datetime("06-27-2021 11:00"))
self.assertEqual(issue.first_response_time, 21600.0)
def test_first_response_time_case26(self):
"""
Test frt when the issue was created after working hours and the first response is sent before working hours, but after a few days.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 20:00"), get_datetime("07-01-2021 6:00"))
self.assertEqual(issue.first_response_time, 57600.0)
def test_first_response_time_case27(self):
"""
Test frt when the issue was created after working hours and the first response is sent during working hours, but after a few days.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 20:00"), get_datetime("07-01-2021 11:00"))
self.assertEqual(issue.first_response_time, 61200.0)
def test_first_response_time_case28(self):
"""
Test frt when the issue was created after working hours and the first response is also sent after working hours, but after a few days.
"""
issue = create_issue_and_communication(get_datetime("06-28-2021 20:00"), get_datetime("07-01-2021 20:00"))
self.assertEqual(issue.first_response_time, 86400.0)
def test_first_response_time_case29(self):
"""
Test frt when the issue was created after working hours and the first response is sent after a few days, on a holiday.
"""
issue = create_issue_and_communication(get_datetime("06-25-2021 20:00"), get_datetime("06-27-2021 11:00"))
self.assertEqual(issue.first_response_time, 1.0)
def create_issue_and_communication(issue_creation, first_responded_on):
issue = make_issue(issue_creation, index=1)
sender = create_user("test@admin.com")
create_communication(issue.name, sender.email, "Sent", first_responded_on)
issue.reload()
return issue
def make_issue(creation=None, customer=None, index=0, priority=None, issue_type=None):
issue = frappe.get_doc({
@@ -185,7 +404,7 @@ def create_territory(territory):
def create_communication(reference_name, sender, sent_or_received, creation):
issue = frappe.get_doc({
communication = frappe.get_doc({
"doctype": "Communication",
"communication_type": "Communication",
"communication_medium": "Email",
@@ -199,4 +418,4 @@ def create_communication(reference_name, sender, sent_or_received, creation):
"creation": creation,
"reference_name": reference_name
})
issue.save()
communication.save()


@@ -339,16 +339,6 @@ def create_service_level_agreement(default_service_level_agreement, holiday_list
"workday": "Friday",
"start_time": "10:00:00",
"end_time": "18:00:00",
},
{
"workday": "Saturday",
"start_time": "10:00:00",
"end_time": "18:00:00",
},
{
"workday": "Sunday",
"start_time": "10:00:00",
"end_time": "18:00:00",
}
]
})