"""
Converts ERPNext Data Export CSVs into import-ready CSVs using template headers.

Handles both simple doctypes (Item Price) and doctypes with child tables (Item).
Reads the template file (from production) to determine the exact columns needed,
then maps data from the export file (from staging) into those columns.

Usage:
    python data.py

For every doctype listed in main() (Item, Item Price, BOM, Item Group,
Supplier, User), all paths being relative to this script's directory:

Reads:
    - <DocType>.csv             (export from staging)
    - <DocType>-template.csv    (template from production)

Writes:
    - <DocType>-import-ready.csv

A doctype whose template or export file is missing is skipped with a message.
"""

import csv
import os
import re
from collections import defaultdict, deque

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Template columns of the form 'ID (<section>)' announce a child-table section.
_SECTION_ID_RE = re.compile(r"^ID \((.+)\)$")


def clean_cell(cell):
    """Remove the extra wrapping quotes ERPNext puts around IDs and values.

    Surrounding whitespace is stripped first, then matching leading/trailing
    double quotes are peeled off one pair at a time.
    """
    cell = cell.strip()
    # ERPNext exports IDs as triple-quoted: """abc123""" -> "abc123" after csv reader
    while len(cell) >= 2 and cell.startswith('"') and cell.endswith('"'):
        cell = cell[1:-1]
    return cell


# ---------------------------------------------------------------------------
# Export parser
# ---------------------------------------------------------------------------

def parse_export(filepath):
    """
    Parse an ERPNext Data Import/Export CSV.

    The file is located by three marker rows, identified by their first cell:
    'Column Labels:', 'Column Name:' and a row containing
    'Start entering data below this line'.  Everything after the last marker
    is returned as data.

    Returns
    -------
    sections : list[dict]
        Each dict has 'labels' (Column Labels), 'names' (Column Names),
        and 'start_idx' (column index in the raw row).
        sections[0] = main doctype, sections[1:] = child tables.
    data_rows : list[list[str]]
        The raw data rows (index 0 of each row is the empty leading cell).

    Raises
    ------
    ValueError
        If any of the three marker rows cannot be found.
    """
    # 'utf-8-sig' reads plain UTF-8 unchanged and drops a leading BOM if present.
    with open(filepath, newline="", encoding="utf-8-sig") as fh:
        rows = list(csv.reader(fh))

    label_row_idx = name_row_idx = data_start = None
    for i, row in enumerate(rows):
        first = row[0].strip() if row else ""
        if first == "Column Labels:":
            label_row_idx = i
        elif first == "Column Name:":
            name_row_idx = i
        elif "Start entering data below this line" in first:
            data_start = i + 1

    if label_row_idx is None or name_row_idx is None or data_start is None:
        raise ValueError(f"Cannot locate header / data rows in {filepath}")

    labels = rows[label_row_idx]
    names = rows[name_row_idx]

    # Split into sections using the Column Name row ('~' separator)
    sections = []
    cur_labels, cur_names, cur_start = [], [], 1  # skip col-0 prefix

    def flush():
        # Close the section being accumulated; empty sections are not recorded.
        if cur_labels:
            sections.append({
                "labels": cur_labels,
                "names": cur_names,
                "start_idx": cur_start,
            })

    for i in range(1, max(len(labels), len(names))):
        # The two header rows may differ in length; pad the shorter with "".
        nm = names[i].strip() if i < len(names) else ""
        lbl = labels[i].strip() if i < len(labels) else ""
        if nm == "~":
            flush()
            cur_labels, cur_names, cur_start = [], [], i + 1
        else:
            cur_labels.append(lbl)
            cur_names.append(nm)
    flush()

    return sections, rows[data_start:]


# ---------------------------------------------------------------------------
# Template parser
# ---------------------------------------------------------------------------

def parse_template(filepath):
    """
    Read a production template CSV (single header row).

    Returns
    -------
    header : list[str]          – the exact column labels
    section_names : set[str]    – child-table section names found via 'ID (…)'

    Raises
    ------
    ValueError
        If the file is empty or its first row has no columns.
    """
    # Only the first row is needed.  'utf-8-sig' keeps a BOM from being glued
    # onto the first label (which would stop 'ID' from ever matching).
    with open(filepath, newline="", encoding="utf-8-sig") as fh:
        header = next(csv.reader(fh), None)

    if not header:
        raise ValueError(f"Template {filepath} has no header row")

    section_names = set()
    for col in header:
        m = _SECTION_ID_RE.match(col)
        if m:
            section_names.add(m.group(1))

    return header, section_names


def _split_template_col(col, section_names):
    """
    Decompose a template column into (base_label, section_name | None).

    'Barcode (Barcodes)'       -> ('Barcode', 'Barcodes')
    'No of Months (Expense)'   -> ('No of Months (Expense)', None)
        because 'Expense' is NOT a known child-table section name.
    """
    for sname in section_names:
        suffix = f" ({sname})"
        if col.endswith(suffix):
            return col[: -len(suffix)], sname
    return col, None


# ---------------------------------------------------------------------------
# Section matching (child tables)
# ---------------------------------------------------------------------------

def _match_child_sections(template_child_labels, export_sections):
    """
    Map each template child-section name -> export section index.

    The score of a candidate is the number of template base labels that also
    appear among the export section's labels; the highest-scoring export
    child section (index >= 1) wins, the earliest one on ties.  A template
    section with no overlapping label at all is left out of the mapping.

    Every shared label counts, so two template sections may map to the same
    export section.
    """
    # Build each export section's label set once instead of per template section.
    export_label_sets = [set(sec["labels"]) for sec in export_sections]

    mapping = {}
    for tname, tlabels in template_child_labels.items():
        best_idx, best_score = None, 0
        for idx in range(1, len(export_sections)):
            elabels = export_label_sets[idx]
            score = sum(1 for tl in tlabels if tl in elabels)
            if score > best_score:  # strict '>' keeps the earliest best match
                best_score = score
                best_idx = idx
        if best_idx is not None:
            mapping[tname] = best_idx
    return mapping


# ---------------------------------------------------------------------------
# Topological sort for hierarchical doctypes
# ---------------------------------------------------------------------------

def _topo_sort(rows, name_idx, parent_idx):
    """
    Sort rows so that a parent row always appears before its children.

    Rows whose parent is empty or not in the dataset are treated as roots and
    come first (in their original order); the rest follow breadth-first.
    No row is ever dropped:

    * A row with an empty name is kept directly after the preceding named
      row and moves together with it.  (Assumption: such rows are child-table
      continuation rows of that record — confirm against the export format.)
    * Rows that can only be reached through a parent cycle are appended at
      the end in their original order.

    Parameters
    ----------
    rows : list[list[str]]
    name_idx : int     – column holding the record name
    parent_idx : int   – column holding the parent record name
    """
    # Group each named row with the unnamed rows that follow it.
    groups = []
    for row in rows:
        if row[name_idx].strip() or not groups:
            groups.append([row])
        else:
            groups[-1].append(row)

    names = [grp[0][name_idx].strip() for grp in groups]
    parents = [grp[0][parent_idx].strip() for grp in groups]
    known = set(names)

    children = defaultdict(list)   # parent_name -> [group index, ...]
    queue = deque()                # group indices waiting to be emitted
    for gi, (name, parent) in enumerate(zip(names, parents)):
        # A parent that is absent from the data cannot be waited for, and a
        # self-reference would never be reached, so both start as roots.
        if not parent or parent not in known or parent == name:
            queue.append(gi)
        else:
            children[parent].append(gi)

    # BFS from roots; tracking indices (not names) keeps duplicate names alive.
    order = []
    placed = set()
    while queue:
        gi = queue.popleft()
        if gi in placed:
            continue
        placed.add(gi)
        order.append(gi)
        queue.extend(children.get(names[gi], ()))

    # Whatever is left sits on a parent cycle; keep it rather than lose data.
    order.extend(gi for gi in range(len(groups)) if gi not in placed)

    return [row for gi in order for row in groups[gi]]


# ---------------------------------------------------------------------------
# Build import-ready CSV
# ---------------------------------------------------------------------------

def build_import_csv(template_path, export_path, output_path,
                     topo_sort_col=None, strip_link_cols=None):
    """
    Create a clean import-ready CSV from a template + export pair.

    The output has exactly the template's header.  Each template column is
    filled from the export column with the same label (main table) or the
    same base label inside the matched child section; columns with no
    counterpart are left empty.  Rows with no value in any output column are
    omitted.  A one-line summary is printed when the file has been written.

    Parameters
    ----------
    template_path, export_path, output_path : str
        Paths of the template CSV, the export CSV and the file to write.
    topo_sort_col : tuple(str, str) | None
        (name_col, parent_col) template column labels to topologically
        sort rows so parents appear before children.
    strip_link_cols : list[str] | None
        Template column labels whose values should be blanked out
        (e.g. Link fields referencing records that don't exist in production).

    Raises
    ------
    ValueError
        If either input file cannot be parsed, the export has no columns, or
        a topo_sort_col label is not in the template header.
    """
    header, section_names = parse_template(template_path)
    export_sections, data_rows = parse_export(export_path)
    if not export_sections:
        raise ValueError(f"No columns found in export {export_path}")

    # Split every template column once: (base_label, section_name | None)
    split_cols = [_split_template_col(col, section_names) for col in header]

    # Collect child-table column labels by section from the template
    tmpl_child = {}  # section_name -> [base_label, …]
    for base, sname in split_cols:
        if sname:
            tmpl_child.setdefault(sname, []).append(base)

    # Match template sections to export sections
    sec_map = _match_child_sections(tmpl_child, export_sections)

    # Build label -> export-column-index maps
    main_sec = export_sections[0]
    main_lbl_idx = {
        lbl: main_sec["start_idx"] + i
        for i, lbl in enumerate(main_sec["labels"])
    }
    child_lbl_idx = {}
    for tname, eidx in sec_map.items():
        esec = export_sections[eidx]
        child_lbl_idx[tname] = {
            lbl: esec["start_idx"] + i
            for i, lbl in enumerate(esec["labels"])
        }

    # Map every template column to an export column index (or None)
    col_map = []
    for col, (base, sname) in zip(header, split_cols):
        if sname is None:
            col_map.append(main_lbl_idx.get(col))
        else:
            # A child column only ever reads from its own matched section.
            # Falling back to the main table would copy e.g. the parent's
            # 'ID' into 'ID (<section>)' when the section has no match.
            col_map.append(child_lbl_idx.get(sname, {}).get(base))

    # Build all output rows first; clean_cell() strips, so any() spots blanks
    out_rows = []
    for row in data_rows:
        out = [
            clean_cell(row[idx]) if idx is not None and idx < len(row) else ""
            for idx in col_map
        ]
        if any(out):
            out_rows.append(out)

    # Strip Link columns whose targets won't exist in production
    if strip_link_cols:
        strip_set = set(strip_link_cols)
        strip_idxs = [i for i, col in enumerate(header) if col in strip_set]
        for out in out_rows:
            for si in strip_idxs:
                out[si] = ""
        # Blanking can empty a row completely (e.g. a child-only row).
        out_rows = [out for out in out_rows if any(out)]

    # Topological sort so parent records are created before children
    if topo_sort_col:
        name_col, parent_col = topo_sort_col
        missing = [c for c in (name_col, parent_col) if c not in header]
        if missing:
            raise ValueError(
                f"Sort column(s) {missing} not found in template {template_path}"
            )
        out_rows = _topo_sort(
            out_rows, header.index(name_col), header.index(parent_col)
        )

    # Write output
    with open(output_path, "w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)  # exact template header
        writer.writerows(out_rows)

    print(f" -> {output_path} ({len(out_rows)} data rows)")


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """Convert every configured doctype whose template and export both exist."""
    # Child-table columns of Item Group that are blanked in the output.
    item_group_strip = [
        "ID (Item Group Defaults)",
        "Company (Item Group Defaults)",
        "Default Buying Cost Center (Item Group Defaults)",
        "Default Discount Account (Item Group Defaults)",
        "Default Expense Account (Item Group Defaults)",
        "Default Income Account (Item Group Defaults)",
        "Default Price List (Item Group Defaults)",
        "Default Provisional Account (Item Group Defaults)",
        "Default Selling Cost Center (Item Group Defaults)",
        "Default Supplier (Item Group Defaults)",
        "Default Warehouse (Item Group Defaults)",
        "Deferred Expense Account (Item Group Defaults)",
        "Deferred Revenue Account (Item Group Defaults)",
    ]

    # (doctype, topo_sort_col, strip_link_cols)
    #   topo_sort_col   = (name_col_label, parent_col_label) or None
    #   strip_link_cols = list of template column labels to blank out
    jobs = [
        ("Item", None, None),
        ("Item Price", None, None),
        ("BOM", None, None),
        ("Item Group", ("Item Group Name", "Parent Item Group"), item_group_strip),
        ("Supplier", None, None),
        ("User", None, None),
    ]

    for doctype, topo, strip in jobs:
        tmpl = f"{doctype}-template.csv"
        export = f"{doctype}.csv"
        output = f"{doctype}-import-ready.csv"

        tmpl_path = os.path.join(SCRIPT_DIR, tmpl)
        export_path = os.path.join(SCRIPT_DIR, export)
        output_path = os.path.join(SCRIPT_DIR, output)

        if not os.path.exists(tmpl_path):
            print(f" !! Template not found: {tmpl}")
            continue
        if not os.path.exists(export_path):
            print(f" !! Export not found: {export}")
            continue

        print(f"Processing {export} ...")
        build_import_csv(tmpl_path, export_path, output_path,
                         topo_sort_col=topo, strip_link_cols=strip)

    print("\nDone.")


if __name__ == "__main__":
    main()