refactor!: remove Mpesa Settings

This commit is contained in:
s-aga-r 2023-09-21 07:32:39 +05:30
parent eded7871f3
commit eb419e8e59
9 changed files with 0 additions and 1170 deletions

View File

@ -1,28 +0,0 @@
{% if not jQuery.isEmptyObject(data) %}
<h5 style="margin-top: 20px;"> {{ __("Balance Details") }} </h5>
<table class="table table-bordered small">
<thead>
<tr>
<th style="width: 20%">{{ __("Account Type") }}</th>
<th style="width: 20%" class="text-right">{{ __("Current Balance") }}</th>
<th style="width: 20%" class="text-right">{{ __("Available Balance") }}</th>
<th style="width: 20%" class="text-right">{{ __("Reserved Balance") }}</th>
<th style="width: 20%" class="text-right">{{ __("Uncleared Balance") }}</th>
</tr>
</thead>
<tbody>
{% for(const [key, value] of Object.entries(data)) { %}
<tr>
<td> {%= key %} </td>
<td class="text-right"> {%= value["current_balance"] %} </td>
<td class="text-right"> {%= value["available_balance"] %} </td>
<td class="text-right"> {%= value["reserved_balance"] %} </td>
<td class="text-right"> {%= value["uncleared_balance"] %} </td>
</tr>
{% } %}
</tbody>
</table>
{% else %}
<p style="margin-top: 30px;"> Account Balance Information Not Available. </p>
{% endif %}

View File

@ -1,149 +0,0 @@
import base64
import datetime
import requests
from requests.auth import HTTPBasicAuth
class MpesaConnector:
def __init__(
self,
env="sandbox",
app_key=None,
app_secret=None,
sandbox_url="https://sandbox.safaricom.co.ke",
live_url="https://api.safaricom.co.ke",
):
"""Setup configuration for Mpesa connector and generate new access token."""
self.env = env
self.app_key = app_key
self.app_secret = app_secret
if env == "sandbox":
self.base_url = sandbox_url
else:
self.base_url = live_url
self.authenticate()
def authenticate(self):
"""
This method is used to fetch the access token required by Mpesa.
Returns:
access_token (str): This token is to be used with the Bearer header for further API calls to Mpesa.
"""
authenticate_uri = "/oauth/v1/generate?grant_type=client_credentials"
authenticate_url = "{0}{1}".format(self.base_url, authenticate_uri)
r = requests.get(authenticate_url, auth=HTTPBasicAuth(self.app_key, self.app_secret))
self.authentication_token = r.json()["access_token"]
return r.json()["access_token"]
def get_balance(
self,
initiator=None,
security_credential=None,
party_a=None,
identifier_type=None,
remarks=None,
queue_timeout_url=None,
result_url=None,
):
"""
This method uses Mpesa's Account Balance API to to enquire the balance on a M-Pesa BuyGoods (Till Number).
Args:
initiator (str): Username used to authenticate the transaction.
security_credential (str): Generate from developer portal.
command_id (str): AccountBalance.
party_a (int): Till number being queried.
identifier_type (int): Type of organization receiving the transaction. (MSISDN/Till Number/Organization short code)
remarks (str): Comments that are sent along with the transaction(maximum 100 characters).
queue_timeout_url (str): The url that handles information of timed out transactions.
result_url (str): The url that receives results from M-Pesa api call.
Returns:
OriginatorConverstionID (str): The unique request ID for tracking a transaction.
ConversationID (str): The unique request ID returned by mpesa for each request made
ResponseDescription (str): Response Description message
"""
payload = {
"Initiator": initiator,
"SecurityCredential": security_credential,
"CommandID": "AccountBalance",
"PartyA": party_a,
"IdentifierType": identifier_type,
"Remarks": remarks,
"QueueTimeOutURL": queue_timeout_url,
"ResultURL": result_url,
}
headers = {
"Authorization": "Bearer {0}".format(self.authentication_token),
"Content-Type": "application/json",
}
saf_url = "{0}{1}".format(self.base_url, "/mpesa/accountbalance/v1/query")
r = requests.post(saf_url, headers=headers, json=payload)
return r.json()
def stk_push(
self,
business_shortcode=None,
passcode=None,
amount=None,
callback_url=None,
reference_code=None,
phone_number=None,
description=None,
):
"""
This method uses Mpesa's Express API to initiate online payment on behalf of a customer.
Args:
business_shortcode (int): The short code of the organization.
passcode (str): Get from developer portal
amount (int): The amount being transacted
callback_url (str): A CallBack URL is a valid secure URL that is used to receive notifications from M-Pesa API.
reference_code(str): Account Reference: This is an Alpha-Numeric parameter that is defined by your system as an Identifier of the transaction for CustomerPayBillOnline transaction type.
phone_number(int): The Mobile Number to receive the STK Pin Prompt.
description(str): This is any additional information/comment that can be sent along with the request from your system. MAX 13 characters
Success Response:
CustomerMessage(str): Messages that customers can understand.
CheckoutRequestID(str): This is a global unique identifier of the processed checkout transaction request.
ResponseDescription(str): Describes Success or failure
MerchantRequestID(str): This is a global unique Identifier for any submitted payment request.
ResponseCode(int): 0 means success all others are error codes. e.g.404.001.03
Error Reponse:
requestId(str): This is a unique requestID for the payment request
errorCode(str): This is a predefined code that indicates the reason for request failure.
errorMessage(str): This is a predefined code that indicates the reason for request failure.
"""
time = (
str(datetime.datetime.now()).split(".")[0].replace("-", "").replace(" ", "").replace(":", "")
)
password = "{0}{1}{2}".format(str(business_shortcode), str(passcode), time)
encoded = base64.b64encode(bytes(password, encoding="utf8"))
payload = {
"BusinessShortCode": business_shortcode,
"Password": encoded.decode("utf-8"),
"Timestamp": time,
"Amount": amount,
"PartyA": int(phone_number),
"PartyB": reference_code,
"PhoneNumber": int(phone_number),
"CallBackURL": callback_url,
"AccountReference": reference_code,
"TransactionDesc": description,
"TransactionType": "CustomerPayBillOnline"
if self.env == "sandbox"
else "CustomerBuyGoodsOnline",
}
headers = {
"Authorization": "Bearer {0}".format(self.authentication_token),
"Content-Type": "application/json",
}
saf_url = "{0}{1}".format(self.base_url, "/mpesa/stkpush/v1/processrequest")
r = requests.post(saf_url, headers=headers, json=payload)
return r.json()

View File

@ -1,56 +0,0 @@
import frappe
from frappe.custom.doctype.custom_field.custom_field import create_custom_fields
def create_custom_pos_fields():
"""Create custom fields corresponding to POS Settings and POS Invoice."""
pos_field = {
"POS Invoice": [
{
"fieldname": "request_for_payment",
"label": "Request for Payment",
"fieldtype": "Button",
"hidden": 1,
"insert_after": "contact_email",
},
{
"fieldname": "mpesa_receipt_number",
"label": "Mpesa Receipt Number",
"fieldtype": "Data",
"read_only": 1,
"insert_after": "company",
},
]
}
if not frappe.get_meta("POS Invoice").has_field("request_for_payment"):
create_custom_fields(pos_field)
record_dict = [
{
"doctype": "POS Field",
"fieldname": "contact_mobile",
"label": "Mobile No",
"fieldtype": "Data",
"options": "Phone",
"parenttype": "POS Settings",
"parent": "POS Settings",
"parentfield": "invoice_fields",
},
{
"doctype": "POS Field",
"fieldname": "request_for_payment",
"label": "Request for Payment",
"fieldtype": "Button",
"parenttype": "POS Settings",
"parent": "POS Settings",
"parentfield": "invoice_fields",
},
]
create_pos_settings(record_dict)
def create_pos_settings(record_dict):
for record in record_dict:
if frappe.db.exists("POS Field", {"fieldname": record.get("fieldname")}):
continue
frappe.get_doc(record).insert()

View File

@ -1,39 +0,0 @@
// Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and contributors
// For license information, please see license.txt
frappe.ui.form.on('Mpesa Settings', {
onload_post_render: function(frm) {
frm.events.setup_account_balance_html(frm);
},
refresh: function(frm) {
erpnext.utils.check_payments_app();
frappe.realtime.on("refresh_mpesa_dashboard", function(){
frm.reload_doc();
frm.events.setup_account_balance_html(frm);
});
},
get_account_balance: function(frm) {
if (!frm.doc.initiator_name && !frm.doc.security_credential) {
frappe.throw(__("Please set the initiator name and the security credential"));
}
frappe.call({
method: "get_account_balance_info",
doc: frm.doc
});
},
setup_account_balance_html: function(frm) {
if (!frm.doc.account_balance) return;
$("div").remove(".form-dashboard-section.custom");
frm.dashboard.add_section(
frappe.render_template('account_balance', {
data: JSON.parse(frm.doc.account_balance)
})
);
frm.dashboard.show();
}
});

View File

@ -1,152 +0,0 @@
{
"actions": [],
"autoname": "field:payment_gateway_name",
"creation": "2020-09-10 13:21:27.398088",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"payment_gateway_name",
"consumer_key",
"consumer_secret",
"initiator_name",
"till_number",
"transaction_limit",
"sandbox",
"column_break_4",
"business_shortcode",
"online_passkey",
"security_credential",
"get_account_balance",
"account_balance"
],
"fields": [
{
"fieldname": "payment_gateway_name",
"fieldtype": "Data",
"label": "Payment Gateway Name",
"reqd": 1,
"unique": 1
},
{
"fieldname": "consumer_key",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Consumer Key",
"reqd": 1
},
{
"fieldname": "consumer_secret",
"fieldtype": "Password",
"in_list_view": 1,
"label": "Consumer Secret",
"reqd": 1
},
{
"fieldname": "till_number",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Till Number",
"reqd": 1
},
{
"default": "0",
"fieldname": "sandbox",
"fieldtype": "Check",
"label": "Sandbox"
},
{
"fieldname": "column_break_4",
"fieldtype": "Column Break"
},
{
"fieldname": "online_passkey",
"fieldtype": "Password",
"label": " Online PassKey",
"reqd": 1
},
{
"fieldname": "initiator_name",
"fieldtype": "Data",
"label": "Initiator Name"
},
{
"fieldname": "security_credential",
"fieldtype": "Small Text",
"label": "Security Credential"
},
{
"fieldname": "account_balance",
"fieldtype": "Long Text",
"hidden": 1,
"label": "Account Balance",
"read_only": 1
},
{
"fieldname": "get_account_balance",
"fieldtype": "Button",
"label": "Get Account Balance"
},
{
"depends_on": "eval:(doc.sandbox==0)",
"fieldname": "business_shortcode",
"fieldtype": "Data",
"label": "Business Shortcode",
"mandatory_depends_on": "eval:(doc.sandbox==0)"
},
{
"default": "150000",
"fieldname": "transaction_limit",
"fieldtype": "Float",
"label": "Transaction Limit",
"non_negative": 1
}
],
"links": [],
"modified": "2021-03-02 17:35:14.084342",
"modified_by": "Administrator",
"module": "ERPNext Integrations",
"name": "Mpesa Settings",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
},
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "Accounts Manager",
"share": 1,
"write": 1
},
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "Accounts User",
"share": 1,
"write": 1
}
],
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View File

@ -1,354 +0,0 @@
# Copyright (c) 2020, Frappe Technologies and contributors
# For license information, please see license.txt
from json import dumps, loads
import frappe
from frappe import _
from frappe.integrations.utils import create_request_log
from frappe.model.document import Document
from frappe.utils import call_hook_method, fmt_money, get_request_site_address
from erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_connector import MpesaConnector
from erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_custom_fields import (
create_custom_pos_fields,
)
from erpnext.erpnext_integrations.utils import create_mode_of_payment
from erpnext.utilities import payment_app_import_guard
class MpesaSettings(Document):
supported_currencies = ["KES"]
def validate_transaction_currency(self, currency):
if currency not in self.supported_currencies:
frappe.throw(
_(
"Please select another payment method. Mpesa does not support transactions in currency '{0}'"
).format(currency)
)
def on_update(self):
with payment_app_import_guard():
from payments.utils import create_payment_gateway
create_custom_pos_fields()
create_payment_gateway(
"Mpesa-" + self.payment_gateway_name,
settings="Mpesa Settings",
controller=self.payment_gateway_name,
)
call_hook_method(
"payment_gateway_enabled", gateway="Mpesa-" + self.payment_gateway_name, payment_channel="Phone"
)
# required to fetch the bank account details from the payment gateway account
frappe.db.commit()
create_mode_of_payment("Mpesa-" + self.payment_gateway_name, payment_type="Phone")
def request_for_payment(self, **kwargs):
args = frappe._dict(kwargs)
request_amounts = self.split_request_amount_according_to_transaction_limit(args)
for i, amount in enumerate(request_amounts):
args.request_amount = amount
if frappe.flags.in_test:
from erpnext.erpnext_integrations.doctype.mpesa_settings.test_mpesa_settings import (
get_payment_request_response_payload,
)
response = frappe._dict(get_payment_request_response_payload(amount))
else:
response = frappe._dict(generate_stk_push(**args))
self.handle_api_response("CheckoutRequestID", args, response)
def split_request_amount_according_to_transaction_limit(self, args):
request_amount = args.request_amount
if request_amount > self.transaction_limit:
# make multiple requests
request_amounts = []
requests_to_be_made = frappe.utils.ceil(
request_amount / self.transaction_limit
) # 480/150 = ceil(3.2) = 4
for i in range(requests_to_be_made):
amount = self.transaction_limit
if i == requests_to_be_made - 1:
amount = request_amount - (
self.transaction_limit * i
) # for 4th request, 480 - (150 * 3) = 30
request_amounts.append(amount)
else:
request_amounts = [request_amount]
return request_amounts
@frappe.whitelist()
def get_account_balance_info(self):
payload = dict(
reference_doctype="Mpesa Settings", reference_docname=self.name, doc_details=vars(self)
)
if frappe.flags.in_test:
from erpnext.erpnext_integrations.doctype.mpesa_settings.test_mpesa_settings import (
get_test_account_balance_response,
)
response = frappe._dict(get_test_account_balance_response())
else:
response = frappe._dict(get_account_balance(payload))
self.handle_api_response("ConversationID", payload, response)
def handle_api_response(self, global_id, request_dict, response):
"""Response received from API calls returns a global identifier for each transaction, this code is returned during the callback."""
# check error response
if getattr(response, "requestId"):
req_name = getattr(response, "requestId")
error = response
else:
# global checkout id used as request name
req_name = getattr(response, global_id)
error = None
if not frappe.db.exists("Integration Request", req_name):
create_request_log(request_dict, "Host", "Mpesa", req_name, error)
if error:
frappe.throw(_(getattr(response, "errorMessage")), title=_("Transaction Error"))
def generate_stk_push(**kwargs):
"""Generate stk push by making a API call to the stk push API."""
args = frappe._dict(kwargs)
try:
callback_url = (
get_request_site_address(True)
+ "/api/method/erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_settings.verify_transaction"
)
mpesa_settings = frappe.get_doc("Mpesa Settings", args.payment_gateway[6:])
env = "production" if not mpesa_settings.sandbox else "sandbox"
# for sandbox, business shortcode is same as till number
business_shortcode = (
mpesa_settings.business_shortcode if env == "production" else mpesa_settings.till_number
)
connector = MpesaConnector(
env=env,
app_key=mpesa_settings.consumer_key,
app_secret=mpesa_settings.get_password("consumer_secret"),
)
mobile_number = sanitize_mobile_number(args.sender)
response = connector.stk_push(
business_shortcode=business_shortcode,
amount=args.request_amount,
passcode=mpesa_settings.get_password("online_passkey"),
callback_url=callback_url,
reference_code=mpesa_settings.till_number,
phone_number=mobile_number,
description="POS Payment",
)
return response
except Exception:
frappe.log_error("Mpesa Express Transaction Error")
frappe.throw(
_("Issue detected with Mpesa configuration, check the error logs for more details"),
title=_("Mpesa Express Error"),
)
def sanitize_mobile_number(number):
"""Add country code and strip leading zeroes from the phone number."""
return "254" + str(number).lstrip("0")
@frappe.whitelist(allow_guest=True)
def verify_transaction(**kwargs):
"""Verify the transaction result received via callback from stk."""
transaction_response = frappe._dict(kwargs["Body"]["stkCallback"])
checkout_id = getattr(transaction_response, "CheckoutRequestID", "")
if not isinstance(checkout_id, str):
frappe.throw(_("Invalid Checkout Request ID"))
integration_request = frappe.get_doc("Integration Request", checkout_id)
transaction_data = frappe._dict(loads(integration_request.data))
total_paid = 0 # for multiple integration request made against a pos invoice
success = False # for reporting successfull callback to point of sale ui
if transaction_response["ResultCode"] == 0:
if integration_request.reference_doctype and integration_request.reference_docname:
try:
item_response = transaction_response["CallbackMetadata"]["Item"]
amount = fetch_param_value(item_response, "Amount", "Name")
mpesa_receipt = fetch_param_value(item_response, "MpesaReceiptNumber", "Name")
pr = frappe.get_doc(
integration_request.reference_doctype, integration_request.reference_docname
)
mpesa_receipts, completed_payments = get_completed_integration_requests_info(
integration_request.reference_doctype, integration_request.reference_docname, checkout_id
)
total_paid = amount + sum(completed_payments)
mpesa_receipts = ", ".join(mpesa_receipts + [mpesa_receipt])
if total_paid >= pr.grand_total:
pr.run_method("on_payment_authorized", "Completed")
success = True
frappe.db.set_value("POS Invoice", pr.reference_name, "mpesa_receipt_number", mpesa_receipts)
integration_request.handle_success(transaction_response)
except Exception:
integration_request.handle_failure(transaction_response)
frappe.log_error("Mpesa: Failed to verify transaction")
else:
integration_request.handle_failure(transaction_response)
frappe.publish_realtime(
event="process_phone_payment",
doctype="POS Invoice",
docname=transaction_data.payment_reference,
user=integration_request.owner,
message={
"amount": total_paid,
"success": success,
"failure_message": transaction_response["ResultDesc"]
if transaction_response["ResultCode"] != 0
else "",
},
)
def get_completed_integration_requests_info(reference_doctype, reference_docname, checkout_id):
output_of_other_completed_requests = frappe.get_all(
"Integration Request",
filters={
"name": ["!=", checkout_id],
"reference_doctype": reference_doctype,
"reference_docname": reference_docname,
"status": "Completed",
},
pluck="output",
)
mpesa_receipts, completed_payments = [], []
for out in output_of_other_completed_requests:
out = frappe._dict(loads(out))
item_response = out["CallbackMetadata"]["Item"]
completed_amount = fetch_param_value(item_response, "Amount", "Name")
completed_mpesa_receipt = fetch_param_value(item_response, "MpesaReceiptNumber", "Name")
completed_payments.append(completed_amount)
mpesa_receipts.append(completed_mpesa_receipt)
return mpesa_receipts, completed_payments
def get_account_balance(request_payload):
"""Call account balance API to send the request to the Mpesa Servers."""
try:
mpesa_settings = frappe.get_doc("Mpesa Settings", request_payload.get("reference_docname"))
env = "production" if not mpesa_settings.sandbox else "sandbox"
connector = MpesaConnector(
env=env,
app_key=mpesa_settings.consumer_key,
app_secret=mpesa_settings.get_password("consumer_secret"),
)
callback_url = (
get_request_site_address(True)
+ "/api/method/erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_settings.process_balance_info"
)
response = connector.get_balance(
mpesa_settings.initiator_name,
mpesa_settings.security_credential,
mpesa_settings.till_number,
4,
mpesa_settings.name,
callback_url,
callback_url,
)
return response
except Exception:
frappe.log_error("Mpesa: Failed to get account balance")
frappe.throw(_("Please check your configuration and try again"), title=_("Error"))
@frappe.whitelist(allow_guest=True)
def process_balance_info(**kwargs):
"""Process and store account balance information received via callback from the account balance API call."""
account_balance_response = frappe._dict(kwargs["Result"])
conversation_id = getattr(account_balance_response, "ConversationID", "")
if not isinstance(conversation_id, str):
frappe.throw(_("Invalid Conversation ID"))
request = frappe.get_doc("Integration Request", conversation_id)
if request.status == "Completed":
return
transaction_data = frappe._dict(loads(request.data))
if account_balance_response["ResultCode"] == 0:
try:
result_params = account_balance_response["ResultParameters"]["ResultParameter"]
balance_info = fetch_param_value(result_params, "AccountBalance", "Key")
balance_info = format_string_to_json(balance_info)
ref_doc = frappe.get_doc(transaction_data.reference_doctype, transaction_data.reference_docname)
ref_doc.db_set("account_balance", balance_info)
request.handle_success(account_balance_response)
frappe.publish_realtime(
"refresh_mpesa_dashboard",
doctype="Mpesa Settings",
docname=transaction_data.reference_docname,
user=transaction_data.owner,
)
except Exception:
request.handle_failure(account_balance_response)
frappe.log_error(
title="Mpesa Account Balance Processing Error", message=account_balance_response
)
else:
request.handle_failure(account_balance_response)
def format_string_to_json(balance_info):
"""
Format string to json.
e.g: '''Working Account|KES|481000.00|481000.00|0.00|0.00'''
=> {'Working Account': {'current_balance': '481000.00',
'available_balance': '481000.00',
'reserved_balance': '0.00',
'uncleared_balance': '0.00'}}
"""
balance_dict = frappe._dict()
for account_info in balance_info.split("&"):
account_info = account_info.split("|")
balance_dict[account_info[0]] = dict(
current_balance=fmt_money(account_info[2], currency="KES"),
available_balance=fmt_money(account_info[3], currency="KES"),
reserved_balance=fmt_money(account_info[4], currency="KES"),
uncleared_balance=fmt_money(account_info[5], currency="KES"),
)
return dumps(balance_dict)
def fetch_param_value(response, key, key_field):
"""Fetch the specified key from list of dictionary. Key is identified via the key field."""
for param in response:
if param[key_field] == key:
return param["Value"]

View File

@ -1,361 +0,0 @@
# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and Contributors
# See license.txt
import unittest
from json import dumps
import frappe
from erpnext.accounts.doctype.pos_invoice.test_pos_invoice import create_pos_invoice
from erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_settings import (
process_balance_info,
verify_transaction,
)
from erpnext.erpnext_integrations.utils import create_mode_of_payment
class TestMpesaSettings(unittest.TestCase):
def setUp(self):
# create payment gateway in setup
create_mpesa_settings(payment_gateway_name="_Test")
create_mpesa_settings(payment_gateway_name="_Account Balance")
create_mpesa_settings(payment_gateway_name="Payment")
def tearDown(self):
frappe.db.sql("delete from `tabMpesa Settings`")
frappe.db.sql("delete from `tabIntegration Request` where integration_request_service = 'Mpesa'")
def test_creation_of_payment_gateway(self):
mode_of_payment = create_mode_of_payment("Mpesa-_Test", payment_type="Phone")
self.assertTrue(frappe.db.exists("Payment Gateway Account", {"payment_gateway": "Mpesa-_Test"}))
self.assertTrue(mode_of_payment.name)
self.assertEqual(mode_of_payment.type, "Phone")
def test_processing_of_account_balance(self):
mpesa_doc = create_mpesa_settings(payment_gateway_name="_Account Balance")
mpesa_doc.get_account_balance_info()
callback_response = get_account_balance_callback_payload()
process_balance_info(**callback_response)
integration_request = frappe.get_doc("Integration Request", "AG_20200927_00007cdb1f9fb6494315")
# test integration request creation and successful update of the status on receiving callback response
self.assertTrue(integration_request)
self.assertEqual(integration_request.status, "Completed")
# test formatting of account balance received as string to json with appropriate currency symbol
mpesa_doc.reload()
self.assertEqual(
mpesa_doc.account_balance,
dumps(
{
"Working Account": {
"current_balance": "Sh 481,000.00",
"available_balance": "Sh 481,000.00",
"reserved_balance": "Sh 0.00",
"uncleared_balance": "Sh 0.00",
}
}
),
)
integration_request.delete()
def test_processing_of_callback_payload(self):
mpesa_account = frappe.db.get_value(
"Payment Gateway Account", {"payment_gateway": "Mpesa-Payment"}, "payment_account"
)
frappe.db.set_value("Account", mpesa_account, "account_currency", "KES")
frappe.db.set_value("Customer", "_Test Customer", "default_currency", "KES")
pos_invoice = create_pos_invoice(do_not_submit=1)
pos_invoice.append(
"payments", {"mode_of_payment": "Mpesa-Payment", "account": mpesa_account, "amount": 500}
)
pos_invoice.contact_mobile = "093456543894"
pos_invoice.currency = "KES"
pos_invoice.save()
pr = pos_invoice.create_payment_request()
# test payment request creation
self.assertEqual(pr.payment_gateway, "Mpesa-Payment")
# submitting payment request creates integration requests with random id
integration_req_ids = frappe.get_all(
"Integration Request",
filters={
"reference_doctype": pr.doctype,
"reference_docname": pr.name,
},
pluck="name",
)
callback_response = get_payment_callback_payload(
Amount=500, CheckoutRequestID=integration_req_ids[0]
)
verify_transaction(**callback_response)
# test creation of integration request
integration_request = frappe.get_doc("Integration Request", integration_req_ids[0])
# test integration request creation and successful update of the status on receiving callback response
self.assertTrue(integration_request)
self.assertEqual(integration_request.status, "Completed")
pos_invoice.reload()
integration_request.reload()
self.assertEqual(pos_invoice.mpesa_receipt_number, "LGR7OWQX0R")
self.assertEqual(integration_request.status, "Completed")
frappe.db.set_value("Customer", "_Test Customer", "default_currency", "")
integration_request.delete()
pr.reload()
pr.cancel()
pr.delete()
pos_invoice.delete()
def test_processing_of_multiple_callback_payload(self):
mpesa_account = frappe.db.get_value(
"Payment Gateway Account", {"payment_gateway": "Mpesa-Payment"}, "payment_account"
)
frappe.db.set_value("Account", mpesa_account, "account_currency", "KES")
frappe.db.set_value("Mpesa Settings", "Payment", "transaction_limit", "500")
frappe.db.set_value("Customer", "_Test Customer", "default_currency", "KES")
pos_invoice = create_pos_invoice(do_not_submit=1)
pos_invoice.append(
"payments", {"mode_of_payment": "Mpesa-Payment", "account": mpesa_account, "amount": 1000}
)
pos_invoice.contact_mobile = "093456543894"
pos_invoice.currency = "KES"
pos_invoice.save()
pr = pos_invoice.create_payment_request()
# test payment request creation
self.assertEqual(pr.payment_gateway, "Mpesa-Payment")
# submitting payment request creates integration requests with random id
integration_req_ids = frappe.get_all(
"Integration Request",
filters={
"reference_doctype": pr.doctype,
"reference_docname": pr.name,
},
pluck="name",
)
# create random receipt nos and send it as response to callback handler
mpesa_receipt_numbers = [frappe.utils.random_string(5) for d in integration_req_ids]
integration_requests = []
for i in range(len(integration_req_ids)):
callback_response = get_payment_callback_payload(
Amount=500,
CheckoutRequestID=integration_req_ids[i],
MpesaReceiptNumber=mpesa_receipt_numbers[i],
)
# handle response manually
verify_transaction(**callback_response)
# test completion of integration request
integration_request = frappe.get_doc("Integration Request", integration_req_ids[i])
self.assertEqual(integration_request.status, "Completed")
integration_requests.append(integration_request)
# check receipt number once all the integration requests are completed
pos_invoice.reload()
self.assertEqual(pos_invoice.mpesa_receipt_number, ", ".join(mpesa_receipt_numbers))
frappe.db.set_value("Customer", "_Test Customer", "default_currency", "")
[d.delete() for d in integration_requests]
pr.reload()
pr.cancel()
pr.delete()
pos_invoice.delete()
def test_processing_of_only_one_succes_callback_payload(self):
mpesa_account = frappe.db.get_value(
"Payment Gateway Account", {"payment_gateway": "Mpesa-Payment"}, "payment_account"
)
frappe.db.set_value("Account", mpesa_account, "account_currency", "KES")
frappe.db.set_value("Mpesa Settings", "Payment", "transaction_limit", "500")
frappe.db.set_value("Customer", "_Test Customer", "default_currency", "KES")
pos_invoice = create_pos_invoice(do_not_submit=1)
pos_invoice.append(
"payments", {"mode_of_payment": "Mpesa-Payment", "account": mpesa_account, "amount": 1000}
)
pos_invoice.contact_mobile = "093456543894"
pos_invoice.currency = "KES"
pos_invoice.save()
pr = pos_invoice.create_payment_request()
# test payment request creation
self.assertEqual(pr.payment_gateway, "Mpesa-Payment")
# submitting payment request creates integration requests with random id
integration_req_ids = frappe.get_all(
"Integration Request",
filters={
"reference_doctype": pr.doctype,
"reference_docname": pr.name,
},
pluck="name",
)
# create random receipt nos and send it as response to callback handler
mpesa_receipt_numbers = [frappe.utils.random_string(5) for d in integration_req_ids]
callback_response = get_payment_callback_payload(
Amount=500,
CheckoutRequestID=integration_req_ids[0],
MpesaReceiptNumber=mpesa_receipt_numbers[0],
)
# handle response manually
verify_transaction(**callback_response)
# test completion of integration request
integration_request = frappe.get_doc("Integration Request", integration_req_ids[0])
self.assertEqual(integration_request.status, "Completed")
# now one request is completed
# second integration request fails
# now retrying payment request should make only one integration request again
pr = pos_invoice.create_payment_request()
new_integration_req_ids = frappe.get_all(
"Integration Request",
filters={
"reference_doctype": pr.doctype,
"reference_docname": pr.name,
"name": ["not in", integration_req_ids],
},
pluck="name",
)
self.assertEqual(len(new_integration_req_ids), 1)
frappe.db.set_value("Customer", "_Test Customer", "default_currency", "")
frappe.db.sql("delete from `tabIntegration Request` where integration_request_service = 'Mpesa'")
pr.reload()
pr.cancel()
pr.delete()
pos_invoice.delete()
def create_mpesa_settings(payment_gateway_name="Express"):
if frappe.db.exists("Mpesa Settings", payment_gateway_name):
return frappe.get_doc("Mpesa Settings", payment_gateway_name)
doc = frappe.get_doc(
dict( # nosec
doctype="Mpesa Settings",
sandbox=1,
payment_gateway_name=payment_gateway_name,
consumer_key="5sMu9LVI1oS3oBGPJfh3JyvLHwZOdTKn",
consumer_secret="VI1oS3oBGPJfh3JyvLHw",
online_passkey="LVI1oS3oBGPJfh3JyvLHwZOd",
till_number="174379",
)
)
doc.insert(ignore_permissions=True)
return doc
def get_test_account_balance_response():
"""Response received after calling the account balance API."""
return {
"ResultType": 0,
"ResultCode": 0,
"ResultDesc": "The service request has been accepted successfully.",
"OriginatorConversationID": "10816-694520-2",
"ConversationID": "AG_20200927_00007cdb1f9fb6494315",
"TransactionID": "LGR0000000",
"ResultParameters": {
"ResultParameter": [
{"Key": "ReceiptNo", "Value": "LGR919G2AV"},
{"Key": "Conversation ID", "Value": "AG_20170727_00004492b1b6d0078fbe"},
{"Key": "FinalisedTime", "Value": 20170727101415},
{"Key": "Amount", "Value": 10},
{"Key": "TransactionStatus", "Value": "Completed"},
{"Key": "ReasonType", "Value": "Salary Payment via API"},
{"Key": "TransactionReason"},
{"Key": "DebitPartyCharges", "Value": "Fee For B2C Payment|KES|33.00"},
{"Key": "DebitAccountType", "Value": "Utility Account"},
{"Key": "InitiatedTime", "Value": 20170727101415},
{"Key": "Originator Conversation ID", "Value": "19455-773836-1"},
{"Key": "CreditPartyName", "Value": "254708374149 - John Doe"},
{"Key": "DebitPartyName", "Value": "600134 - Safaricom157"},
]
},
"ReferenceData": {"ReferenceItem": {"Key": "Occasion", "Value": "aaaa"}},
}
def get_payment_request_response_payload(Amount=500):
"""Response received after successfully calling the stk push process request API."""
CheckoutRequestID = frappe.utils.random_string(10)
return {
"MerchantRequestID": "8071-27184008-1",
"CheckoutRequestID": CheckoutRequestID,
"ResultCode": 0,
"ResultDesc": "The service request is processed successfully.",
"CallbackMetadata": {
"Item": [
{"Name": "Amount", "Value": Amount},
{"Name": "MpesaReceiptNumber", "Value": "LGR7OWQX0R"},
{"Name": "TransactionDate", "Value": 20201006113336},
{"Name": "PhoneNumber", "Value": 254723575670},
]
},
}
def get_payment_callback_payload(
Amount=500, CheckoutRequestID="ws_CO_061020201133231972", MpesaReceiptNumber="LGR7OWQX0R"
):
"""Response received from the server as callback after calling the stkpush process request API."""
return {
"Body": {
"stkCallback": {
"MerchantRequestID": "19465-780693-1",
"CheckoutRequestID": CheckoutRequestID,
"ResultCode": 0,
"ResultDesc": "The service request is processed successfully.",
"CallbackMetadata": {
"Item": [
{"Name": "Amount", "Value": Amount},
{"Name": "MpesaReceiptNumber", "Value": MpesaReceiptNumber},
{"Name": "Balance"},
{"Name": "TransactionDate", "Value": 20170727154800},
{"Name": "PhoneNumber", "Value": 254721566839},
]
},
}
}
}
def get_account_balance_callback_payload():
"""Response received from the server as callback after calling the account balance API."""
return {
"Result": {
"ResultType": 0,
"ResultCode": 0,
"ResultDesc": "The service request is processed successfully.",
"OriginatorConversationID": "16470-170099139-1",
"ConversationID": "AG_20200927_00007cdb1f9fb6494315",
"TransactionID": "OIR0000000",
"ResultParameters": {
"ResultParameter": [
{"Key": "AccountBalance", "Value": "Working Account|KES|481000.00|481000.00|0.00|0.00"},
{"Key": "BOCompletedTime", "Value": 20200927234123},
]
},
"ReferenceData": {
"ReferenceItem": {
"Key": "QueueTimeoutURL",
"Value": "https://internalsandbox.safaricom.co.ke/mpesa/abresults/v1/submit",
}
},
}
}

View File

@ -6,8 +6,6 @@ from urllib.parse import urlparse
import frappe
from frappe import _
from erpnext import get_default_company
def validate_webhooks_request(doctype, hmac_key, secret_key="secret"):
def innerfn(fn):
@ -47,35 +45,6 @@ def get_webhook_address(connector_name, method, exclude_uri=False, force_https=F
return server_url
def create_mode_of_payment(gateway, payment_type="General"):
payment_gateway_account = frappe.db.get_value(
"Payment Gateway Account", {"payment_gateway": gateway}, ["payment_account"]
)
mode_of_payment = frappe.db.exists("Mode of Payment", gateway)
if not mode_of_payment and payment_gateway_account:
mode_of_payment = frappe.get_doc(
{
"doctype": "Mode of Payment",
"mode_of_payment": gateway,
"enabled": 1,
"type": payment_type,
"accounts": [
{
"doctype": "Mode of Payment Account",
"company": get_default_company(),
"default_account": payment_gateway_account,
}
],
}
)
mode_of_payment.insert(ignore_permissions=True)
return mode_of_payment
elif mode_of_payment:
return frappe.get_doc("Mode of Payment", mode_of_payment)
def get_tracking_url(carrier, tracking_number):
# Return the formatted Tracking URL.
tracking_url = ""