# custom_ui/csv/data.py
"""
Converts ERPNext Data Export CSVs into import-ready CSVs using template headers.
Handles both simple doctypes (Item Price) and doctypes with child tables (Item).
Reads the template file (from production) to determine the exact columns needed,
then maps data from the export file (from staging) into those columns.
Usage:
python data.py
Reads:
- Item.csv (export from staging)
- Item-template.csv (template from production)
- Item Price.csv (export from staging)
- Item Price-template.csv (template from production)
Writes:
- Item-import-ready.csv
- Item Price-import-ready.csv
- BOM-import-ready.csv
- Item Group-import-ready.csv
"""
import csv
import os
import re
from collections import defaultdict, deque
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
def clean_cell(cell):
    """Strip whitespace and peel off layered wrapping quotes from a cell value."""
    value = cell.strip()
    # ERPNext exports IDs triple-quoted ("""abc""" arrives as "abc" once the
    # csv module has parsed the field); keep removing matched outer quote
    # pairs until none remain.
    while len(value) >= 2 and value[0] == '"' and value[-1] == '"':
        value = value[1:-1]
    return value
# ---------------------------------------------------------------------------
# Export parser
# ---------------------------------------------------------------------------
def parse_export(filepath):
    """
    Parse an ERPNext Data Import/Export CSV.

    Returns
    -------
    sections : list[dict]
        Each dict carries 'labels' (Column Labels), 'names' (Column Names)
        and 'start_idx' (column index into the raw row).  sections[0] is the
        main doctype; sections[1:] are child tables.
    data_rows : list[list[str]]
        Raw data rows (column 0 of each row is the empty leading cell).
    """
    with open(filepath, newline="", encoding="utf-8") as fh:
        rows = list(csv.reader(fh))

    # Locate the two header rows and the first data row.
    label_row_idx = None
    name_row_idx = None
    data_start = None
    for i, row in enumerate(rows):
        marker = row[0].strip() if row else ""
        if marker == "Column Labels:":
            label_row_idx = i
        elif marker == "Column Name:":
            name_row_idx = i
        elif "Start entering data below this line" in marker:
            data_start = i + 1
    if label_row_idx is None or name_row_idx is None or data_start is None:
        raise ValueError(f"Cannot locate header / data rows in {filepath}")

    labels = rows[label_row_idx]
    names = rows[name_row_idx]

    # The Column Name row uses '~' cells as section separators; column 0 is
    # the row-type prefix, so sections start at index 1.
    sections = []
    current = {"labels": [], "names": [], "start_idx": 1}
    for i in range(1, max(len(labels), len(names))):
        nm = names[i].strip() if i < len(names) else ""
        if nm == "~":
            if current["labels"]:
                sections.append(current)
            current = {"labels": [], "names": [], "start_idx": i + 1}
        else:
            current["labels"].append(labels[i].strip() if i < len(labels) else "")
            current["names"].append(nm)
    if current["labels"]:
        sections.append(current)

    return sections, rows[data_start:]
# ---------------------------------------------------------------------------
# Template parser
# ---------------------------------------------------------------------------
def parse_template(filepath):
    """
    Read a production template CSV (its first row is the full header).

    Returns
    -------
    header : list[str]
        The exact column labels, in template order.
    section_names : set[str]
        Child-table section names, discovered via 'ID (<section>)' columns.
    """
    with open(filepath, newline="", encoding="utf-8") as fh:
        header = list(csv.reader(fh))[0]
    # Every child table contributes an 'ID (<section>)' column; harvest the
    # section names from those.
    section_names = {
        match.group(1)
        for match in (re.match(r"^ID \((.+)\)$", col) for col in header)
        if match
    }
    return header, section_names
def _split_template_col(col, section_names):
"""
Decompose a template column into (base_label, section_name | None).
'Barcode (Barcodes)' -> ('Barcode', 'Barcodes')
'No of Months (Expense)' -> ('No of Months (Expense)', None)
because 'Expense' is NOT a known child-table section name.
"""
for sname in section_names:
suffix = f" ({sname})"
if col.endswith(suffix):
return col[: -len(suffix)], sname
return col, None
# ---------------------------------------------------------------------------
# Section matching (child tables)
# ---------------------------------------------------------------------------
def _match_child_sections(template_child_labels, export_sections):
"""
Map each template child-section name -> export section index.
Matches by column-label overlap (template labels ⊆ export labels).
"""
mapping = {}
for tname, tlabels in template_child_labels.items():
best_idx, best_score = None, 0
for idx in range(1, len(export_sections)):
elabels = set(export_sections[idx]["labels"])
score = sum(1 for tl in tlabels if tl in elabels)
if score > best_score:
best_score = score
best_idx = idx
if best_idx is not None and best_score > 0:
mapping[tname] = best_idx
return mapping
# ---------------------------------------------------------------------------
# Topological sort for hierarchical doctypes
# ---------------------------------------------------------------------------
def _topo_sort(rows, name_idx, parent_idx):
"""
Sort rows so that a parent row always appears before its children.
Rows whose parent is empty or not in the dataset come first.
"""
by_name = {} # name -> row
children = defaultdict(list) # parent_name -> [row, ...]
roots = []
for row in rows:
name = row[name_idx].strip()
parent = row[parent_idx].strip()
by_name[name] = row
if not parent:
roots.append(row)
else:
children[parent].append(row)
# BFS from roots
ordered = []
queue = list(roots)
seen = set()
while queue:
current = queue.pop(0)
cname = current[name_idx].strip()
if cname in seen:
continue
seen.add(cname)
ordered.append(current)
for child in children.get(cname, []):
queue.append(child)
# Append any rows whose parent isn't in the dataset (orphans)
for row in rows:
rname = row[name_idx].strip()
if rname not in seen:
ordered.append(row)
return ordered
# ---------------------------------------------------------------------------
# Build import-ready CSV
# ---------------------------------------------------------------------------
def build_import_csv(template_path, export_path, output_path,
                     topo_sort_col=None, strip_link_cols=None):
    """
    Create a clean import-ready CSV from a template + export pair.

    Parameters
    ----------
    template_path : str
        Production template CSV (single header row).
    export_path : str
        ERPNext Data Export CSV from staging.
    output_path : str
        Destination for the import-ready CSV.
    topo_sort_col : tuple(str, str) | None
        (name_col, parent_col) template column labels used to topologically
        sort rows so parents appear before children.
    strip_link_cols : list[str] | None
        Template column labels whose values are blanked out (e.g. Link
        fields referencing records that don't exist in production).
    """
    header, section_names = parse_template(template_path)
    export_sections, data_rows = parse_export(export_path)

    # Group the template's child-table column labels by section name.
    tmpl_child = {}  # section_name -> [base_label, ...]
    for col in header:
        base, sname = _split_template_col(col, section_names)
        if sname:
            tmpl_child.setdefault(sname, []).append(base)

    # Pair each template child section with its best-matching export section.
    sec_map = _match_child_sections(tmpl_child, export_sections)

    # label -> raw-row column index, for the main table ...
    main_sec = export_sections[0]
    main_lbl_idx = {
        lbl: main_sec["start_idx"] + off
        for off, lbl in enumerate(main_sec["labels"])
    }
    # ... and for every matched child table.
    child_lbl_idx = {}
    for tname, eidx in sec_map.items():
        esec = export_sections[eidx]
        child_lbl_idx[tname] = {
            lbl: esec["start_idx"] + off
            for off, lbl in enumerate(esec["labels"])
        }

    # One export-column index (or None) per template column.
    col_map = []
    for col in header:
        base, sname = _split_template_col(col, section_names)
        if sname and sname in child_lbl_idx:
            col_map.append(child_lbl_idx[sname].get(base))
        else:
            # Main-table column (or a child section with no match -> None).
            col_map.append(main_lbl_idx.get(col if sname is None else base))

    # Materialize output rows, skipping rows that are entirely blank either
    # before or after the column mapping.
    out_rows = []
    for raw in data_rows:
        if not raw or all(not c.strip() for c in raw):
            continue
        out = [
            clean_cell(raw[idx]) if idx is not None and idx < len(raw) else ""
            for idx in col_map
        ]
        if any(v.strip() for v in out):
            out_rows.append(out)

    # Blank Link columns whose targets won't exist in production.
    if strip_link_cols:
        strip_idxs = [i for i, col in enumerate(header) if col in strip_link_cols]
        for out in out_rows:
            for si in strip_idxs:
                out[si] = ""

    # Parents before children, when requested.
    if topo_sort_col:
        name_col, parent_col = topo_sort_col
        out_rows = _topo_sort(out_rows,
                              header.index(name_col),
                              header.index(parent_col))

    # Write the exact template header followed by the mapped rows.
    with open(output_path, "w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        writer.writerows(out_rows)
    print(f" -> {output_path} ({len(out_rows)} data rows)")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Convert every (template, export) pair found next to this script."""
    # Item Group Defaults columns that reference production-only records;
    # their values are blanked so the import doesn't fail on missing links.
    item_group_strip = [
        "ID (Item Group Defaults)",
        "Company (Item Group Defaults)",
        "Default Buying Cost Center (Item Group Defaults)",
        "Default Discount Account (Item Group Defaults)",
        "Default Expense Account (Item Group Defaults)",
        "Default Income Account (Item Group Defaults)",
        "Default Price List (Item Group Defaults)",
        "Default Provisional Account (Item Group Defaults)",
        "Default Selling Cost Center (Item Group Defaults)",
        "Default Supplier (Item Group Defaults)",
        "Default Warehouse (Item Group Defaults)",
        "Deferred Expense Account (Item Group Defaults)",
        "Deferred Revenue Account (Item Group Defaults)",
    ]
    # Each job: (template, export, output, topo_sort_col, strip_link_cols)
    # where topo_sort_col = (name_col_label, parent_col_label) or None and
    # strip_link_cols = list of template column labels to blank out.
    jobs = [
        ("Item-template.csv", "Item.csv", "Item-import-ready.csv",
         None, None),
        ("Item Price-template.csv", "Item Price.csv", "Item Price-import-ready.csv",
         None, None),
        ("BOM-template.csv", "BOM.csv", "BOM-import-ready.csv",
         None, None),
        ("Item Group-template.csv", "Item Group.csv", "Item Group-import-ready.csv",
         ("Item Group Name", "Parent Item Group"), item_group_strip),
        ("Supplier-template.csv", "Supplier.csv", "Supplier-import-ready.csv",
         None, None),
        ("User-template.csv", "User.csv", "User-import-ready.csv",
         None, None),
    ]
    for tmpl, export, output, topo, strip in jobs:
        tmpl_path = os.path.join(SCRIPT_DIR, tmpl)
        export_path = os.path.join(SCRIPT_DIR, export)
        output_path = os.path.join(SCRIPT_DIR, output)
        # Skip (with a notice) any pair whose inputs are missing.
        if not os.path.exists(tmpl_path):
            print(f" !! Template not found: {tmpl}")
            continue
        if not os.path.exists(export_path):
            print(f" !! Export not found: {export}")
            continue
        print(f"Processing {export} ...")
        build_import_csv(tmpl_path, export_path, output_path,
                         topo_sort_col=topo, strip_link_cols=strip)
    print("\nDone.")
if __name__ == "__main__":
main()