Merge pull request #37182 from s-aga-r/MOVE-PAYMENTS-GATEWAYS

refactor!: move payment gateways to the payments app
This commit is contained in:
s-aga-r 2023-10-14 16:48:11 +05:30 committed by GitHub
commit afef9dc8df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 10 additions and 2305 deletions

View File

@ -20,7 +20,6 @@ from erpnext.accounts.doctype.payment_entry.payment_entry import (
from erpnext.accounts.doctype.subscription_plan.subscription_plan import get_plan_rate
from erpnext.accounts.party import get_party_account, get_party_bank_account
from erpnext.accounts.utils import get_account_currency
from erpnext.erpnext_integrations.stripe_integration import create_stripe_subscription
from erpnext.utilities import payment_app_import_guard
@ -393,6 +392,9 @@ class PaymentRequest(Document):
def create_subscription(self, payment_provider, gateway_controller, data):
if payment_provider == "stripe":
with payment_app_import_guard():
from payments.payment_gateways.stripe_integration import create_stripe_subscription
return create_stripe_subscription(gateway_controller, data)

View File

@ -1,5 +0,0 @@
// Copyright (c) 2018, Frappe Technologies and contributors
// For license information, please see license.txt
frappe.ui.form.on('GoCardless Mandate', {
});

View File

@ -1,184 +0,0 @@
{
"allow_copy": 0,
"allow_guest_to_view": 0,
"allow_import": 0,
"allow_rename": 0,
"autoname": "field:mandate",
"beta": 0,
"creation": "2018-02-08 11:33:15.721919",
"custom": 0,
"docstatus": 0,
"doctype": "DocType",
"document_type": "",
"editable_grid": 1,
"engine": "InnoDB",
"fields": [
{
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "disabled",
"fieldtype": "Check",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 0,
"in_standard_filter": 0,
"label": "Disabled",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "customer",
"fieldtype": "Link",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 1,
"in_standard_filter": 0,
"label": "Customer",
"length": 0,
"no_copy": 0,
"options": "Customer",
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 1,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "mandate",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 0,
"in_standard_filter": 0,
"label": "Mandate",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 1,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 1,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "gocardless_customer",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 1,
"in_standard_filter": 0,
"label": "GoCardless Customer",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 1,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 1,
"search_index": 0,
"set_only_once": 0,
"unique": 0
}
],
"has_web_view": 0,
"hide_heading": 0,
"hide_toolbar": 0,
"idx": 0,
"image_view": 0,
"in_create": 0,
"is_submittable": 0,
"issingle": 0,
"istable": 0,
"max_attachments": 0,
"modified": "2018-02-11 12:28:03.183095",
"modified_by": "Administrator",
"module": "ERPNext Integrations",
"name": "GoCardless Mandate",
"name_case": "",
"owner": "Administrator",
"permissions": [
{
"amend": 0,
"apply_user_permissions": 0,
"cancel": 0,
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"if_owner": 0,
"import": 0,
"permlevel": 0,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"set_user_permissions": 0,
"share": 1,
"submit": 0,
"write": 1
}
],
"quick_entry": 1,
"read_only": 0,
"read_only_onload": 0,
"show_name_in_global_search": 0,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1,
"track_seen": 0
}

View File

@ -1,9 +0,0 @@
# Copyright (c) 2018, Frappe Technologies and contributors
# For license information, please see license.txt
from frappe.model.document import Document
class GoCardlessMandate(Document):
pass

View File

@ -1,8 +0,0 @@
# Copyright (c) 2018, Frappe Technologies and Contributors
# See license.txt
import unittest
class TestGoCardlessMandate(unittest.TestCase):
pass

View File

@ -1,89 +0,0 @@
# Copyright (c) 2018, Frappe Technologies and contributors
# For license information, please see license.txt
import hashlib
import hmac
import json
import frappe
@frappe.whitelist(allow_guest=True)
def webhooks():
r = frappe.request
if not r:
return
if not authenticate_signature(r):
raise frappe.AuthenticationError
gocardless_events = json.loads(r.get_data()) or []
for event in gocardless_events["events"]:
set_status(event)
return 200
def set_status(event):
resource_type = event.get("resource_type", {})
if resource_type == "mandates":
set_mandate_status(event)
def set_mandate_status(event):
mandates = []
if isinstance(event["links"], (list,)):
for link in event["links"]:
mandates.append(link["mandate"])
else:
mandates.append(event["links"]["mandate"])
if (
event["action"] == "pending_customer_approval"
or event["action"] == "pending_submission"
or event["action"] == "submitted"
or event["action"] == "active"
):
disabled = 0
else:
disabled = 1
for mandate in mandates:
frappe.db.set_value("GoCardless Mandate", mandate, "disabled", disabled)
def authenticate_signature(r):
"""Returns True if the received signature matches the generated signature"""
received_signature = frappe.get_request_header("Webhook-Signature")
if not received_signature:
return False
for key in get_webhook_keys():
computed_signature = hmac.new(key.encode("utf-8"), r.get_data(), hashlib.sha256).hexdigest()
if hmac.compare_digest(str(received_signature), computed_signature):
return True
return False
def get_webhook_keys():
def _get_webhook_keys():
webhook_keys = [
d.webhooks_secret
for d in frappe.get_all(
"GoCardless Settings",
fields=["webhooks_secret"],
)
if d.webhooks_secret
]
return webhook_keys
return frappe.cache().get_value("gocardless_webhooks_secret", _get_webhook_keys)
def clear_cache():
frappe.cache().delete_value("gocardless_webhooks_secret")

View File

@ -1,8 +0,0 @@
// Copyright (c) 2018, Frappe Technologies and contributors
// For license information, please see license.txt
frappe.ui.form.on('GoCardless Settings', {
refresh: function(frm) {
erpnext.utils.check_payments_app();
}
});

View File

@ -1,211 +0,0 @@
{
"allow_copy": 0,
"allow_guest_to_view": 0,
"allow_import": 0,
"allow_rename": 0,
"autoname": "field:gateway_name",
"beta": 0,
"creation": "2018-02-06 16:11:10.028249",
"custom": 0,
"docstatus": 0,
"doctype": "DocType",
"document_type": "",
"editable_grid": 1,
"engine": "InnoDB",
"fields": [
{
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "gateway_name",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 1,
"in_standard_filter": 0,
"label": "Payment Gateway Name",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 1,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "section_break_2",
"fieldtype": "Section Break",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 0,
"in_standard_filter": 0,
"length": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "access_token",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 1,
"in_standard_filter": 0,
"label": "Access Token",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 1,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "webhooks_secret",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 0,
"in_standard_filter": 0,
"label": "Webhooks Secret",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
{
"allow_bulk_edit": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"columns": 0,
"fieldname": "use_sandbox",
"fieldtype": "Check",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_global_search": 0,
"in_list_view": 0,
"in_standard_filter": 0,
"label": "Use Sandbox",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"remember_last_selected_value": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
}
],
"has_web_view": 0,
"hide_heading": 0,
"hide_toolbar": 0,
"idx": 0,
"image_view": 0,
"in_create": 0,
"is_submittable": 0,
"issingle": 0,
"istable": 0,
"max_attachments": 0,
"modified": "2022-02-12 14:18:47.209114",
"modified_by": "Administrator",
"module": "ERPNext Integrations",
"name": "GoCardless Settings",
"name_case": "",
"owner": "Administrator",
"permissions": [
{
"amend": 0,
"apply_user_permissions": 0,
"cancel": 0,
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"if_owner": 0,
"import": 0,
"permlevel": 0,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"set_user_permissions": 0,
"share": 1,
"submit": 0,
"write": 1
}
],
"read_only": 0,
"read_only_onload": 0,
"show_name_in_global_search": 0,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1,
"track_seen": 0
}

View File

@ -1,220 +0,0 @@
# Copyright (c) 2018, Frappe Technologies and contributors
# For license information, please see license.txt
from urllib.parse import urlencode
import frappe
import gocardless_pro
from frappe import _
from frappe.integrations.utils import create_request_log
from frappe.model.document import Document
from frappe.utils import call_hook_method, cint, flt, get_url
from erpnext.utilities import payment_app_import_guard
class GoCardlessSettings(Document):
supported_currencies = ["EUR", "DKK", "GBP", "SEK", "AUD", "NZD", "CAD", "USD"]
def validate(self):
self.initialize_client()
def initialize_client(self):
self.environment = self.get_environment()
try:
self.client = gocardless_pro.Client(
access_token=self.access_token, environment=self.environment
)
return self.client
except Exception as e:
frappe.throw(e)
def on_update(self):
with payment_app_import_guard():
from payments.utils import create_payment_gateway
create_payment_gateway(
"GoCardless-" + self.gateway_name, settings="GoCardLess Settings", controller=self.gateway_name
)
call_hook_method("payment_gateway_enabled", gateway="GoCardless-" + self.gateway_name)
def on_payment_request_submission(self, data):
if data.reference_doctype != "Fees":
customer_data = frappe.db.get_value(
data.reference_doctype, data.reference_name, ["company", "customer_name"], as_dict=1
)
data = {
"amount": flt(data.grand_total, data.precision("grand_total")),
"title": customer_data.company.encode("utf-8"),
"description": data.subject.encode("utf-8"),
"reference_doctype": data.doctype,
"reference_docname": data.name,
"payer_email": data.email_to or frappe.session.user,
"payer_name": customer_data.customer_name,
"order_id": data.name,
"currency": data.currency,
}
valid_mandate = self.check_mandate_validity(data)
if valid_mandate is not None:
data.update(valid_mandate)
self.create_payment_request(data)
return False
else:
return True
def check_mandate_validity(self, data):
if frappe.db.exists("GoCardless Mandate", dict(customer=data.get("payer_name"), disabled=0)):
registered_mandate = frappe.db.get_value(
"GoCardless Mandate", dict(customer=data.get("payer_name"), disabled=0), "mandate"
)
self.initialize_client()
mandate = self.client.mandates.get(registered_mandate)
if (
mandate.status == "pending_customer_approval"
or mandate.status == "pending_submission"
or mandate.status == "submitted"
or mandate.status == "active"
):
return {"mandate": registered_mandate}
else:
return None
else:
return None
def get_environment(self):
if self.use_sandbox:
return "sandbox"
else:
return "live"
def validate_transaction_currency(self, currency):
if currency not in self.supported_currencies:
frappe.throw(
_(
"Please select another payment method. Go Cardless does not support transactions in currency '{0}'"
).format(currency)
)
def get_payment_url(self, **kwargs):
return get_url("./integrations/gocardless_checkout?{0}".format(urlencode(kwargs)))
def create_payment_request(self, data):
self.data = frappe._dict(data)
try:
self.integration_request = create_request_log(self.data, "Host", "GoCardless")
return self.create_charge_on_gocardless()
except Exception:
frappe.log_error("Gocardless payment reqeust failed")
return {
"redirect_to": frappe.redirect_to_message(
_("Server Error"),
_(
"There seems to be an issue with the server's GoCardless configuration. Don't worry, in case of failure, the amount will get refunded to your account."
),
),
"status": 401,
}
def create_charge_on_gocardless(self):
redirect_to = self.data.get("redirect_to") or None
redirect_message = self.data.get("redirect_message") or None
reference_doc = frappe.get_doc(
self.data.get("reference_doctype"), self.data.get("reference_docname")
)
self.initialize_client()
try:
payment = self.client.payments.create(
params={
"amount": cint(reference_doc.grand_total * 100),
"currency": reference_doc.currency,
"links": {"mandate": self.data.get("mandate")},
"metadata": {
"reference_doctype": reference_doc.doctype,
"reference_document": reference_doc.name,
},
},
headers={
"Idempotency-Key": self.data.get("reference_docname"),
},
)
if (
payment.status == "pending_submission"
or payment.status == "pending_customer_approval"
or payment.status == "submitted"
):
self.integration_request.db_set("status", "Authorized", update_modified=False)
self.flags.status_changed_to = "Completed"
self.integration_request.db_set("output", payment.status, update_modified=False)
elif payment.status == "confirmed" or payment.status == "paid_out":
self.integration_request.db_set("status", "Completed", update_modified=False)
self.flags.status_changed_to = "Completed"
self.integration_request.db_set("output", payment.status, update_modified=False)
elif (
payment.status == "cancelled"
or payment.status == "customer_approval_denied"
or payment.status == "charged_back"
):
self.integration_request.db_set("status", "Cancelled", update_modified=False)
frappe.log_error("Gocardless payment cancelled")
self.integration_request.db_set("error", payment.status, update_modified=False)
else:
self.integration_request.db_set("status", "Failed", update_modified=False)
frappe.log_error("Gocardless payment failed")
self.integration_request.db_set("error", payment.status, update_modified=False)
except Exception as e:
frappe.log_error("GoCardless Payment Error")
if self.flags.status_changed_to == "Completed":
status = "Completed"
if "reference_doctype" in self.data and "reference_docname" in self.data:
custom_redirect_to = None
try:
custom_redirect_to = frappe.get_doc(
self.data.get("reference_doctype"), self.data.get("reference_docname")
).run_method("on_payment_authorized", self.flags.status_changed_to)
except Exception:
frappe.log_error("Gocardless redirect failed")
if custom_redirect_to:
redirect_to = custom_redirect_to
redirect_url = redirect_to
else:
status = "Error"
redirect_url = "payment-failed"
if redirect_message:
redirect_url += "&" + urlencode({"redirect_message": redirect_message})
redirect_url = get_url(redirect_url)
return {"redirect_to": redirect_url, "status": status}
def get_gateway_controller(doc):
payment_request = frappe.get_doc("Payment Request", doc)
gateway_controller = frappe.db.get_value(
"Payment Gateway", payment_request.payment_gateway, "gateway_controller"
)
return gateway_controller
def gocardless_initialization(doc):
gateway_controller = get_gateway_controller(doc)
settings = frappe.get_doc("GoCardless Settings", gateway_controller)
client = settings.initialize_client()
return client

View File

@ -1,8 +0,0 @@
# Copyright (c) 2018, Frappe Technologies and Contributors
# See license.txt
import unittest
class TestGoCardlessSettings(unittest.TestCase):
pass

View File

@ -1,28 +0,0 @@
{% if not jQuery.isEmptyObject(data) %}
<h5 style="margin-top: 20px;"> {{ __("Balance Details") }} </h5>
<table class="table table-bordered small">
<thead>
<tr>
<th style="width: 20%">{{ __("Account Type") }}</th>
<th style="width: 20%" class="text-right">{{ __("Current Balance") }}</th>
<th style="width: 20%" class="text-right">{{ __("Available Balance") }}</th>
<th style="width: 20%" class="text-right">{{ __("Reserved Balance") }}</th>
<th style="width: 20%" class="text-right">{{ __("Uncleared Balance") }}</th>
</tr>
</thead>
<tbody>
{% for(const [key, value] of Object.entries(data)) { %}
<tr>
<td> {%= key %} </td>
<td class="text-right"> {%= value["current_balance"] %} </td>
<td class="text-right"> {%= value["available_balance"] %} </td>
<td class="text-right"> {%= value["reserved_balance"] %} </td>
<td class="text-right"> {%= value["uncleared_balance"] %} </td>
</tr>
{% } %}
</tbody>
</table>
{% else %}
<p style="margin-top: 30px;"> Account Balance Information Not Available. </p>
{% endif %}

View File

@ -1,149 +0,0 @@
import base64
import datetime
import requests
from requests.auth import HTTPBasicAuth
class MpesaConnector:
def __init__(
self,
env="sandbox",
app_key=None,
app_secret=None,
sandbox_url="https://sandbox.safaricom.co.ke",
live_url="https://api.safaricom.co.ke",
):
"""Setup configuration for Mpesa connector and generate new access token."""
self.env = env
self.app_key = app_key
self.app_secret = app_secret
if env == "sandbox":
self.base_url = sandbox_url
else:
self.base_url = live_url
self.authenticate()
def authenticate(self):
"""
This method is used to fetch the access token required by Mpesa.
Returns:
access_token (str): This token is to be used with the Bearer header for further API calls to Mpesa.
"""
authenticate_uri = "/oauth/v1/generate?grant_type=client_credentials"
authenticate_url = "{0}{1}".format(self.base_url, authenticate_uri)
r = requests.get(authenticate_url, auth=HTTPBasicAuth(self.app_key, self.app_secret))
self.authentication_token = r.json()["access_token"]
return r.json()["access_token"]
def get_balance(
self,
initiator=None,
security_credential=None,
party_a=None,
identifier_type=None,
remarks=None,
queue_timeout_url=None,
result_url=None,
):
"""
This method uses Mpesa's Account Balance API to to enquire the balance on a M-Pesa BuyGoods (Till Number).
Args:
initiator (str): Username used to authenticate the transaction.
security_credential (str): Generate from developer portal.
command_id (str): AccountBalance.
party_a (int): Till number being queried.
identifier_type (int): Type of organization receiving the transaction. (MSISDN/Till Number/Organization short code)
remarks (str): Comments that are sent along with the transaction(maximum 100 characters).
queue_timeout_url (str): The url that handles information of timed out transactions.
result_url (str): The url that receives results from M-Pesa api call.
Returns:
OriginatorConverstionID (str): The unique request ID for tracking a transaction.
ConversationID (str): The unique request ID returned by mpesa for each request made
ResponseDescription (str): Response Description message
"""
payload = {
"Initiator": initiator,
"SecurityCredential": security_credential,
"CommandID": "AccountBalance",
"PartyA": party_a,
"IdentifierType": identifier_type,
"Remarks": remarks,
"QueueTimeOutURL": queue_timeout_url,
"ResultURL": result_url,
}
headers = {
"Authorization": "Bearer {0}".format(self.authentication_token),
"Content-Type": "application/json",
}
saf_url = "{0}{1}".format(self.base_url, "/mpesa/accountbalance/v1/query")
r = requests.post(saf_url, headers=headers, json=payload)
return r.json()
def stk_push(
self,
business_shortcode=None,
passcode=None,
amount=None,
callback_url=None,
reference_code=None,
phone_number=None,
description=None,
):
"""
This method uses Mpesa's Express API to initiate online payment on behalf of a customer.
Args:
business_shortcode (int): The short code of the organization.
passcode (str): Get from developer portal
amount (int): The amount being transacted
callback_url (str): A CallBack URL is a valid secure URL that is used to receive notifications from M-Pesa API.
reference_code(str): Account Reference: This is an Alpha-Numeric parameter that is defined by your system as an Identifier of the transaction for CustomerPayBillOnline transaction type.
phone_number(int): The Mobile Number to receive the STK Pin Prompt.
description(str): This is any additional information/comment that can be sent along with the request from your system. MAX 13 characters
Success Response:
CustomerMessage(str): Messages that customers can understand.
CheckoutRequestID(str): This is a global unique identifier of the processed checkout transaction request.
ResponseDescription(str): Describes Success or failure
MerchantRequestID(str): This is a global unique Identifier for any submitted payment request.
ResponseCode(int): 0 means success all others are error codes. e.g.404.001.03
Error Reponse:
requestId(str): This is a unique requestID for the payment request
errorCode(str): This is a predefined code that indicates the reason for request failure.
errorMessage(str): This is a predefined code that indicates the reason for request failure.
"""
time = (
str(datetime.datetime.now()).split(".")[0].replace("-", "").replace(" ", "").replace(":", "")
)
password = "{0}{1}{2}".format(str(business_shortcode), str(passcode), time)
encoded = base64.b64encode(bytes(password, encoding="utf8"))
payload = {
"BusinessShortCode": business_shortcode,
"Password": encoded.decode("utf-8"),
"Timestamp": time,
"Amount": amount,
"PartyA": int(phone_number),
"PartyB": reference_code,
"PhoneNumber": int(phone_number),
"CallBackURL": callback_url,
"AccountReference": reference_code,
"TransactionDesc": description,
"TransactionType": "CustomerPayBillOnline"
if self.env == "sandbox"
else "CustomerBuyGoodsOnline",
}
headers = {
"Authorization": "Bearer {0}".format(self.authentication_token),
"Content-Type": "application/json",
}
saf_url = "{0}{1}".format(self.base_url, "/mpesa/stkpush/v1/processrequest")
r = requests.post(saf_url, headers=headers, json=payload)
return r.json()

View File

@ -1,56 +0,0 @@
import frappe
from frappe.custom.doctype.custom_field.custom_field import create_custom_fields
def create_custom_pos_fields():
"""Create custom fields corresponding to POS Settings and POS Invoice."""
pos_field = {
"POS Invoice": [
{
"fieldname": "request_for_payment",
"label": "Request for Payment",
"fieldtype": "Button",
"hidden": 1,
"insert_after": "contact_email",
},
{
"fieldname": "mpesa_receipt_number",
"label": "Mpesa Receipt Number",
"fieldtype": "Data",
"read_only": 1,
"insert_after": "company",
},
]
}
if not frappe.get_meta("POS Invoice").has_field("request_for_payment"):
create_custom_fields(pos_field)
record_dict = [
{
"doctype": "POS Field",
"fieldname": "contact_mobile",
"label": "Mobile No",
"fieldtype": "Data",
"options": "Phone",
"parenttype": "POS Settings",
"parent": "POS Settings",
"parentfield": "invoice_fields",
},
{
"doctype": "POS Field",
"fieldname": "request_for_payment",
"label": "Request for Payment",
"fieldtype": "Button",
"parenttype": "POS Settings",
"parent": "POS Settings",
"parentfield": "invoice_fields",
},
]
create_pos_settings(record_dict)
def create_pos_settings(record_dict):
for record in record_dict:
if frappe.db.exists("POS Field", {"fieldname": record.get("fieldname")}):
continue
frappe.get_doc(record).insert()

View File

@ -1,39 +0,0 @@
// Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and contributors
// For license information, please see license.txt
frappe.ui.form.on('Mpesa Settings', {
onload_post_render: function(frm) {
frm.events.setup_account_balance_html(frm);
},
refresh: function(frm) {
erpnext.utils.check_payments_app();
frappe.realtime.on("refresh_mpesa_dashboard", function(){
frm.reload_doc();
frm.events.setup_account_balance_html(frm);
});
},
get_account_balance: function(frm) {
if (!frm.doc.initiator_name && !frm.doc.security_credential) {
frappe.throw(__("Please set the initiator name and the security credential"));
}
frappe.call({
method: "get_account_balance_info",
doc: frm.doc
});
},
setup_account_balance_html: function(frm) {
if (!frm.doc.account_balance) return;
$("div").remove(".form-dashboard-section.custom");
frm.dashboard.add_section(
frappe.render_template('account_balance', {
data: JSON.parse(frm.doc.account_balance)
})
);
frm.dashboard.show();
}
});

View File

@ -1,152 +0,0 @@
{
"actions": [],
"autoname": "field:payment_gateway_name",
"creation": "2020-09-10 13:21:27.398088",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"payment_gateway_name",
"consumer_key",
"consumer_secret",
"initiator_name",
"till_number",
"transaction_limit",
"sandbox",
"column_break_4",
"business_shortcode",
"online_passkey",
"security_credential",
"get_account_balance",
"account_balance"
],
"fields": [
{
"fieldname": "payment_gateway_name",
"fieldtype": "Data",
"label": "Payment Gateway Name",
"reqd": 1,
"unique": 1
},
{
"fieldname": "consumer_key",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Consumer Key",
"reqd": 1
},
{
"fieldname": "consumer_secret",
"fieldtype": "Password",
"in_list_view": 1,
"label": "Consumer Secret",
"reqd": 1
},
{
"fieldname": "till_number",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Till Number",
"reqd": 1
},
{
"default": "0",
"fieldname": "sandbox",
"fieldtype": "Check",
"label": "Sandbox"
},
{
"fieldname": "column_break_4",
"fieldtype": "Column Break"
},
{
"fieldname": "online_passkey",
"fieldtype": "Password",
"label": " Online PassKey",
"reqd": 1
},
{
"fieldname": "initiator_name",
"fieldtype": "Data",
"label": "Initiator Name"
},
{
"fieldname": "security_credential",
"fieldtype": "Small Text",
"label": "Security Credential"
},
{
"fieldname": "account_balance",
"fieldtype": "Long Text",
"hidden": 1,
"label": "Account Balance",
"read_only": 1
},
{
"fieldname": "get_account_balance",
"fieldtype": "Button",
"label": "Get Account Balance"
},
{
"depends_on": "eval:(doc.sandbox==0)",
"fieldname": "business_shortcode",
"fieldtype": "Data",
"label": "Business Shortcode",
"mandatory_depends_on": "eval:(doc.sandbox==0)"
},
{
"default": "150000",
"fieldname": "transaction_limit",
"fieldtype": "Float",
"label": "Transaction Limit",
"non_negative": 1
}
],
"links": [],
"modified": "2021-03-02 17:35:14.084342",
"modified_by": "Administrator",
"module": "ERPNext Integrations",
"name": "Mpesa Settings",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
},
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "Accounts Manager",
"share": 1,
"write": 1
},
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "Accounts User",
"share": 1,
"write": 1
}
],
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View File

@ -1,354 +0,0 @@
# Copyright (c) 2020, Frappe Technologies and contributors
# For license information, please see license.txt
from json import dumps, loads
import frappe
from frappe import _
from frappe.integrations.utils import create_request_log
from frappe.model.document import Document
from frappe.utils import call_hook_method, fmt_money, get_request_site_address
from erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_connector import MpesaConnector
from erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_custom_fields import (
create_custom_pos_fields,
)
from erpnext.erpnext_integrations.utils import create_mode_of_payment
from erpnext.utilities import payment_app_import_guard
class MpesaSettings(Document):
supported_currencies = ["KES"]
def validate_transaction_currency(self, currency):
if currency not in self.supported_currencies:
frappe.throw(
_(
"Please select another payment method. Mpesa does not support transactions in currency '{0}'"
).format(currency)
)
def on_update(self):
with payment_app_import_guard():
from payments.utils import create_payment_gateway
create_custom_pos_fields()
create_payment_gateway(
"Mpesa-" + self.payment_gateway_name,
settings="Mpesa Settings",
controller=self.payment_gateway_name,
)
call_hook_method(
"payment_gateway_enabled", gateway="Mpesa-" + self.payment_gateway_name, payment_channel="Phone"
)
# required to fetch the bank account details from the payment gateway account
frappe.db.commit()
create_mode_of_payment("Mpesa-" + self.payment_gateway_name, payment_type="Phone")
def request_for_payment(self, **kwargs):
args = frappe._dict(kwargs)
request_amounts = self.split_request_amount_according_to_transaction_limit(args)
for i, amount in enumerate(request_amounts):
args.request_amount = amount
if frappe.flags.in_test:
from erpnext.erpnext_integrations.doctype.mpesa_settings.test_mpesa_settings import (
get_payment_request_response_payload,
)
response = frappe._dict(get_payment_request_response_payload(amount))
else:
response = frappe._dict(generate_stk_push(**args))
self.handle_api_response("CheckoutRequestID", args, response)
def split_request_amount_according_to_transaction_limit(self, args):
request_amount = args.request_amount
if request_amount > self.transaction_limit:
# make multiple requests
request_amounts = []
requests_to_be_made = frappe.utils.ceil(
request_amount / self.transaction_limit
) # 480/150 = ceil(3.2) = 4
for i in range(requests_to_be_made):
amount = self.transaction_limit
if i == requests_to_be_made - 1:
amount = request_amount - (
self.transaction_limit * i
) # for 4th request, 480 - (150 * 3) = 30
request_amounts.append(amount)
else:
request_amounts = [request_amount]
return request_amounts
@frappe.whitelist()
def get_account_balance_info(self):
payload = dict(
reference_doctype="Mpesa Settings", reference_docname=self.name, doc_details=vars(self)
)
if frappe.flags.in_test:
from erpnext.erpnext_integrations.doctype.mpesa_settings.test_mpesa_settings import (
get_test_account_balance_response,
)
response = frappe._dict(get_test_account_balance_response())
else:
response = frappe._dict(get_account_balance(payload))
self.handle_api_response("ConversationID", payload, response)
def handle_api_response(self, global_id, request_dict, response):
"""Response received from API calls returns a global identifier for each transaction, this code is returned during the callback."""
# check error response
if getattr(response, "requestId"):
req_name = getattr(response, "requestId")
error = response
else:
# global checkout id used as request name
req_name = getattr(response, global_id)
error = None
if not frappe.db.exists("Integration Request", req_name):
create_request_log(request_dict, "Host", "Mpesa", req_name, error)
if error:
frappe.throw(_(getattr(response, "errorMessage")), title=_("Transaction Error"))
def generate_stk_push(**kwargs):
"""Generate stk push by making a API call to the stk push API."""
args = frappe._dict(kwargs)
try:
callback_url = (
get_request_site_address(True)
+ "/api/method/erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_settings.verify_transaction"
)
mpesa_settings = frappe.get_doc("Mpesa Settings", args.payment_gateway[6:])
env = "production" if not mpesa_settings.sandbox else "sandbox"
# for sandbox, business shortcode is same as till number
business_shortcode = (
mpesa_settings.business_shortcode if env == "production" else mpesa_settings.till_number
)
connector = MpesaConnector(
env=env,
app_key=mpesa_settings.consumer_key,
app_secret=mpesa_settings.get_password("consumer_secret"),
)
mobile_number = sanitize_mobile_number(args.sender)
response = connector.stk_push(
business_shortcode=business_shortcode,
amount=args.request_amount,
passcode=mpesa_settings.get_password("online_passkey"),
callback_url=callback_url,
reference_code=mpesa_settings.till_number,
phone_number=mobile_number,
description="POS Payment",
)
return response
except Exception:
frappe.log_error("Mpesa Express Transaction Error")
frappe.throw(
_("Issue detected with Mpesa configuration, check the error logs for more details"),
title=_("Mpesa Express Error"),
)
def sanitize_mobile_number(number):
"""Add country code and strip leading zeroes from the phone number."""
return "254" + str(number).lstrip("0")
@frappe.whitelist(allow_guest=True)
def verify_transaction(**kwargs):
"""Verify the transaction result received via callback from stk."""
transaction_response = frappe._dict(kwargs["Body"]["stkCallback"])
checkout_id = getattr(transaction_response, "CheckoutRequestID", "")
if not isinstance(checkout_id, str):
frappe.throw(_("Invalid Checkout Request ID"))
integration_request = frappe.get_doc("Integration Request", checkout_id)
transaction_data = frappe._dict(loads(integration_request.data))
total_paid = 0 # for multiple integration request made against a pos invoice
success = False # for reporting successfull callback to point of sale ui
if transaction_response["ResultCode"] == 0:
if integration_request.reference_doctype and integration_request.reference_docname:
try:
item_response = transaction_response["CallbackMetadata"]["Item"]
amount = fetch_param_value(item_response, "Amount", "Name")
mpesa_receipt = fetch_param_value(item_response, "MpesaReceiptNumber", "Name")
pr = frappe.get_doc(
integration_request.reference_doctype, integration_request.reference_docname
)
mpesa_receipts, completed_payments = get_completed_integration_requests_info(
integration_request.reference_doctype, integration_request.reference_docname, checkout_id
)
total_paid = amount + sum(completed_payments)
mpesa_receipts = ", ".join(mpesa_receipts + [mpesa_receipt])
if total_paid >= pr.grand_total:
pr.run_method("on_payment_authorized", "Completed")
success = True
frappe.db.set_value("POS Invoice", pr.reference_name, "mpesa_receipt_number", mpesa_receipts)
integration_request.handle_success(transaction_response)
except Exception:
integration_request.handle_failure(transaction_response)
frappe.log_error("Mpesa: Failed to verify transaction")
else:
integration_request.handle_failure(transaction_response)
frappe.publish_realtime(
event="process_phone_payment",
doctype="POS Invoice",
docname=transaction_data.payment_reference,
user=integration_request.owner,
message={
"amount": total_paid,
"success": success,
"failure_message": transaction_response["ResultDesc"]
if transaction_response["ResultCode"] != 0
else "",
},
)
def get_completed_integration_requests_info(reference_doctype, reference_docname, checkout_id):
output_of_other_completed_requests = frappe.get_all(
"Integration Request",
filters={
"name": ["!=", checkout_id],
"reference_doctype": reference_doctype,
"reference_docname": reference_docname,
"status": "Completed",
},
pluck="output",
)
mpesa_receipts, completed_payments = [], []
for out in output_of_other_completed_requests:
out = frappe._dict(loads(out))
item_response = out["CallbackMetadata"]["Item"]
completed_amount = fetch_param_value(item_response, "Amount", "Name")
completed_mpesa_receipt = fetch_param_value(item_response, "MpesaReceiptNumber", "Name")
completed_payments.append(completed_amount)
mpesa_receipts.append(completed_mpesa_receipt)
return mpesa_receipts, completed_payments
def get_account_balance(request_payload):
"""Call account balance API to send the request to the Mpesa Servers."""
try:
mpesa_settings = frappe.get_doc("Mpesa Settings", request_payload.get("reference_docname"))
env = "production" if not mpesa_settings.sandbox else "sandbox"
connector = MpesaConnector(
env=env,
app_key=mpesa_settings.consumer_key,
app_secret=mpesa_settings.get_password("consumer_secret"),
)
callback_url = (
get_request_site_address(True)
+ "/api/method/erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_settings.process_balance_info"
)
response = connector.get_balance(
mpesa_settings.initiator_name,
mpesa_settings.security_credential,
mpesa_settings.till_number,
4,
mpesa_settings.name,
callback_url,
callback_url,
)
return response
except Exception:
frappe.log_error("Mpesa: Failed to get account balance")
frappe.throw(_("Please check your configuration and try again"), title=_("Error"))
@frappe.whitelist(allow_guest=True)
def process_balance_info(**kwargs):
"""Process and store account balance information received via callback from the account balance API call."""
account_balance_response = frappe._dict(kwargs["Result"])
conversation_id = getattr(account_balance_response, "ConversationID", "")
if not isinstance(conversation_id, str):
frappe.throw(_("Invalid Conversation ID"))
request = frappe.get_doc("Integration Request", conversation_id)
if request.status == "Completed":
return
transaction_data = frappe._dict(loads(request.data))
if account_balance_response["ResultCode"] == 0:
try:
result_params = account_balance_response["ResultParameters"]["ResultParameter"]
balance_info = fetch_param_value(result_params, "AccountBalance", "Key")
balance_info = format_string_to_json(balance_info)
ref_doc = frappe.get_doc(transaction_data.reference_doctype, transaction_data.reference_docname)
ref_doc.db_set("account_balance", balance_info)
request.handle_success(account_balance_response)
frappe.publish_realtime(
"refresh_mpesa_dashboard",
doctype="Mpesa Settings",
docname=transaction_data.reference_docname,
user=transaction_data.owner,
)
except Exception:
request.handle_failure(account_balance_response)
frappe.log_error(
title="Mpesa Account Balance Processing Error", message=account_balance_response
)
else:
request.handle_failure(account_balance_response)
def format_string_to_json(balance_info):
"""
Format string to json.
e.g: '''Working Account|KES|481000.00|481000.00|0.00|0.00'''
=> {'Working Account': {'current_balance': '481000.00',
'available_balance': '481000.00',
'reserved_balance': '0.00',
'uncleared_balance': '0.00'}}
"""
balance_dict = frappe._dict()
for account_info in balance_info.split("&"):
account_info = account_info.split("|")
balance_dict[account_info[0]] = dict(
current_balance=fmt_money(account_info[2], currency="KES"),
available_balance=fmt_money(account_info[3], currency="KES"),
reserved_balance=fmt_money(account_info[4], currency="KES"),
uncleared_balance=fmt_money(account_info[5], currency="KES"),
)
return dumps(balance_dict)
def fetch_param_value(response, key, key_field):
"""Fetch the specified key from list of dictionary. Key is identified via the key field."""
for param in response:
if param[key_field] == key:
return param["Value"]

View File

@ -1,361 +0,0 @@
# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and Contributors
# See license.txt
import unittest
from json import dumps
import frappe
from erpnext.accounts.doctype.pos_invoice.test_pos_invoice import create_pos_invoice
from erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_settings import (
process_balance_info,
verify_transaction,
)
from erpnext.erpnext_integrations.utils import create_mode_of_payment
class TestMpesaSettings(unittest.TestCase):
def setUp(self):
# create payment gateway in setup
create_mpesa_settings(payment_gateway_name="_Test")
create_mpesa_settings(payment_gateway_name="_Account Balance")
create_mpesa_settings(payment_gateway_name="Payment")
def tearDown(self):
frappe.db.sql("delete from `tabMpesa Settings`")
frappe.db.sql("delete from `tabIntegration Request` where integration_request_service = 'Mpesa'")
def test_creation_of_payment_gateway(self):
mode_of_payment = create_mode_of_payment("Mpesa-_Test", payment_type="Phone")
self.assertTrue(frappe.db.exists("Payment Gateway Account", {"payment_gateway": "Mpesa-_Test"}))
self.assertTrue(mode_of_payment.name)
self.assertEqual(mode_of_payment.type, "Phone")
def test_processing_of_account_balance(self):
mpesa_doc = create_mpesa_settings(payment_gateway_name="_Account Balance")
mpesa_doc.get_account_balance_info()
callback_response = get_account_balance_callback_payload()
process_balance_info(**callback_response)
integration_request = frappe.get_doc("Integration Request", "AG_20200927_00007cdb1f9fb6494315")
# test integration request creation and successful update of the status on receiving callback response
self.assertTrue(integration_request)
self.assertEqual(integration_request.status, "Completed")
# test formatting of account balance received as string to json with appropriate currency symbol
mpesa_doc.reload()
self.assertEqual(
mpesa_doc.account_balance,
dumps(
{
"Working Account": {
"current_balance": "Sh 481,000.00",
"available_balance": "Sh 481,000.00",
"reserved_balance": "Sh 0.00",
"uncleared_balance": "Sh 0.00",
}
}
),
)
integration_request.delete()
def test_processing_of_callback_payload(self):
mpesa_account = frappe.db.get_value(
"Payment Gateway Account", {"payment_gateway": "Mpesa-Payment"}, "payment_account"
)
frappe.db.set_value("Account", mpesa_account, "account_currency", "KES")
frappe.db.set_value("Customer", "_Test Customer", "default_currency", "KES")
pos_invoice = create_pos_invoice(do_not_submit=1)
pos_invoice.append(
"payments", {"mode_of_payment": "Mpesa-Payment", "account": mpesa_account, "amount": 500}
)
pos_invoice.contact_mobile = "093456543894"
pos_invoice.currency = "KES"
pos_invoice.save()
pr = pos_invoice.create_payment_request()
# test payment request creation
self.assertEqual(pr.payment_gateway, "Mpesa-Payment")
# submitting payment request creates integration requests with random id
integration_req_ids = frappe.get_all(
"Integration Request",
filters={
"reference_doctype": pr.doctype,
"reference_docname": pr.name,
},
pluck="name",
)
callback_response = get_payment_callback_payload(
Amount=500, CheckoutRequestID=integration_req_ids[0]
)
verify_transaction(**callback_response)
# test creation of integration request
integration_request = frappe.get_doc("Integration Request", integration_req_ids[0])
# test integration request creation and successful update of the status on receiving callback response
self.assertTrue(integration_request)
self.assertEqual(integration_request.status, "Completed")
pos_invoice.reload()
integration_request.reload()
self.assertEqual(pos_invoice.mpesa_receipt_number, "LGR7OWQX0R")
self.assertEqual(integration_request.status, "Completed")
frappe.db.set_value("Customer", "_Test Customer", "default_currency", "")
integration_request.delete()
pr.reload()
pr.cancel()
pr.delete()
pos_invoice.delete()
def test_processing_of_multiple_callback_payload(self):
mpesa_account = frappe.db.get_value(
"Payment Gateway Account", {"payment_gateway": "Mpesa-Payment"}, "payment_account"
)
frappe.db.set_value("Account", mpesa_account, "account_currency", "KES")
frappe.db.set_value("Mpesa Settings", "Payment", "transaction_limit", "500")
frappe.db.set_value("Customer", "_Test Customer", "default_currency", "KES")
pos_invoice = create_pos_invoice(do_not_submit=1)
pos_invoice.append(
"payments", {"mode_of_payment": "Mpesa-Payment", "account": mpesa_account, "amount": 1000}
)
pos_invoice.contact_mobile = "093456543894"
pos_invoice.currency = "KES"
pos_invoice.save()
pr = pos_invoice.create_payment_request()
# test payment request creation
self.assertEqual(pr.payment_gateway, "Mpesa-Payment")
# submitting payment request creates integration requests with random id
integration_req_ids = frappe.get_all(
"Integration Request",
filters={
"reference_doctype": pr.doctype,
"reference_docname": pr.name,
},
pluck="name",
)
# create random receipt nos and send it as response to callback handler
mpesa_receipt_numbers = [frappe.utils.random_string(5) for d in integration_req_ids]
integration_requests = []
for i in range(len(integration_req_ids)):
callback_response = get_payment_callback_payload(
Amount=500,
CheckoutRequestID=integration_req_ids[i],
MpesaReceiptNumber=mpesa_receipt_numbers[i],
)
# handle response manually
verify_transaction(**callback_response)
# test completion of integration request
integration_request = frappe.get_doc("Integration Request", integration_req_ids[i])
self.assertEqual(integration_request.status, "Completed")
integration_requests.append(integration_request)
# check receipt number once all the integration requests are completed
pos_invoice.reload()
self.assertEqual(pos_invoice.mpesa_receipt_number, ", ".join(mpesa_receipt_numbers))
frappe.db.set_value("Customer", "_Test Customer", "default_currency", "")
[d.delete() for d in integration_requests]
pr.reload()
pr.cancel()
pr.delete()
pos_invoice.delete()
def test_processing_of_only_one_succes_callback_payload(self):
mpesa_account = frappe.db.get_value(
"Payment Gateway Account", {"payment_gateway": "Mpesa-Payment"}, "payment_account"
)
frappe.db.set_value("Account", mpesa_account, "account_currency", "KES")
frappe.db.set_value("Mpesa Settings", "Payment", "transaction_limit", "500")
frappe.db.set_value("Customer", "_Test Customer", "default_currency", "KES")
pos_invoice = create_pos_invoice(do_not_submit=1)
pos_invoice.append(
"payments", {"mode_of_payment": "Mpesa-Payment", "account": mpesa_account, "amount": 1000}
)
pos_invoice.contact_mobile = "093456543894"
pos_invoice.currency = "KES"
pos_invoice.save()
pr = pos_invoice.create_payment_request()
# test payment request creation
self.assertEqual(pr.payment_gateway, "Mpesa-Payment")
# submitting payment request creates integration requests with random id
integration_req_ids = frappe.get_all(
"Integration Request",
filters={
"reference_doctype": pr.doctype,
"reference_docname": pr.name,
},
pluck="name",
)
# create random receipt nos and send it as response to callback handler
mpesa_receipt_numbers = [frappe.utils.random_string(5) for d in integration_req_ids]
callback_response = get_payment_callback_payload(
Amount=500,
CheckoutRequestID=integration_req_ids[0],
MpesaReceiptNumber=mpesa_receipt_numbers[0],
)
# handle response manually
verify_transaction(**callback_response)
# test completion of integration request
integration_request = frappe.get_doc("Integration Request", integration_req_ids[0])
self.assertEqual(integration_request.status, "Completed")
# now one request is completed
# second integration request fails
# now retrying payment request should make only one integration request again
pr = pos_invoice.create_payment_request()
new_integration_req_ids = frappe.get_all(
"Integration Request",
filters={
"reference_doctype": pr.doctype,
"reference_docname": pr.name,
"name": ["not in", integration_req_ids],
},
pluck="name",
)
self.assertEqual(len(new_integration_req_ids), 1)
frappe.db.set_value("Customer", "_Test Customer", "default_currency", "")
frappe.db.sql("delete from `tabIntegration Request` where integration_request_service = 'Mpesa'")
pr.reload()
pr.cancel()
pr.delete()
pos_invoice.delete()
def create_mpesa_settings(payment_gateway_name="Express"):
if frappe.db.exists("Mpesa Settings", payment_gateway_name):
return frappe.get_doc("Mpesa Settings", payment_gateway_name)
doc = frappe.get_doc(
dict( # nosec
doctype="Mpesa Settings",
sandbox=1,
payment_gateway_name=payment_gateway_name,
consumer_key="5sMu9LVI1oS3oBGPJfh3JyvLHwZOdTKn",
consumer_secret="VI1oS3oBGPJfh3JyvLHw",
online_passkey="LVI1oS3oBGPJfh3JyvLHwZOd",
till_number="174379",
)
)
doc.insert(ignore_permissions=True)
return doc
def get_test_account_balance_response():
"""Response received after calling the account balance API."""
return {
"ResultType": 0,
"ResultCode": 0,
"ResultDesc": "The service request has been accepted successfully.",
"OriginatorConversationID": "10816-694520-2",
"ConversationID": "AG_20200927_00007cdb1f9fb6494315",
"TransactionID": "LGR0000000",
"ResultParameters": {
"ResultParameter": [
{"Key": "ReceiptNo", "Value": "LGR919G2AV"},
{"Key": "Conversation ID", "Value": "AG_20170727_00004492b1b6d0078fbe"},
{"Key": "FinalisedTime", "Value": 20170727101415},
{"Key": "Amount", "Value": 10},
{"Key": "TransactionStatus", "Value": "Completed"},
{"Key": "ReasonType", "Value": "Salary Payment via API"},
{"Key": "TransactionReason"},
{"Key": "DebitPartyCharges", "Value": "Fee For B2C Payment|KES|33.00"},
{"Key": "DebitAccountType", "Value": "Utility Account"},
{"Key": "InitiatedTime", "Value": 20170727101415},
{"Key": "Originator Conversation ID", "Value": "19455-773836-1"},
{"Key": "CreditPartyName", "Value": "254708374149 - John Doe"},
{"Key": "DebitPartyName", "Value": "600134 - Safaricom157"},
]
},
"ReferenceData": {"ReferenceItem": {"Key": "Occasion", "Value": "aaaa"}},
}
def get_payment_request_response_payload(Amount=500):
"""Response received after successfully calling the stk push process request API."""
CheckoutRequestID = frappe.utils.random_string(10)
return {
"MerchantRequestID": "8071-27184008-1",
"CheckoutRequestID": CheckoutRequestID,
"ResultCode": 0,
"ResultDesc": "The service request is processed successfully.",
"CallbackMetadata": {
"Item": [
{"Name": "Amount", "Value": Amount},
{"Name": "MpesaReceiptNumber", "Value": "LGR7OWQX0R"},
{"Name": "TransactionDate", "Value": 20201006113336},
{"Name": "PhoneNumber", "Value": 254723575670},
]
},
}
def get_payment_callback_payload(
Amount=500, CheckoutRequestID="ws_CO_061020201133231972", MpesaReceiptNumber="LGR7OWQX0R"
):
"""Response received from the server as callback after calling the stkpush process request API."""
return {
"Body": {
"stkCallback": {
"MerchantRequestID": "19465-780693-1",
"CheckoutRequestID": CheckoutRequestID,
"ResultCode": 0,
"ResultDesc": "The service request is processed successfully.",
"CallbackMetadata": {
"Item": [
{"Name": "Amount", "Value": Amount},
{"Name": "MpesaReceiptNumber", "Value": MpesaReceiptNumber},
{"Name": "Balance"},
{"Name": "TransactionDate", "Value": 20170727154800},
{"Name": "PhoneNumber", "Value": 254721566839},
]
},
}
}
}
def get_account_balance_callback_payload():
"""Response received from the server as callback after calling the account balance API."""
return {
"Result": {
"ResultType": 0,
"ResultCode": 0,
"ResultDesc": "The service request is processed successfully.",
"OriginatorConversationID": "16470-170099139-1",
"ConversationID": "AG_20200927_00007cdb1f9fb6494315",
"TransactionID": "OIR0000000",
"ResultParameters": {
"ResultParameter": [
{"Key": "AccountBalance", "Value": "Working Account|KES|481000.00|481000.00|0.00|0.00"},
{"Key": "BOCompletedTime", "Value": 20200927234123},
]
},
"ReferenceData": {
"ReferenceItem": {
"Key": "QueueTimeoutURL",
"Value": "https://internalsandbox.safaricom.co.ke/mpesa/abresults/v1/submit",
}
},
}
}

View File

@ -43,40 +43,6 @@ class TestPlaidSettings(unittest.TestCase):
add_account_subtype("loan")
self.assertEqual(frappe.get_doc("Bank Account Subtype", "loan").name, "loan")
def test_default_bank_account(self):
if not frappe.db.exists("Bank", "Citi"):
frappe.get_doc({"doctype": "Bank", "bank_name": "Citi"}).insert()
bank_accounts = {
"account": {
"subtype": "checking",
"mask": "0000",
"type": "depository",
"id": "6GbM6RRQgdfy3lAqGz4JUnpmR948WZFg8DjQK",
"name": "Plaid Checking",
},
"account_id": "6GbM6RRQgdfy3lAqGz4JUnpmR948WZFg8DjQK",
"link_session_id": "db673d75-61aa-442a-864f-9b3f174f3725",
"accounts": [
{
"type": "depository",
"subtype": "checking",
"mask": "0000",
"id": "6GbM6RRQgdfy3lAqGz4JUnpmR948WZFg8DjQK",
"name": "Plaid Checking",
}
],
"institution": {"institution_id": "ins_6", "name": "Citi"},
}
bank = json.dumps(frappe.get_doc("Bank", "Citi").as_dict(), default=json_handler)
company = frappe.db.get_single_value("Global Defaults", "default_company")
frappe.db.set_value("Company", company, "default_bank_account", None)
self.assertRaises(
frappe.ValidationError, add_bank_accounts, response=bank_accounts, bank=bank, company=company
)
def test_new_transaction(self):
if not frappe.db.exists("Bank", "Citi"):
frappe.get_doc({"doctype": "Bank", "bank_name": "Citi"}).insert()

View File

@ -1,70 +0,0 @@
# Copyright (c) 2018, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
import frappe
from frappe import _
from frappe.integrations.utils import create_request_log
from erpnext.utilities import payment_app_import_guard
def create_stripe_subscription(gateway_controller, data):
with payment_app_import_guard():
import stripe
stripe_settings = frappe.get_doc("Stripe Settings", gateway_controller)
stripe_settings.data = frappe._dict(data)
stripe.api_key = stripe_settings.get_password(fieldname="secret_key", raise_exception=False)
stripe.default_http_client = stripe.http_client.RequestsClient()
try:
stripe_settings.integration_request = create_request_log(stripe_settings.data, "Host", "Stripe")
stripe_settings.payment_plans = frappe.get_doc(
"Payment Request", stripe_settings.data.reference_docname
).subscription_plans
return create_subscription_on_stripe(stripe_settings)
except Exception:
stripe_settings.log_error("Unable to create Stripe subscription")
return {
"redirect_to": frappe.redirect_to_message(
_("Server Error"),
_(
"It seems that there is an issue with the server's stripe configuration. In case of failure, the amount will get refunded to your account."
),
),
"status": 401,
}
def create_subscription_on_stripe(stripe_settings):
with payment_app_import_guard():
import stripe
items = []
for payment_plan in stripe_settings.payment_plans:
plan = frappe.db.get_value("Subscription Plan", payment_plan.plan, "product_price_id")
items.append({"price": plan, "quantity": payment_plan.qty})
try:
customer = stripe.Customer.create(
source=stripe_settings.data.stripe_token_id,
description=stripe_settings.data.payer_name,
email=stripe_settings.data.payer_email,
)
subscription = stripe.Subscription.create(customer=customer, items=items)
if subscription.status == "active":
stripe_settings.integration_request.db_set("status", "Completed", update_modified=False)
stripe_settings.flags.status_changed_to = "Completed"
else:
stripe_settings.integration_request.db_set("status", "Failed", update_modified=False)
frappe.log_error(f"Stripe Subscription ID {subscription.id}: Payment failed")
except Exception:
stripe_settings.integration_request.db_set("status", "Failed", update_modified=False)
stripe_settings.log_error("Unable to create Stripe subscription")
return stripe_settings.finalize_request()

View File

@ -6,8 +6,6 @@ from urllib.parse import urlparse
import frappe
from frappe import _
from erpnext import get_default_company
def validate_webhooks_request(doctype, hmac_key, secret_key="secret"):
def innerfn(fn):
@ -47,35 +45,6 @@ def get_webhook_address(connector_name, method, exclude_uri=False, force_https=F
return server_url
def create_mode_of_payment(gateway, payment_type="General"):
payment_gateway_account = frappe.db.get_value(
"Payment Gateway Account", {"payment_gateway": gateway}, ["payment_account"]
)
mode_of_payment = frappe.db.exists("Mode of Payment", gateway)
if not mode_of_payment and payment_gateway_account:
mode_of_payment = frappe.get_doc(
{
"doctype": "Mode of Payment",
"mode_of_payment": gateway,
"enabled": 1,
"type": payment_type,
"accounts": [
{
"doctype": "Mode of Payment Account",
"company": get_default_company(),
"default_account": payment_gateway_account,
}
],
}
)
mode_of_payment.insert(ignore_permissions=True)
return mode_of_payment
elif mode_of_payment:
return frappe.get_doc("Mode of Payment", mode_of_payment)
def get_tracking_url(carrier, tracking_number):
# Return the formatted Tracking URL.
tracking_url = ""

View File

@ -344,5 +344,6 @@ erpnext.patches.v15_0.delete_woocommerce_settings_doctype
erpnext.patches.v14_0.migrate_deferred_accounts_to_item_defaults
erpnext.patches.v14_0.update_invoicing_period_in_subscription
execute:frappe.delete_doc("Page", "welcome-to-erpnext")
erpnext.patches.v15_0.delete_payment_gateway_doctypes
# below migration patch should always run last
erpnext.patches.v14_0.migrate_gl_to_payment_ledger

View File

@ -0,0 +1,6 @@
import frappe
def execute():
for dt in ("GoCardless Settings", "GoCardless Mandate", "Mpesa Settings"):
frappe.delete_doc("DocType", dt, ignore_missing=True)

View File

@ -1,24 +0,0 @@
$(document).ready(function() {
var data = {{ frappe.form_dict | json }};
var doctype = "{{ reference_doctype }}"
var docname = "{{ reference_docname }}"
frappe.call({
method: "erpnext.templates.pages.integrations.gocardless_checkout.check_mandate",
freeze: true,
headers: {
"X-Requested-With": "XMLHttpRequest"
},
args: {
"data": JSON.stringify(data),
"reference_doctype": doctype,
"reference_docname": docname
},
callback: function(r) {
if (r.message) {
window.location.href = r.message.redirect_to
}
}
})
})

View File

@ -1,24 +0,0 @@
$(document).ready(function() {
var redirect_flow_id = "{{ redirect_flow_id }}";
var doctype = "{{ reference_doctype }}";
var docname = "{{ reference_docname }}";
frappe.call({
method: "erpnext.templates.pages.integrations.gocardless_confirmation.confirm_payment",
freeze: true,
headers: {
"X-Requested-With": "XMLHttpRequest"
},
args: {
"redirect_flow_id": redirect_flow_id,
"reference_doctype": doctype,
"reference_docname": docname
},
callback: function(r) {
if (r.message) {
window.location.href = r.message.redirect_to;
}
}
});
});

View File

@ -1,16 +0,0 @@
{% extends "templates/web.html" %}
{% block title %} Payment {% endblock %}
{%- block header -%}{% endblock %}
{% block script %}
<script>{% include "templates/includes/integrations/gocardless_checkout.js" %}</script>
{% endblock %}
{%- block page_content -%}
<p class='lead text-center'>
<span class='gocardless-loading'>{{ _("Loading Payment System") }}</span>
</p>
{% endblock %}

View File

@ -1,100 +0,0 @@
# Copyright (c) 2018, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import json
import frappe
from frappe import _
from frappe.utils import flt, get_url
from erpnext.erpnext_integrations.doctype.gocardless_settings.gocardless_settings import (
get_gateway_controller,
gocardless_initialization,
)
no_cache = 1
expected_keys = (
"amount",
"title",
"description",
"reference_doctype",
"reference_docname",
"payer_name",
"payer_email",
"order_id",
"currency",
)
def get_context(context):
context.no_cache = 1
# all these keys exist in form_dict
if not (set(expected_keys) - set(frappe.form_dict.keys())):
for key in expected_keys:
context[key] = frappe.form_dict[key]
context["amount"] = flt(context["amount"])
gateway_controller = get_gateway_controller(context.reference_docname)
context["header_img"] = frappe.db.get_value(
"GoCardless Settings", gateway_controller, "header_img"
)
else:
frappe.redirect_to_message(
_("Some information is missing"),
_("Looks like someone sent you to an incomplete URL. Please ask them to look into it."),
)
frappe.local.flags.redirect_location = frappe.local.response.location
raise frappe.Redirect
@frappe.whitelist(allow_guest=True)
def check_mandate(data, reference_doctype, reference_docname):
data = json.loads(data)
client = gocardless_initialization(reference_docname)
payer = frappe.get_doc("Customer", data["payer_name"])
if payer.customer_type == "Individual" and payer.customer_primary_contact is not None:
primary_contact = frappe.get_doc("Contact", payer.customer_primary_contact)
prefilled_customer = {
"company_name": payer.name,
"given_name": primary_contact.first_name,
}
if primary_contact.last_name is not None:
prefilled_customer.update({"family_name": primary_contact.last_name})
if primary_contact.email_id is not None:
prefilled_customer.update({"email": primary_contact.email_id})
else:
prefilled_customer.update({"email": frappe.session.user})
else:
prefilled_customer = {"company_name": payer.name, "email": frappe.session.user}
success_url = get_url(
"./integrations/gocardless_confirmation?reference_doctype="
+ reference_doctype
+ "&reference_docname="
+ reference_docname
)
try:
redirect_flow = client.redirect_flows.create(
params={
"description": _("Pay {0} {1}").format(data["amount"], data["currency"]),
"session_token": frappe.session.user,
"success_redirect_url": success_url,
"prefilled_customer": prefilled_customer,
}
)
return {"redirect_to": redirect_flow.redirect_url}
except Exception as e:
frappe.log_error("GoCardless Payment Error")
return {"redirect_to": "/integrations/payment-failed"}

View File

@ -1,16 +0,0 @@
{% extends "templates/web.html" %}
{% block title %} Payment {% endblock %}
{%- block header -%}{% endblock %}
{% block script %}
<script>{% include "templates/includes/integrations/gocardless_confirmation.js" %}</script>
{% endblock %}
{%- block page_content -%}
<p class='lead text-center'>
<span class='gocardless-loading'>{{ _("Payment Confirmation") }}</span>
</p>
{% endblock %}

View File

@ -1,106 +0,0 @@
# Copyright (c) 2018, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import frappe
from frappe import _
from erpnext.erpnext_integrations.doctype.gocardless_settings.gocardless_settings import (
get_gateway_controller,
gocardless_initialization,
)
no_cache = 1
expected_keys = ("redirect_flow_id", "reference_doctype", "reference_docname")
def get_context(context):
context.no_cache = 1
# all these keys exist in form_dict
if not (set(expected_keys) - set(frappe.form_dict.keys())):
for key in expected_keys:
context[key] = frappe.form_dict[key]
else:
frappe.redirect_to_message(
_("Some information is missing"),
_("Looks like someone sent you to an incomplete URL. Please ask them to look into it."),
)
frappe.local.flags.redirect_location = frappe.local.response.location
raise frappe.Redirect
@frappe.whitelist(allow_guest=True)
def confirm_payment(redirect_flow_id, reference_doctype, reference_docname):
client = gocardless_initialization(reference_docname)
try:
redirect_flow = client.redirect_flows.complete(
redirect_flow_id, params={"session_token": frappe.session.user}
)
confirmation_url = redirect_flow.confirmation_url
gocardless_success_page = frappe.get_hooks("gocardless_success_page")
if gocardless_success_page:
confirmation_url = frappe.get_attr(gocardless_success_page[-1])(
reference_doctype, reference_docname
)
data = {
"mandate": redirect_flow.links.mandate,
"customer": redirect_flow.links.customer,
"redirect_to": confirmation_url,
"redirect_message": "Mandate successfully created",
"reference_doctype": reference_doctype,
"reference_docname": reference_docname,
}
try:
create_mandate(data)
except Exception as e:
frappe.log_error("GoCardless Mandate Registration Error")
gateway_controller = get_gateway_controller(reference_docname)
frappe.get_doc("GoCardless Settings", gateway_controller).create_payment_request(data)
return {"redirect_to": confirmation_url}
except Exception as e:
frappe.log_error("GoCardless Payment Error")
return {"redirect_to": "/integrations/payment-failed"}
def create_mandate(data):
data = frappe._dict(data)
frappe.logger().debug(data)
mandate = data.get("mandate")
if frappe.db.exists("GoCardless Mandate", mandate):
return
else:
reference_doc = frappe.db.get_value(
data.get("reference_doctype"),
data.get("reference_docname"),
["reference_doctype", "reference_name"],
as_dict=1,
)
erpnext_customer = frappe.db.get_value(
reference_doc.reference_doctype, reference_doc.reference_name, ["customer_name"], as_dict=1
)
try:
frappe.get_doc(
{
"doctype": "GoCardless Mandate",
"mandate": mandate,
"customer": erpnext_customer.customer_name,
"gocardless_customer": data.get("customer"),
}
).insert(ignore_permissions=True)
except Exception:
frappe.log_error("Gocardless: Unable to create mandate")

View File

@ -16,11 +16,9 @@ dependencies = [
"holidays~=0.28",
# integration dependencies
"gocardless-pro~=1.22.0",
"googlemaps",
"plaid-python~=7.2.1",
"python-youtube~=0.8.0",
"tweepy~=4.14.0",
# Not used directly - required by PyQRCode for PNG generation
"pypng~=0.20220715.0",