perf: reduce memory usage by paging through records

While migrating GL entries to Payment Ledger, page through records using
primary key to reduce memory usage.
This commit is contained in:
ruthra kumar 2023-01-20 15:32:16 +05:30
parent adaeba1554
commit fee0ca8cd9

View File

@ -2,7 +2,8 @@ import frappe
from frappe import qb
from frappe.query_builder import Case, CustomFunction
from frappe.query_builder.custom import ConstantColumn
from frappe.query_builder.functions import IfNull
from frappe.query_builder.functions import Count, IfNull
from frappe.utils import flt
from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import (
get_dimensions,
@ -17,9 +18,9 @@ def create_accounting_dimension_fields():
make_dimension_in_accounting_doctypes(dimension, ["Payment Ledger Entry"])
def generate_name_for_payment_ledger_entries(gl_entries):
for index, entry in enumerate(gl_entries, 1):
entry.name = index
def generate_name_for_payment_ledger_entries(gl_entries, start):
for index, entry in enumerate(gl_entries, 0):
entry.name = start + index
def get_columns():
@ -81,6 +82,14 @@ def insert_chunk_into_payment_ledger(insert_query, gl_entries):
def execute():
"""
Description:
Migrate records from `tabGL Entry` to `tabPayment Ledger Entry`.
Patch is non-resumable. if patch failed or is terminatted abnormally, clear 'tabPayment Ledger Entry' table manually before re-running. Re-running is safe only during V13->V14 update.
Note: Post successful migration to V14, re-running is NOT-SAFE and SHOULD NOT be attempted.
"""
if frappe.reload_doc("accounts", "doctype", "payment_ledger_entry"):
# create accounting dimension fields in Payment Ledger
create_accounting_dimension_fields()
@ -89,52 +98,90 @@ def execute():
account = qb.DocType("Account")
ifelse = CustomFunction("IF", ["condition", "then", "else"])
gl_entries = (
qb.from_(gl)
.inner_join(account)
.on((gl.account == account.name) & (account.account_type.isin(["Receivable", "Payable"])))
.select(
gl.star,
ConstantColumn(1).as_("docstatus"),
account.account_type.as_("account_type"),
IfNull(
ifelse(gl.against_voucher_type == "", None, gl.against_voucher_type), gl.voucher_type
).as_("against_voucher_type"),
IfNull(ifelse(gl.against_voucher == "", None, gl.against_voucher), gl.voucher_no).as_(
"against_voucher_no"
),
# convert debit/credit to amount
Case()
.when(account.account_type == "Receivable", gl.debit - gl.credit)
.else_(gl.credit - gl.debit)
.as_("amount"),
# convert debit/credit in account currency to amount in account currency
Case()
.when(
account.account_type == "Receivable",
gl.debit_in_account_currency - gl.credit_in_account_currency,
)
.else_(gl.credit_in_account_currency - gl.debit_in_account_currency)
.as_("amount_in_account_currency"),
)
.where(gl.is_cancelled == 0)
.orderby(gl.creation)
.run(as_dict=True)
# Get Records Count
accounts = (
qb.from_(account)
.select(account.name)
.where((account.account_type == "Receivable") | (account.account_type == "Payable"))
.orderby(account.name)
)
un_processed = (
qb.from_(gl)
.select(Count(gl.name))
.where((gl.is_cancelled == 0) & (gl.account.isin(accounts)))
.run()
)[0][0]
# primary key(name) for payment ledger records
generate_name_for_payment_ledger_entries(gl_entries)
if un_processed:
print(f"Migrating {un_processed} GL Entries to Payment Ledger")
# split data into chunks
chunk_size = 1000
try:
for i in range(0, len(gl_entries), chunk_size):
insert_query = build_insert_query()
insert_chunk_into_payment_ledger(insert_query, gl_entries[i : i + chunk_size])
frappe.db.commit()
except Exception as err:
frappe.db.rollback()
ple = qb.DocType("Payment Ledger Entry")
qb.from_(ple).delete().where(ple.docstatus >= 0).run()
frappe.db.commit()
raise err
processed = 0
last_update_percent = 0
batch_size = 5000
last_name = None
while True:
if last_name:
where_clause = gl.name.gt(last_name) & (gl.is_cancelled == 0)
else:
where_clause = gl.is_cancelled == 0
gl_entries = (
qb.from_(gl)
.inner_join(account)
.on((gl.account == account.name) & (account.account_type.isin(["Receivable", "Payable"])))
.select(
gl.star,
ConstantColumn(1).as_("docstatus"),
account.account_type.as_("account_type"),
IfNull(
ifelse(gl.against_voucher_type == "", None, gl.against_voucher_type), gl.voucher_type
).as_("against_voucher_type"),
IfNull(ifelse(gl.against_voucher == "", None, gl.against_voucher), gl.voucher_no).as_(
"against_voucher_no"
),
# convert debit/credit to amount
Case()
.when(account.account_type == "Receivable", gl.debit - gl.credit)
.else_(gl.credit - gl.debit)
.as_("amount"),
# convert debit/credit in account currency to amount in account currency
Case()
.when(
account.account_type == "Receivable",
gl.debit_in_account_currency - gl.credit_in_account_currency,
)
.else_(gl.credit_in_account_currency - gl.debit_in_account_currency)
.as_("amount_in_account_currency"),
)
.where(where_clause)
.orderby(gl.name)
.limit(batch_size)
.run(as_dict=True)
)
if gl_entries:
last_name = gl_entries[-1].name
# primary key(name) for payment ledger records
generate_name_for_payment_ledger_entries(gl_entries, processed)
try:
insert_query = build_insert_query()
insert_chunk_into_payment_ledger(insert_query, gl_entries)
frappe.db.commit()
processed += len(gl_entries)
# Progress message
percent = flt((processed / un_processed) * 100, 2)
if percent - last_update_percent > 1:
print(f"{percent}% ({processed}) records processed")
last_update_percent = percent
except Exception as err:
print("Migration Failed. Clear `tabPayment Ledger Entry` table before re-running")
raise err
else:
break
print(f"{processed} records have been sucessfully migrated")