165 lines
4.4 KiB
Python
165 lines
4.4 KiB
Python
# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# License: GNU General Public License v3. See license.txt
|
|
|
|
import copy
|
|
import signal
|
|
import unittest
|
|
from contextlib import contextmanager
|
|
from typing import Any, Dict, NewType, Optional
|
|
|
|
import frappe
|
|
from frappe.core.doctype.report.report import get_report_module_dotted_path
|
|
|
|
ReportFilters = Dict[str, Any]
|
|
ReportName = NewType("ReportName", str)
|
|
|
|
|
|
class ERPNextTestCase(unittest.TestCase):
    """Base class for ERPNext tests with class-level transaction handling.

    The current database transaction is committed before the class-level
    setup runs, and anything still uncommitted is rolled back during the
    class-level teardown.
    """

    @classmethod
    def setUpClass(cls) -> None:
        # Commit first, then let unittest perform its own class setup.
        frappe.db.commit()
        super().setUpClass()

    @classmethod
    def tearDownClass(cls) -> None:
        # Roll back first, then let unittest perform its own class teardown.
        frappe.db.rollback()
        super().tearDownClass()
|
|
|
|
|
|
def create_test_contact_and_address():
    """Reset contact/address data and insert fixtures for "_Test Customer".

    Deletes every row from the Contact, Contact Email, Contact Phone,
    Address and Dynamic Link tables, then inserts one Address and one
    Contact, each linked to the Customer named "_Test Customer".
    """
    # Same statements, same order as a series of individual calls.
    purge_statements = (
        'delete from tabContact',
        'delete from `tabContact Email`',
        'delete from `tabContact Phone`',
        'delete from tabAddress',
        'delete from `tabDynamic Link`',
    )
    for statement in purge_statements:
        frappe.db.sql(statement)

    address_fields = {
        "doctype": "Address",
        "address_title": "_Test Address for Customer",
        "address_type": "Office",
        "address_line1": "Station Road",
        "city": "_Test City",
        "state": "Test State",
        "country": "India",
        "links": [
            {"link_doctype": "Customer", "link_name": "_Test Customer"},
        ],
    }
    frappe.get_doc(address_fields).insert()

    contact_fields = {
        "doctype": 'Contact',
        "first_name": "_Test Contact for _Test Customer",
        "links": [
            {"link_doctype": "Customer", "link_name": "_Test Customer"},
        ],
    }
    customer_contact = frappe.get_doc(contact_fields)
    # Email and phone rows are attached before the document is inserted.
    customer_contact.add_email("test_contact_customer@example.com", is_primary=True)
    customer_contact.add_phone("+91 0000000000", is_primary_phone=True)
    customer_contact.insert()
|
|
|
|
|
|
@contextmanager
def change_settings(doctype, settings_dict):
    """Temporarily change fields on a settings document.

    The given values are saved before control is handed to the caller and
    the previous values are saved back afterwards, regardless of any
    exception that occurred in between. This is useful in tests where you
    want to make changes in a function but don't retain those changes.

    Import and use as a decorator to cover a full function, or use it with
    a `with` statement.

    example:
    @change_settings("Stock Settings", {"item_naming_by": "Naming Series"})
    def test_case(self):
        ...

    Args:
        doctype: name passed to `frappe.get_doc` to load the settings document.
        settings_dict: mapping of field name -> temporary value.
    """
    # Snapshot the current values *before* entering the try block: if loading
    # the document or reading a field fails there is nothing to restore, and
    # the real error must surface instead of a NameError from the cleanup.
    settings = frappe.get_doc(doctype)
    previous_settings = {key: getattr(settings, key) for key in settings_dict}

    try:
        # change setting
        for key, value in settings_dict.items():
            setattr(settings, key, value)
        settings.save()
        yield  # yield control to calling function
    finally:
        # restore settings on a freshly loaded document
        settings = frappe.get_doc(doctype)
        for key, value in previous_settings.items():
            setattr(settings, key, value)
        settings.save()
|
|
|
|
|
|
def execute_script_report(
    report_name: ReportName,
    module: str,
    filters: ReportFilters,
    default_filters: Optional[ReportFilters] = None,
    optional_filters: Optional[ReportFilters] = None
):
    """Run a script report's `execute` function against several filter sets.

    The report is first executed with `default_filters` overlaid by
    `filters`. Then, for every entry in `optional_filters`, it is executed
    again with that single entry added on top of the base filter set.

    If any execution raises, the offending filter set is printed and the
    exception is re-raised.

    Args:
        report_name: Human readable name of report (unscrubbed)
        module: module to which report belongs to
        filters: specific values for filters
        default_filters: default values for filters such as company name.
        optional_filters: filters which should be tested one at a time in
            addition to default filters.
    """
    execute_path = get_report_module_dotted_path(module, report_name) + ".execute"
    run_report = frappe.get_attr(execute_path)

    # The value returned by .update() is used as the filter set, so this
    # relies on frappe._dict.update handing back the mapping itself.
    defaults = {} if default_filters is None else default_filters
    base_filters = frappe._dict(defaults).copy().update(filters)

    filter_sets = [base_filters]
    filter_sets.extend(
        base_filters.copy().update({name: value})
        for name, value in (optional_filters or {}).items()
    )

    for test_filter in filter_sets:
        try:
            run_report(test_filter)
        except Exception:
            # Surface which filter combination broke before propagating.
            print(f"Report failed to execute with filters: {test_filter}")
            raise
|
|
|
|
|
|
|
|
def timeout(seconds=30, error_message="Test timed out."):
    """Timeout decorator to ensure a test doesn't run for too long.

    While the decorated function runs, a SIGALRM alarm is armed for
    `seconds`; if it fires, an `Exception` carrying `error_message` is
    raised. When the function finishes (normally or with an exception) the
    alarm is cancelled and the SIGALRM handler that was installed before the
    call is put back.

    Relies on `signal.alarm` / `signal.SIGALRM`, so it is subject to the
    platform and threading restrictions of the `signal` module.

    adapted from https://stackoverflow.com/a/2282656

    Args:
        seconds: number of seconds passed to `signal.alarm`.
        error_message: message of the exception raised on timeout.

    Returns:
        A decorator that wraps a function with the timeout behaviour.
    """
    # Function-scope import so this block does not depend on file-level imports.
    from functools import wraps

    def decorator(func):
        def _handle_timeout(signum, frame):
            raise Exception(error_message)

        @wraps(func)  # keep the test's __name__/__doc__ for reporting
        def wrapper(*args, **kwargs):
            previous_handler = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                # Cancel the pending alarm first, then restore the handler so
                # the timeout handler does not outlive the decorated call.
                signal.alarm(0)
                # signal.signal() reports None for handlers not installed
                # from Python; those cannot be re-installed, so skip them.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        return wrapper

    return decorator
|