Examples

This page collects worked examples that ship in pyx12.examples. Each demonstrates a different pattern for working with X12 documents, ordered roughly from low-level stream handling up to a full transaction-set to-JSON pipeline. Run any of them directly from a checkout — every script has a __main__ entry point — or copy the techniques into your own code.

Splitting on ST loops with X12Reader

st_iterator.py splits an X12 interchange that contains multiple ST/SE transaction sets into one X12 file per ST loop. It uses the streaming pyx12.x12file.X12Reader together with itertools.groupby() to chunk segments by their owning ST envelope, then writes each chunk through pyx12.x12file.X12Writer which auto-closes the ST / GS / ISA loops on Close(). The example also shows how to renumber the ISA control number (ISA13) and GS group control number (GS06) per output file.

This is the lowest-level approach — useful when you don’t need loop-aware parsing and want to keep memory flat over very large interchanges.

#! /usr/bin/env python
"""Split an X12 interchange that contains multiple ST/SE transaction sets into
one X12 file per ST loop, using ``pyx12.x12file.X12Reader`` and
``itertools.groupby``."""

import random
import sys
import tempfile
from itertools import groupby

import pyx12
import pyx12.error_handler
import pyx12.params
import pyx12.x12context
import pyx12.x12file


def x12_split_on_st(source_filename, isa_id=11, gs_id=21):
    src = pyx12.x12file.X12Reader(source_filename)
    idx = -1
    for k, g in groupby(get_headers_stream(src), lambda x: x[0]):
        idx += 1
        st_id = int(k["st_seg"].get_value("ST02"))
        fd_temp = tempfile.TemporaryFile(mode="w+", encoding="ascii")
        wr = pyx12.x12file.X12Writer(fd_temp)
        wr.Write(update_isa_id(k["isa_seg"], isa_id + idx))
        wr.Write(update_gs_id(k["gs_seg"], gs_id + idx))
        for seg in g:
            wr.Write(seg[1])
        wr.Close()  # Auto close ST, GS and ISA loops
        yield (isa_id + idx, gs_id + idx, st_id, fd_temp)


def save_many(src_filename, targetformat=None):
    base_isa_id = random.randint(1000, 999999999)
    base_gs_id = random.randint(100, 999999999)
    for isa_id, gs_id, st_id, fd_temp in x12_split_on_st(src_filename, base_isa_id, base_gs_id):
        if targetformat is not None:
            newname = targetformat.format(isa_id=isa_id, gs_id=gs_id, st_id=st_id)
        else:
            newname = f"newfile_{isa_id}.txt"
        with open(newname, "w", encoding="ascii") as fd_out:
            fd_temp.seek(0)
            fd_out.write(fd_temp.read())
            print((newname, isa_id, gs_id, st_id))


def update_isa_id(seg_data, isa_id):
    seg_data.set("ISA13", f"{int(isa_id):0>9}")
    return seg_data


def update_gs_id(seg_data, gs_id):
    seg_data.set("GS06", f"{int(gs_id)}")
    return seg_data


def get_headers_stream(segments):
    """
    passed a segment enumerable
    yields (isa_segment, gs_segment, st_segment, current_segment)
    """
    isa_seg = None
    gs_seg = None
    st_seg = None
    for seg_data in segments:
        seg_id = seg_data.get_seg_id()
        if seg_id == "ISA":
            isa_seg = seg_data
        elif seg_id == "GS":
            gs_seg = seg_data
        elif seg_id in ("IEA", "GE"):
            pass
        else:
            if seg_id == "ST":
                st_seg = seg_data
            k = {
                "isa_seg": isa_seg,
                "gs_seg": gs_seg,
                "st_seg": st_seg,
            }
            v = seg_data
            yield (k, v)


def iterate_2000(fd_in):
    param = pyx12.params.params()
    errh = pyx12.error_handler.errh_null()
    src = pyx12.x12context.X12ContextReader(param, errh, fd_in)
    isa_seg = None
    gs_seg = None
    st_id = None
    for datatree in src.iter_segments("2000"):
        if datatree.id == "ISA":
            for dt in datatree.iterate_segments():
                isa_seg = dt["segment"]
                break
        elif datatree.id == "GS":
            for dt in datatree.iterate_segments():
                gs_seg = dt["segment"]
                break
        elif datatree.id in ("IEA", "GE"):
            pass
        else:
            if datatree.id == "ST":
                st_id = datatree.get_value("ST02")
            for seg_node in datatree.iterate_segments():
                # do something with loop 2000
                k = {
                    "isa_seg": isa_seg,
                    "gs_seg": gs_seg,
                    "st_id": st_id,
                }
                v = seg_node["segment"]
                yield (k, v)


def main():
    testfile = "834_multiple_st_loops.txt"
    targetformat = None
    save_many(testfile, targetformat)
    return True


if __name__ == "__main__":
    sys.exit(not main())

Splitting on ST loops with X12ContextReader

st_context_iterator.py solves the same problem as st_iterator.py but parses the input through pyx12.x12context.X12ContextReader and its loop-aware iter_segments interface. Use this style when you also need to inspect or modify segments inside specific loops (here the 834’s 2000 member loops) while you split — the context reader keeps parent / child relationships and the implementation-guide map node attached to each yielded X12DataNode.

#! /usr/bin/env python
"""Split an X12 interchange into one file per ST/SE transaction set, the same
goal as ``st_iterator.py`` but demonstrating ``X12ContextReader`` for loop-aware
iteration over 834 enrollment 2000 loops."""

import random
import sys
import tempfile
from itertools import groupby

import pyx12
import pyx12.error_handler
import pyx12.params
import pyx12.x12context
import pyx12.x12file


def st_generator():
    """ """
    testfile = "834_multiple_st_loops.txt"
    # wr = edifile.WriteFile(conn)
    with open(testfile, encoding="ascii") as fd_in:
        isa_seg = None
        gs_seg = None
        isa_id = 11
        gs_id = 21
        # for k, g in groupby(iterate_2000(fd_in), lambda x: x['st_id']):
        for k, g in groupby(iterate_2000(fd_in), lambda x: x[0]):
            # yield (k, g)
            print("-----------------------------------------------------------")
            print(k)
            yield from g
            print("-----------------------------------------------------------")
        # for d in iterate_2000(fd_in):
        #    yield d


def simple_reader():
    testfile = "834_multiple_st_loops.txt"
    src = pyx12.x12file.X12Reader(testfile)
    # for d in get_headers_stream(src):
    #    print d
    for k, g in groupby(get_headers_stream(src), lambda x: x[0]):
        print("-----------------------------------------------------------")
        print(k)
        for d in g:
            # yield d
            print(d)
        print("-----------------------------------------------------------")


def x12_split_on_st(source_filename, isa_id=11, gs_id=21):
    src = pyx12.x12file.X12Reader(source_filename)
    idx = -1
    for k, g in groupby(get_headers_stream(src), lambda x: x[0]):
        idx += 1
        st_id = int(k["st_seg"].get_value("ST02"))
        fd_temp = tempfile.TemporaryFile(mode="w+", encoding="ascii")
        wr = pyx12.x12file.X12Writer(fd_temp, "~", "*", ":", "\n", "^")
        wr.Write(update_isa_id(k["isa_seg"], isa_id + idx))
        wr.Write(update_gs_id(k["gs_seg"], gs_id + idx))
        for seg in g:
            wr.Write(seg[1])
        wr.Close()
        yield (isa_id + idx, gs_id + idx, st_id, fd_temp)


def save_many(src_filename, targetformat=None):
    base_isa_id = random.randint(1000, 999999999)
    base_gs_id = random.randint(100, 999999999)
    for isa_id, gs_id, st_id, fd_temp in x12_split_on_st(src_filename, base_isa_id, base_gs_id):
        if targetformat is not None:
            newname = targetformat.format(isa_id=isa_id, gs_id=gs_id, st_id=st_id)
        else:
            newname = f"newfile_{isa_id}.txt"
        with open(newname, "w", encoding="ascii") as fd_out:
            fd_temp.seek(0)
            fd_out.write(fd_temp.read())
            print((newname, isa_id, gs_id, st_id))


def update_isa_id(seg_data, isa_id):
    seg_data.set("ISA13", f"{int(isa_id):0>9}")
    return seg_data


def update_gs_id(seg_data, gs_id):
    seg_data.set("GS06", f"{int(gs_id)}")
    return seg_data


def get_headers_stream(segments):
    """
    passed a segment enumerable
    yields (isa_segment, gs_segment, st_segment, current_segment)
    """
    isa_seg = None
    gs_seg = None
    st_seg = None
    for seg_data in segments:
        seg_id = seg_data.get_seg_id()
        if seg_id == "ISA":
            isa_seg = seg_data
        elif seg_id == "GS":
            gs_seg = seg_data
        elif seg_id in ("IEA", "GE"):
            pass
        else:
            if seg_id == "ST":
                st_seg = seg_data
                # st_id = st_seg.get_value('ST02')
            k = {
                "isa_seg": isa_seg,
                "gs_seg": gs_seg,
                "st_seg": st_seg,
            }
            v = seg_data
            yield (k, v)


def iterate_2000(fd_in):
    param = pyx12.params.params()
    errh = pyx12.error_handler.errh_null()
    src = pyx12.x12context.X12ContextReader(param, errh, fd_in)
    # isa_id = None
    # gs_id = None
    isa_seg = None
    gs_seg = None
    st_id = None
    for datatree in src.iter_segments("2000"):
        if datatree.id == "ISA":
            for dt in datatree.iterate_segments():
                isa_seg = dt["segment"]
                break
        elif datatree.id == "GS":
            for dt in datatree.iterate_segments():
                gs_seg = dt["segment"]
                break
        elif datatree.id in ("IEA", "GE"):
            pass
        else:
            if datatree.id == "ST":
                st_id = datatree.get_value("ST02")
            for seg_node in datatree.iterate_segments():
                # do something with loop 2000
                k = {
                    "isa_seg": isa_seg,
                    "gs_seg": gs_seg,
                    "st_id": st_id,
                }
                v = seg_node["segment"]
                yield (k, v)


def _get_unique_isa_id():
    """
    Generate a random, 4 to 9 character ISA ID
    """
    return f"{random.randint(1000, 999999999):0>9}"


def _get_unique_gs_id():
    """
    Generate a random, 3 to 9 character GS ID
    """
    return f"{random.randint(100, 999999999)}"


def _get_unique_st_id():
    """
    Generate a random, 4 to 9 character ST ID
    """
    # return '%04i' % (random.randint(10, 999999999))
    return f"{random.randint(100, 999999999):0>4}"


def main():
    testfile = "834_multiple_st_loops.txt"
    targetformat = None
    save_many(testfile, targetformat)
    return True


if __name__ == "__main__":
    sys.exit(not main())

De-identifying an 834 enrollment file

deident834.py walks an 834 Benefit Enrollment file and replaces every member’s PHI (name, SSN, Medicaid ID, DOB, address) with synthetic values, writing the rewritten X12 stream back out. It demonstrates the mutation pattern: pyx12.x12context.X12DataNode.set_value() lets you patch element values on a parsed loop in place, addressed by a path like 2100A/NM103 (member last name) or REF[0F]02 (subscriber identifier). Combined with a pyx12.x12file.X12Writer you can round-trip a transformed copy of the input back to disk.

This is a demo and is not production-ready de-identification — the substitution policy is illustrative only.

#! /usr/bin/env python
"""De-identify an 834 Enrollment file by replacing member demographics with
synthetic values.  Demo / not production-ready."""

import getopt
import logging
import os.path
import random
import sys

# Intrapackage imports
libpath = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
if os.path.isdir(libpath):
    sys.path.insert(0, libpath)

from collections import namedtuple

import pyx12
import pyx12.params
import pyx12.segment
import pyx12.x12context
import pyx12.x12file

__author__ = "John Holland"
__version__ = "1.0"
__date__ = "2015-02-12"

VERBOSE = 0

logger = logging.getLogger()
sub_idx = 0

Demographic = namedtuple(
    "Demographic",
    "primaryId, ssn, \
        medicaidId, dob, dod, firstname, lastname, middlename, street, street2, county",
)


class FakeDeidentify:
    def __init__(self):
        pass

    def getDeidentified(self, primaryId, datatree):
        demo = Demographic(
            primaryId,
            "99999999",
            "009999999",
            "19500101",
            "",
            "Joe",
            "Smith",
            "",
            "123 Elm",
            "",
            "99",
        )
        return demo


class RandomDeidentify:
    def __init__(self):
        self.identities = {}

    def getDeidentified(self, primaryId, datatree):
        if primaryId in self.identities:
            return self.identities[primaryId]
        demo = Demographic(
            primaryId=f"{random.randint(1000, 99999999999):0>10}",
            ssn=f"{random.randint(10000, 999999999):0>9}",
            medicaidId=f"{random.randint(1000, 99999999999):0>10}",
            dob="19520101",
            dod="",
            firstname="AA",
            lastname="Smith",
            middlename="",
            street=f"{random.randint(10, 9999)} Oak",
            street2="",
            county="98",
        )
        self.identities[primaryId] = demo
        return demo


def deidentify_file(fd_in):
    """ """
    param = pyx12.params.params()
    errh = pyx12.error_handler.errh_null()
    src = pyx12.x12context.X12ContextReader(param, errh, fd_in)
    # deident = FakeDeidentify()
    deident = RandomDeidentify()

    with open("newfile.txt", "w", encoding="ascii") as fd_out:
        wr = pyx12.x12file.X12Writer(fd_out)
        for datatree in src.iter_segments("2000"):
            if datatree.id == "2000":
                scrub2000(datatree, deident)
            for seg1 in datatree.iterate_segments():
                # wr.Write(seg1['segment'].format())
                print(seg1["segment"].format())


def scrub2000(loop_sub, deident):
    primaryId = loop_sub.get_value("2100A/NM109")
    demo = deident.getDeidentified(primaryId, loop_sub)
    loop_sub.set_value("INS12", demo.dod)
    loop_sub.set_value("REF[0F]02", demo.primaryId)
    loop_sub.set_value("2100A/NM103", demo.lastname)
    loop_sub.set_value("2100A/NM104", demo.firstname)
    loop_sub.set_value("2100A/NM105", demo.middlename)
    loop_sub.set_value("2100A/NM109", demo.medicaidId)
    loop_sub.set_value("2100A/N301", demo.street)
    loop_sub.set_value("2100A/N302", demo.street2)
    loop_sub.set_value("2100A/N406", demo.county)
    loop_sub.set_value("2100A/DMG02", demo.dob)


def usage():
    pgm_nme = os.path.basename(sys.argv[0])
    sys.stdout.write("%s %s (%s)\n" % (pgm_nme, __version__, __date__))
    sys.stdout.write("usage: %s [options] source_file\n" % (pgm_nme))
    sys.stdout.write("\noptions:\n")
    sys.stdout.write("  -h         Help\n")
    sys.stdout.write("  -d         Debug mode\n")
    sys.stdout.write("  -o output_directory \n")


def main():
    try:
        opts, args = getopt.getopt(sys.argv[1:], "dhv")
    except getopt.error as msg:
        usage()
        return False
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")

    stdout_hdlr = logging.StreamHandler()
    stdout_hdlr.setFormatter(formatter)
    logger.addHandler(stdout_hdlr)
    logger.setLevel(logging.INFO)

    for o, a in opts:
        if o == "-h":
            usage()
            return True
        if o == "-d":
            logger.setLevel(logging.DEBUG)
        if o == "-v":
            logger.setLevel(logging.DEBUG)

    for file_in in args:
        if not os.path.isfile(file_in):
            logger.error("File %s was not found" % (file_in))
            usage()
            return False
        # file_name = os.path.basename(file_in)
        fd_in = open(file_in, encoding="ascii")
        deidentify_file(fd_in)
    return True


if __name__ == "__main__":
    sys.exit(not main())

Parsing 834 enrollment into JSON

834_x12_json.py is a fuller pipeline: it parses an 834 file and emits one JSON document per ST transaction set, with the envelope headers, members, providers, and payers flattened into a structured object. The Enrollment834Parser class wraps pyx12.x12context.X12ContextReader, dispatches on datatree.id to capture envelope state (ISA / GS / ST / BGN), and descends into each 2000 member loop to extract a member record along with its nested 2100A (member name), 2100G (responsible person), 2300 (coverage), 2310 (provider), and 2320 (payer) loops.

It demonstrates the transformation pattern: how to use X12 path expressions like 2100A/NM109, DTP[356]03, or REF[0F]02 against a context node to pull values out of a deeply nested loop without writing per-segment scaffolding, and how to accumulate the results into nested OrderedDict\ s suitable for json.dumps.

"""Parse 834 Benefit Enrollment & Maintenance files into JSON, emitting one
JSON document per ST transaction set with header, members, providers, and
payers flattened into a structured object."""

import collections
import glob
import json
import logging
import os.path
import sys
import tempfile

import pyx12.error_handler
import pyx12.errors
import pyx12.params
import pyx12.x12context
from pyx12.examples.jsonTable import JsonInterface


class Enrollment834Parser:
    """Import an 834 eligibility file"""

    def __init__(self, fullpath, fd_in, run_type_code="P"):
        self.logger = logging.getLogger()
        param = pyx12.params.params()
        errh = pyx12.error_handler.errh_null()
        (file_base, _ext) = os.path.splitext(os.path.basename(fullpath))
        self.outfile_base = file_base
        try:
            self.src = pyx12.x12context.X12ContextReader(param, errh, fd_in, [])
        except pyx12.errors.X12Error as e:
            err_msg = "This does not look like a X12 data file"
            self.logger.error(err_msg)
            raise pyx12.errors.X12Error(err_msg) from e

    def parse_file(self, out_dir):
        """Convert an 834 X12 file into JSON files"""
        my_count = 0
        subscribers = []
        trn_set_idx = 0
        isa = gs = st = header = None
        for datatree in self.src.iter_segments("2000"):
            if datatree.id == "2000":
                my_count += 1
                subscriber = self.get_2000(datatree, header, my_count)
                subscribers.append(subscriber)
            elif datatree.id == "ISA":
                isa = JsonInterface()
                isa.add_string("SenderCode", datatree.get_value("ISA06"))
                isa.add_string("ReceiverCode", datatree.get_value("ISA08"))
                isa.add_int("ControlNumber", datatree.get_value("ISA13"))
                isa.add_datetime(
                    "InterchangeDate",
                    "20" + datatree.get_value("ISA09"),
                    datatree.get_value("ISA10"),
                )
                isa.add_string("Version", datatree.get_value("ISA12"))
                isa.add_string("UsageIndicator", datatree.get_value("ISA15"))
                isa.seg_count = datatree.seg_count
                isa.line_number = datatree.cur_line_number
                isa.parent_line_number = None
            elif datatree.id == "IEA":
                isa = None
            elif datatree.id == "GS":
                gs = JsonInterface()
                gs.add_string("SenderCode", datatree.get_value("GS02"))
                gs.add_string("ReceiverCode", datatree.get_value("GS03"))
                gs.add_datetime(
                    "CreationDate", datatree.get_value("GS04"), datatree.get_value("GS05")
                )
                gs.add_int("GroupControlNumber", datatree.get_value("GS06"))
                gs.add_string("Version", datatree.get_value("GS08"))
                gs.seg_count = datatree.seg_count
                gs.line_number = datatree.cur_line_number
                gs.parent_line_number = isa.line_number
            elif datatree.id == "GE":
                gs = None
            elif datatree.id == "ST":
                st = JsonInterface()
                st.add_string("IdentifierCode", datatree.get_value("ST01"))
                st.add_int("ControlNumber", datatree.get_value("ST02"))
                st.add_string("ImplementationGuideVersion", datatree.get_value("ST03"))
                st.seg_count = datatree.seg_count
                st.line_number = datatree.cur_line_number
                st.parent_line_number = gs.line_number
                subscribers = []
                my_count = 0
                trn_set_idx += 1
            elif datatree.id == "SE":
                header_od = self._make_headers(isa, gs, st, header)
                _h = collections.OrderedDict()
                _h["Header"] = header_od
                _h["Members"] = subscribers
                outfile = f"{self.outfile_base}.{trn_set_idx:04d}.v1.json"
                full_outfile = os.path.join(out_dir, outfile)
                if self._write_json(_h, full_outfile):
                    self.logger.info(f"Save JSON output to {full_outfile}")
                header = None
            elif datatree.id == "BGN":
                header = JsonInterface()
                header.add_string("TransactionSetPurposeCode", datatree.get_value("BGN01"))
                header.add_string("TransactionSetReferenceNumber", datatree.get_value("BGN02"))
                header.add_datetime(
                    "TransactionSetCreationDate",
                    datatree.get_value("BGN03"),
                    datatree.get_value("BGN04"),
                )
                header.add_string("TimeZoneCode", datatree.get_value("BGN05"))
                header.add_string(
                    "OriginalTransactionSetReferenceNumber", datatree.get_value("BGN06")
                )
                header.add_string("ActionCode", datatree.get_value("BGN08"))
            elif datatree.id == "DTP":
                if datatree.get_value("DTP01") == "007":
                    header.add_string("FileEffectiveDateQualifier", datatree.get_value("DTP01"))
                    header.add_date("FileEffectiveDate", datatree.get_value("DTP[007]03"))
                elif datatree.get_value("DTP01") == "303":
                    header.add_string("FileEffectiveDateQualifier", datatree.get_value("DTP01"))
                    header.add_date("FileEffectiveDate", datatree.get_value("DTP[303]03"))
            elif datatree.id == "QTY":
                header.add_int("MemberCount", datatree.get_value("QTY[TO]02"))
            elif datatree.cur_path.find("1000A") != -1 and datatree.id == "N1":
                header.add_string("SponsorEntityIdentifierCode", datatree.get_value("N101"))
                header.add_string("SponsorName", datatree.get_value("N102"))
                header.add_string("SponsorIdentificationCodeQualifier", datatree.get_value("N103"))
                header.add_string("SponsorIdentifier", datatree.get_value("N104"))
            elif datatree.cur_path.find("1000B") != -1 and datatree.id == "N1":
                header.add_string("InsurerEntityIdentifierCode", datatree.get_value("N101"))
                header.add_string("InsurerName", datatree.get_value("N102"))
                header.add_string("InsurerIdentificationCodeQualifier", datatree.get_value("N103"))
                header.add_string("InsurerIdentificationCode", datatree.get_value("N104"))
        return True

    def get_2000(self, loop_sub, st, idx):
        "2000 Member Loop"
        sub = JsonInterface()
        sub.seg_count = loop_sub.seg_count
        sub.line_number = loop_sub.cur_line_number
        sub.parent_line_number = st.line_number

        sub.add_int("MemberOrdinal", idx)
        sub.add_string("MemberIndicator", loop_sub.get_value("INS01"))
        sub.add_string("IndividualRelationshipCode", loop_sub.get_value("INS02"))
        sub.add_string("MaintenanceTypeCode", loop_sub.get_value("INS03"))
        sub.add_string("MaintenanceReasonCode", loop_sub.get_value("INS04"))
        sub.add_string("BenefitStatusCode", loop_sub.get_value("INS05"))
        sub.add_string("MedicarePlanCode", loop_sub.get_value("INS06-1"))
        sub.add_string("MedicareEligibilityReasonCode", loop_sub.get_value("INS06-2"))
        sub.add_string("CobraQualifyingEventCode", loop_sub.get_value("INS07"))
        sub.add_string("EmploymentStatusCode", loop_sub.get_value("INS08"))
        sub.add_string("StudentStatusCode", loop_sub.get_value("INS09"))
        sub.add_string("HandicapIndicator", loop_sub.get_value("INS10"))
        sub.add_date("DateOfDeath", loop_sub.get_value("INS12"))
        sub.add_int("BirthSequenceNumber", loop_sub.get_value("INS17"))
        sub.add_string("SubscriberIdentifier", loop_sub.get_value("REF[0F]02"))
        sub.add_string("PolicyNumber", loop_sub.get_value("REF[1L]02"))
        sub.add_string("CaseNumber", loop_sub.get_value("REF[3H]02"))
        sub.add_date("EligibilityBeginDate", loop_sub.get_value("DTP[356]03"))
        sub.add_date("EligibilityEndDate", loop_sub.get_value("DTP[357]03"))
        sub.add_date("MedicaidEndDate", loop_sub.get_value("DTP[474]03"))

        sub.add_string("MemberEntityIdentifierCode", loop_sub.get_value("2100A/NM101"))
        sub.add_string("MemberLastName", loop_sub.get_value("2100A/NM103"))
        sub.add_string("MemberFirstName", loop_sub.get_value("2100A/NM104"))
        sub.add_string("MemberMiddleName", loop_sub.get_value("2100A/NM105"))
        sub.add_string("MemberNamePrefix", loop_sub.get_value("2100A/NM106"))
        sub.add_string("MemberNameSuffix", loop_sub.get_value("2100A/NM107"))
        sub.add_string("MemberIdentificationCodeQualifier", loop_sub.get_value("2100A/NM108"))
        sub.add_string("MemberIdentifier", loop_sub.get_value("2100A/NM109"))
        if loop_sub.exists("2100A/PER"):
            per = self._get_per_contact(loop_sub, "2100A")
            if "EmailAddress" in per:
                sub.add_string("MemberEmail", per["EmailAddress"])
            if "FaxNumber" in per:
                sub.add_string("MemberFaxNumber", per["FaxNumber"])
            if "TelephoneNumber" in per:
                sub.add_string("MemberTelephoneNumber", per["TelephoneNumber"])
            if "TelephoneExtension" in per:
                sub.add_string("MemberTelephoneExtension", per["TelephoneExtension"])
            if "URI" in per:
                sub.add_string("MemberURI", per["URI"])

        sub.add_string("MemberAddressLine1", loop_sub.get_value("2100A/N301"))
        sub.add_string("MemberAddressLine2", loop_sub.get_value("2100A/N302"))
        sub.add_string("MemberCity", loop_sub.get_value("2100A/N401"))
        sub.add_string("MemberState", loop_sub.get_value("2100A/N402"))
        sub.add_string("MemberZipCode", loop_sub.get_value("2100A/N403"))
        sub.add_string("MemberCountry", loop_sub.get_value("2100A/N404"))
        sub.add_string("MemberLocationQualifier", loop_sub.get_value("2100A/N405"))
        sub.add_string("MemberLocationIdentifier", loop_sub.get_value("2100A/N406"))

        sub.add_date("DateOfBirth", loop_sub.get_value("2100A/DMG02"))
        sub.add_string("Gender", loop_sub.get_value("2100A/DMG03"))
        sub.add_string("MaritalStatusCode", loop_sub.get_value("2100A/DMG04"))
        sub.add_string("EthnicityCode", loop_sub.get_value("2100A/DMG05-01"))
        sub.add_string("ClassificationOfRaceOrEthnicity", loop_sub.get_value("2100A/DMG05-02"))
        sub.add_string("RaceOrEthnicityCode", loop_sub.get_value("2100A/DMG05-03"))
        sub.add_string("CitizenshipStatusCode", loop_sub.get_value("2100A/DMG06"))

        sub.add_string("IncomeFrequencyCode", loop_sub.get_value("2100A/ICM01"))
        sub.add_number("IncomeWageAmount", loop_sub.get_value("2100A/ICM02"))
        sub.add_number("IncomeWorkHoursCount", loop_sub.get_value("2100A/ICM03"))
        sub.add_string("IncomeLocationIdentificationCode", loop_sub.get_value("2100A/ICM04"))
        sub.add_string("IncomeSalaryGradeCode", loop_sub.get_value("2100A/ICM05"))

        sub.add_string("LanguageCodeQualifier", loop_sub.get_value("2100A/LUI01"))
        sub.add_string("LanguageCode", loop_sub.get_value("2100A/LUI02"))
        sub.add_string("LanguageDescription", loop_sub.get_value("2100A/LUI03"))
        sub.add_string("LanguageUseIndicator", loop_sub.get_value("2100A/LUI04"))

        ret = sub._asdict()

        if loop_sub.exists("2100B"):
            incorrect_member_fields = self.get_incorrect_member(loop_sub.first("2100B"))
            ret["IncorrectMember"] = incorrect_member_fields

        resps = []
        for resp_loop in loop_sub.select("2100G"):
            resp_per = self.get_responsible_person(resp_loop)
            resps.append(resp_per)
        ret["ResponsiblePerson"] = resps

        coverages = []
        for loop_node in loop_sub.select("2300"):
            cov = self.get_2300(loop_node, sub)
            coverages.append(cov)
        ret["Coverages"] = coverages
        return ret

    def get_2300(self, loop_2300, parent):
        "Coverage 2300 loop"
        clsub = JsonInterface()
        clsub.seg_count = loop_2300.seg_count
        clsub.line_number = loop_2300.cur_line_number
        clsub.parent_line_number = parent.line_number

        clsub.add_string("MaintenanceTypeCode", loop_2300.get_value("HD01"))
        clsub.add_string("InsuranceLineCode", loop_2300.get_value("HD03"))
        clsub.add_string("PlanCoverageDescription", loop_2300.get_value("HD04"))
        clsub.add_string("CoverageLevelCode", loop_2300.get_value("HD05"))
        clsub.add_date("BenefitBeginDate", loop_2300.get_value("DTP[348]03"))
        clsub.add_date("BenefitEndDate", loop_2300.get_value("DTP[349]03"))
        clsub.add_date("EnrollmentSignatureDate", loop_2300.get_value("DTP[300]03"))
        clsub.add_date("MaintenanceEffectiveDate", loop_2300.get_value("DTP[303]03"))
        clsub.add_date("PremiumPaidToEndDate", loop_2300.get_value("DTP[343]03"))
        clsub.add_date("LastPremiumPaidDate", loop_2300.get_value("DTP[543]03"))

        clsub.add_string("ClientReportingCategory", loop_2300.get_value("REF[17]02"))
        ret = clsub._asdict()

        providers = []
        for loop_node in loop_2300.select("2310"):
            provider = self.get_2310_fields(loop_node)
            providers.append(provider)
        ret["Providers"] = providers

        payers = []
        idx = 0
        for loop_node in loop_2300.select("2320"):
            idx += 1
            payer_fields = self.get_2320_fields(loop_node, idx)
            payers.append(payer_fields)
        ret["Payers"] = payers
        return ret

    def get_2310_fields(self, loop):
        "Provider loop table"
        prov = JsonInterface()
        prov.seg_count = loop.seg_count
        prov.line_number = loop.cur_line_number

        prov.add_int("LineCounter", int(loop.get_value("LX01")))
        prov.add_string("EntityIdentifierCode", loop.get_value("NM101"))
        prov.add_string("EntityTypeQualifier", loop.get_value("NM102"))
        prov.add_string("LastName", loop.get_value("NM103"))
        prov.add_string("FirstName", loop.get_value("NM104"))
        prov.add_string("MiddleName", loop.get_value("NM105"))
        prov.add_string("NamePrefix", loop.get_value("NM106"))
        prov.add_string("NameSuffix", loop.get_value("NM107"))
        prov.add_string("IdentifierCodeQualifier", loop.get_value("NM108"))
        prov.add_string("IdentificationCode", loop.get_value("NM109"))
        prov.add_string("EntityRelationshipCode", loop.get_value("NM110"))
        prov.add_string("Address1", loop.get_value("N301"))
        prov.add_string("Address2", loop.get_value("N302"))
        prov.add_string("City", loop.get_value("N401"))
        prov.add_string("State", loop.get_value("N402"))
        prov.add_string("ZipCode", loop.get_value("N403"))
        prov.add_string("Country", loop.get_value("N404"))

        if loop.exists("PER"):
            per = self._get_per_contact(loop, None)
            if "EmailAddress" in per:
                prov.add_string("Email", per["EmailAddress"])
            if "FaxNumber" in per:
                prov.add_string("FaxNumber", per["FaxNumber"])
            if "TelephoneNumber" in per:
                prov.add_string("TelephoneNumber", per["TelephoneNumber"])
            if "TelephoneExtension" in per:
                prov.add_string("TelephoneExtension", per["TelephoneExtension"])
            if "URI" in per:
                prov.add_string("URI", per["URI"])

        prov.add_date("EffectiveDate", loop.get_value("PLA03"))
        prov.add_string("ChangeReason", loop.get_value("PLA04"))
        return prov._asdict()

    def get_2320_fields(self, loop, idx):
        "Payer 2320 loop"
        pay = JsonInterface()
        pay.seg_count = loop.seg_count
        pay.line_number = loop.cur_line_number

        pay.add_int("PayerOrdinal", idx)
        pay.add_string("ResponsibilitySequenceNumberCode", loop.get_value("COB01"))
        pay.add_string("MemberGroupPolicyNumber", loop.get_value("COB02"))
        pay.add_string("CoordinationOfBenefitsCode", loop.get_value("COB03"))
        pay.add_string("SocialSecurityNumber", loop.get_value("REF[SY]02"))
        pay.add_string("PolicyNumberRefZz", loop.get_value("REF[ZZ]02"))
        pay.add_string("GroupNumber", loop.get_value("REF[6P]02"))
        pay.add_string("AccountSuffixCode", loop.get_value("REF[60]02"))
        pay.add_date("CoordinationOfBenefitsBeginDate", loop.get_value("DTP[344]03"))
        pay.add_date("CoordinationOfBenefitsEndDate", loop.get_value("DTP[345]03"))
        for ins_node in loop.select("2330"):
            if ins_node.get_value("NM101") == "IN":
                pay.add_string("PayerName", ins_node.get_value("NM103"))
                if ins_node.get_value("NM108") == "FI":
                    pay.add_string("PayerFederalTaxIdNumber", ins_node.get_value("NM109"))
                if ins_node.get_value("NM108") == "XV":
                    pay.add_string("PayerPlanID", ins_node.get_value("NM109"))
                pay.add_string("PayerAddress1", ins_node.get_value("N301"))
                pay.add_string("PayerAddress2", ins_node.get_value("N302"))
                pay.add_string("PayerCity", ins_node.get_value("N401"))
                pay.add_string("PayerState", ins_node.get_value("N402"))
                pay.add_string("PayerZipCode", ins_node.get_value("N403"))
                pay.add_string("PayerCountry", ins_node.get_value("N404"))

                if ins_node.exists("PER"):
                    per = self._get_per_contact(ins_node, None)
                    if "EmailAddress" in per:
                        pay.add_string("Email", per["EmailAddress"])
                    if "FaxNumber" in per:
                        pay.add_string("FaxNumber", per["FaxNumber"])
                    if "TelephoneNumber" in per:
                        pay.add_string("TelephoneNumber", per["TelephoneNumber"])
                    if "TelephoneExtension" in per:
                        pay.add_string("TelephoneExtension", per["TelephoneExtension"])
                    if "URI" in per:
                        pay.add_string("URI", per["URI"])
                break
        return pay._asdict()

    def get_incorrect_member(self, loop_sub):
        sub = JsonInterface()
        sub.add_string("EntityIdentifierCode", loop_sub.get_value("NM101"))
        sub.add_string("EntityTypeQualifier", loop_sub.get_value("NM102"))
        sub.add_string("LastName", loop_sub.get_value("NM103"))
        sub.add_string("FirstName", loop_sub.get_value("NM104"))
        sub.add_string("MiddleName", loop_sub.get_value("NM105"))
        sub.add_string("NamePrefix", loop_sub.get_value("NM106"))
        sub.add_string("NameSuffix", loop_sub.get_value("NM107"))
        sub.add_string("IdentificationCodeQualifier", loop_sub.get_value("NM108"))
        sub.add_string("InsuredIdentifier", loop_sub.get_value("NM109"))
        if loop_sub.get_value("NM108") == "34":
            sub.add_string("SocialSecurityNumber", loop_sub.get_value("NM109"))

        sub.add_date("DateOfBirth", loop_sub.get_value("DMG02"))
        sub.add_string("Gender", loop_sub.get_value("DMG03"))
        sub.add_string("MaritalStatusCode", loop_sub.get_value("DMG04"))
        if loop_sub.get_value("DMG05-01") != "":
            sub.add_string("EthnicityCode", loop_sub.get_value("DMG05-01"))
        if loop_sub.get_value("DMG05-02") != "":
            sub.add_string("ClassificationOfRaceOrEthnicity", loop_sub.get_value("DMG05-02"))
        if loop_sub.get_value("DMG05-03") != "":
            sub.add_string("RaceOrEthnicityCode", loop_sub.get_value("DMG05-03"))
        sub.add_string("CitizenshipStatusCode", loop_sub.get_value("DMG06"))
        return sub._asdict()

    def get_responsible_person(self, loop_sub):
        sub = JsonInterface()
        sub.add_string("EntityIdentifierCode", loop_sub.get_value("NM101"))
        sub.add_string("EntityTypeQualifier", loop_sub.get_value("NM102"))
        sub.add_string("LastName", loop_sub.get_value("NM103"))
        sub.add_string("FirstName", loop_sub.get_value("NM104"))
        sub.add_string("MiddleName", loop_sub.get_value("NM105"))
        sub.add_string("NamePrefix", loop_sub.get_value("NM106"))
        sub.add_string("NameSuffix", loop_sub.get_value("NM107"))
        sub.add_string("IdentificationCodeQualifier", loop_sub.get_value("NM108"))
        sub.add_string("ResponsiblePartyIdentifier", loop_sub.get_value("NM109"))
        if loop_sub.get_value("NM108") == "34":
            sub.add_string("SocialSecurityNumber", loop_sub.get_value("NM109"))

        if loop_sub.exists("PER"):
            per = self._get_per_contact(loop_sub, None)
            if "EmailAddress" in per:
                sub.add_string("Email", per["EmailAddress"])
            if "FaxNumber" in per:
                sub.add_string("FaxNumber", per["FaxNumber"])
            if "TelephoneNumber" in per:
                sub.add_string("TelephoneNumber", per["TelephoneNumber"])
            if "TelephoneExtension" in per:
                sub.add_string("TelephoneExtension", per["TelephoneExtension"])
            if "URI" in per:
                sub.add_string("URI", per["URI"])

        sub.add_string("Address1", loop_sub.get_value("N301"))
        sub.add_string("Address2", loop_sub.get_value("N302"))
        sub.add_string("City", loop_sub.get_value("N401"))
        sub.add_string("State", loop_sub.get_value("N402"))
        sub.add_string("ZipCode", loop_sub.get_value("N403"))
        sub.add_string("Country", loop_sub.get_value("N404"))
        return sub._asdict()

    def _get_per_contact(self, per_loop, loop_id):
        "Get list of PER contact info"
        per = {}
        prefix = f"{loop_id}/PER" if loop_id is not None else "PER"
        for idx in range(1, 4):
            per_qual = per_loop.get_value(f"{prefix}0{idx * 2 + 1}")
            per_value = per_loop.get_value(f"{prefix}0{idx * 2 + 2}")
            if per_qual is not None and per_value != "":
                if per_qual == "EM":
                    per["EmailAddress"] = per_value
                elif per_qual == "FX":
                    per["FaxNumber"] = per_value
                elif per_qual == "EX":
                    per["TelephoneExtension"] = per_value
                elif per_qual == "TE":
                    per["TelephoneNumber"] = per_value
                elif per_qual == "UR":
                    per["URI"] = per_value
        return per

    def _make_headers(self, isa, gs, st, header):
        ret = collections.OrderedDict()
        for k, v in isa.fields.items():
            if not k.startswith("Interchange"):
                ret["Interchange" + k] = v["value"]
            else:
                ret[k] = v["value"]
        for k, v in gs.fields.items():
            if not k.startswith("Functional"):
                ret["Functional" + k] = v["value"]
            else:
                ret[k] = v["value"]
        for k, v in st.fields.items():
            if not k.startswith("TransactionSet"):
                ret["TransactionSet" + k] = v["value"]
            else:
                ret[k] = v["value"]
        for k, v in header.fields.items():
            ret[k] = v["value"]
        return ret

    def _write_json(self, header, outfile):
        try:
            with open(outfile, "w") as fd_final:
                fd_final.write(
                    json.dumps(header, indent=2, sort_keys=False, separators=(",", ": "))
                )
                return True
        except OSError:
            self.logger.warning(f"Failed to write {outfile}")
        return False


def parse_file(fullname, out_dir="."):
    logger = logging.getLogger()
    if not os.path.isfile(fullname):
        raise Exception(f"File {fullname} was not found")
    if not os.path.isdir(out_dir):
        logger.exception("outdir does not exist")

    with tempfile.TemporaryFile(mode="w+", encoding="ascii") as fd_src:
        with open(fullname, encoding="ascii") as fd_dx:
            fd_src.write(fd_dx.read())
        fd_src.seek(0)
        my834 = Enrollment834Parser(fullname, fd_src)
        print(fullname)
        try:
            my834.parse_file(out_dir)
        except Exception:
            logger.exception("Failed to parse file")


def main():
    """Script main program"""
    import argparse

    parser = argparse.ArgumentParser(description="834 eligibility to JSON parser")
    parser.add_argument("--verbose", "-v", action="count", default=0)
    parser.add_argument("--debug", "-d", action="store_true")
    parser.add_argument(
        "--out-dir",
        "-o",
        default=".",
        help="directory to write JSON output files (default: current directory)",
    )
    parser.add_argument("files", metavar="N", nargs="*", help="input files")
    args = parser.parse_args()

    logger = logging.getLogger()
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")

    stdout_hdlr = logging.StreamHandler()
    stdout_hdlr.setFormatter(formatter)
    logger.addHandler(stdout_hdlr)
    logger.setLevel(logging.INFO)

    if args.debug or args.verbose > 0:
        logger.setLevel(logging.DEBUG)

    if args.files:
        for glob_pattern in args.files:
            files = glob.glob(glob_pattern)
            print(files)
            for full_name in files:
                parse_file(full_name, args.out_dir)
    else:
        files = ["834_multiple_st_loops.txt"]
        print(files)
        for full_name in files:
            parse_file(full_name, args.out_dir)

    return True


if __name__ == "__main__":
    sys.exit(not main())

Generating a field reference from a sample file

The next two scripts form a small pipeline that derives a human-readable field reference for whatever transaction type a sample X12 file contains. This is useful when you’re integrating a new transaction (or a new vendor’s flavor of an existing one) and want to see exactly which loops, segments, and elements show up — together with the implementation-guide metadata pyx12 has for each.

node_iterator.py walks the input file with pyx12.x12file.X12Reader plus a pyx12.map_walker.walk_tree, manually driving the walker the same way pyx12.x12n_document.x12n_document() does internally, and records every distinct map node it encounters along with its base_name, id, name, usage, data_type, and length bounds. The result is written to node_list.json next to the input file. This is also a worked example of using the map walker directly, picking the right map via pyx12.map_index.map_index, and switching maps mid-stream when a 4010 837 reveals its tspc in the BHT segment.

#!/usr/bin/env python
"""Walk an X12 file with the map walker and write a ``node_list.json`` catalog
of every loop / segment / element node encountered, with type, usage, and
length metadata.  Output feeds ``generate_spec.py``."""

import argparse
import json
import logging
import os
import os.path
import sys

libpath = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
if os.path.isdir(libpath):
    sys.path.insert(0, libpath)

# Intrapackage imports
import pyx12.error_handler
import pyx12.errors
import pyx12.map_if
import pyx12.map_index
import pyx12.params
import pyx12.x12file
from pyx12.map_walker import apply_walk_errors, walk_tree

__version__ = "1.0.0"


def x12n_iterator(param, src_file, map_path=None):
    logger = logging.getLogger("pyx12")
    errh = pyx12.error_handler.errh_null()

    # Get X12 DATA file
    try:
        src = pyx12.x12file.X12Reader(src_file)
    except pyx12.errors.X12Error:
        logger.error('"%s" does not look like an X12 data file' % (src_file))
        return False

    # Get Map of Control Segments
    map_file = "x12.control.00501.xml" if src.icvn == "00501" else "x12.control.00401.xml"
    logger.debug("X12 control file: %s" % (map_file))
    control_map = pyx12.map_if.load_map_file(map_file, param, map_path)
    map_index_if = pyx12.map_index.map_index(map_path)
    node = control_map.getnodebypath("/ISA_LOOP/ISA")
    walker = walk_tree()
    icvn = fic = vriic = tspc = None
    cur_map = None  # we do not initially know the X12 transaction type

    res = {}
    res_ordinal = 0
    last_x12_segment_path = None
    for seg in src:
        # find node
        orig_node = node

        if seg.get_seg_id() == "ISA":
            node = control_map.getnodebypath("/ISA_LOOP/ISA")
            walker.forceWalkCounterToLoopStart("/ISA_LOOP", "/ISA_LOOP/ISA")
        elif seg.get_seg_id() == "GS":
            node = control_map.getnodebypath("/ISA_LOOP/GS_LOOP/GS")
            walker.forceWalkCounterToLoopStart("/ISA_LOOP/GS_LOOP", "/ISA_LOOP/GS_LOOP/GS")
        else:
            # from the current node, find the map node matching the segment
            # keep track of the loops traversed
            try:
                (node, pop_loops, push_loops, walk_errors) = walker.walk_errors(
                    node, seg, src.get_seg_count(), src.get_cur_line(), src.get_ls_id()
                )
                apply_walk_errors(errh, walk_errors)
            except pyx12.errors.EngineError:
                logger.error("Source file line %i" % (src.get_cur_line()))
                raise

        if node is None:
            node = orig_node
        else:
            if seg.get_seg_id() == "ISA":
                icvn = seg.get_value("ISA12")
            elif seg.get_seg_id() == "IEA":
                pass
            elif seg.get_seg_id() == "GS":
                fic = seg.get_value("GS01")
                vriic = seg.get_value("GS08")
                map_file_new = map_index_if.get_filename(icvn, vriic, fic)
                if map_file != map_file_new:
                    map_file = map_file_new
                    if map_file is None:
                        err_str = f"Map not found.  icvn={icvn}, fic={fic}, vriic={vriic}"
                        raise pyx12.errors.EngineError(err_str)
                    cur_map = pyx12.map_if.load_map_file(map_file, param, map_path)
                    src.check_837_lx = True if cur_map.id == "837" else False
                    logger.debug("Map file: %s" % (map_file))
                node = cur_map.getnodebypath("/ISA_LOOP/GS_LOOP/GS")
                pass
            elif seg.get_seg_id() == "BHT":
                # special case for 4010 837P
                if vriic in ("004010X094", "004010X094A1"):
                    tspc = seg.get_value("BHT02")
                    logger.debug("icvn=%s, fic=%s, vriic=%s, tspc=%s" % (icvn, fic, vriic, tspc))
                    map_file_new = map_index_if.get_filename(icvn, vriic, fic, tspc)
                    logger.debug("New map file: %s" % (map_file_new))
                    if map_file != map_file_new:
                        map_file = map_file_new
                        if map_file is None:
                            err_str = f"Map not found.  icvn={icvn}, fic={fic}, vriic={vriic}, tspc={tspc}"
                            raise pyx12.errors.EngineError(err_str)
                        cur_map = pyx12.map_if.load_map_file(map_file, param, map_path)
                        src.check_837_lx = True if cur_map.id == "837" else False
                        logger.debug("Map file: %s" % (map_file))
                        # apply_loop_count(node, cur_map)
                        node = cur_map.getnodebypath("/ISA_LOOP/GS_LOOP/ST_LOOP/HEADER/BHT")
            # elif seg.get_seg_id() == 'GE':
            #    pass
            # elif seg.get_seg_id() == 'ST':
            #    pass
            # elif seg.get_seg_id() == 'SE':
            #    pass
            else:
                pass

        x12path = node.get_path()
        # parent
        if x12path in res:
            res[x12path]["Count"] += 1
            if last_x12_segment_path not in res[x12path]["prefix_nodes"]:
                res[x12path]["prefix_nodes"].append(last_x12_segment_path)
        else:
            res[x12path] = {
                "Ordinal": res_ordinal,
                "Count": 1,
                "NodeType": node.base_name,
                "Id": node.id,
                "Name": node.name,
                "FormattedName": clean_name(node.name),
                "ParentName": clean_name(node.parent.name),
                "LoopMaxUse": node.max_use,
                "ParentPath": node.parent.get_path(),
                "prefix_nodes": [last_x12_segment_path],
            }
            res_ordinal += 1

        for refdes, ele_ord, comp_ord, val in seg.values_iterator():
            elepath = node.parent.get_path() + "/" + refdes
            if elepath in res:
                res[elepath]["Count"] += 1
            else:
                ele_node = node.getnodebypath2(refdes)
                # node.get_child_node_by_ordinal(
                res[elepath] = {
                    "Ordinal": res_ordinal,
                    "Count": 1,
                    "NodeType": ele_node.base_name,
                    "Id": ele_node.id,
                    "Name": ele_node.name,
                    "FormattedName": clean_name(ele_node.name),
                    "ParentName": clean_name(ele_node.parent.name),
                    #'max_use': ele_node.max_use,
                    "ParentPath": ele_node.parent.get_path(),
                    "Usage": ele_node.usage,
                    "DataType": ele_node.data_type,
                    "MinLength": ele_node.min_len,
                    "MaxLength": ele_node.max_len,
                }
                res_ordinal += 1

            # print (refdes, val)
        last_x12_segment_path = x12path

    del node
    del src
    del control_map
    try:
        del cur_map
    except UnboundLocalError:
        pass
    return res


def clean_name(name):
    return name.replace(" ", "").replace("/", "").replace("'", "")


def check_map_path_arg(map_path):
    if not os.path.isdir(map_path):
        raise argparse.ArgumentError(None, f"The MAP_PATH '{map_path}' is not a valid directory")
    index_file = "maps.xml"
    if not os.path.isfile(os.path.join(map_path, index_file)):
        raise argparse.ArgumentError(
            None,
            f"The MAP_PATH '{map_path}' does not contain the map index file '{index_file}'",
        )
    return map_path


def main():
    """
    Set up environment for processing
    """
    parser = argparse.ArgumentParser(description="X12 Validation")
    parser.add_argument("--config-file", "-c", action="store", dest="configfile", default=None)
    parser.add_argument("--log-file", "-l", action="store", dest="logfile", default=None)
    parser.add_argument(
        "--map-path", "-m", action="store", dest="map_path", default=None, type=check_map_path_arg
    )
    parser.add_argument("--verbose", "-v", action="count", default=0)
    parser.add_argument("--debug", "-d", action="store_true")
    parser.add_argument("--quiet", "-q", action="store_true")
    parser.add_argument("--html", "-H", action="store_true")
    parser.add_argument(
        "--version",
        action="version",
        version=f"{parser.prog} {__version__}",
    )
    parser.add_argument("input_files", nargs="*")
    args = parser.parse_args()

    logger = logging.getLogger("pyx12")
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")

    stdout_hdlr = logging.StreamHandler()
    stdout_hdlr.setFormatter(formatter)
    logger.addHandler(stdout_hdlr)
    logger.setLevel(logging.INFO)

    param = pyx12.params.params(args.configfile)
    if args.debug:
        logger.setLevel(logging.DEBUG)
        param.set("debug", True)
    if args.verbose > 0:
        logger.setLevel(logging.DEBUG)
    if args.quiet:
        logger.setLevel(logging.ERROR)
    if args.map_path:
        param.set("map_path", args.map_path)

    if args.logfile:
        try:
            hdlr = logging.FileHandler(args.logfile)
            hdlr.setFormatter(formatter)
            logger.addHandler(hdlr)
        except OSError:
            logger.exception("Could not open log file: %s" % (args.logfile))

    for src_filename in args.input_files:
        try:
            if not os.path.isfile(src_filename):
                logger.error('Could not open file "%s"' % (src_filename))
                continue
            res = x12n_iterator(param=param, src_file=src_filename, map_path=args.map_path)
            json_file = os.path.join(
                os.path.dirname(os.path.abspath(src_filename)), "node_list.json"
            )
            with open(json_file, "w") as fd:
                json.dump(res, fd, indent=4)

        except OSError:
            logger.exception("Could not open files")
            return False
        except KeyboardInterrupt:
            print("\n[interrupt]")
        # except Exception as e:
        #    raise e
    return True


if __name__ == "__main__":
    sys.exit(not main())

generate_spec.py consumes the node_list.json and emits two artifacts:

  • out.csv — one row per node with all of the metadata fields, suitable for review in a spreadsheet.

  • map.json — element nodes grouped by the section of the transaction they belong to (Header, Patient, Claim, ServiceLine, ProviderStatus, …), with the section heuristics applied to 277CA loop names like 2200D / 2220D. The element list per section collapses duplicate FormattedName collisions by prefixing the parent loop name.

#!/usr/bin/env python
"""Convert the ``node_list.json`` produced by ``node_iterator.py`` into a
``out.csv`` field reference and a sectioned ``map.json`` field-mapping file."""

import argparse
import json
import logging
import os.path
import sys

libpath = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
if os.path.isdir(libpath):
    sys.path.insert(0, libpath)

__version__ = "1.0.0"


def clean_name(name):
    return name.replace(" ", "").replace("/", "").replace("'", "")


def check_map_path_arg(map_path):
    if not os.path.isdir(map_path):
        raise argparse.ArgumentError(None, f"The MAP_PATH '{map_path}' is not a valid directory")
    index_file = "maps.xml"
    if not os.path.isfile(os.path.join(map_path, index_file)):
        raise argparse.ArgumentError(
            None,
            f"The MAP_PATH '{map_path}' does not contain the map index file '{index_file}'",
        )
    return map_path


def save_csv(rows, csv_file):
    import csv

    fields = [
        "Ordinal",
        "Id",
        "NodeType",
        "Name",
        "FormattedName",
        "Count",
        "Section",
        "RelativePath",
        "FullPath",
        "ParentPath",
        "ParentName",
        "LoopMaxUse",
        "Usage",
        "DataType",
        "MinLength",
        "MaxLength",
    ]
    with open(csv_file, "w", newline="", encoding="utf-8") as outfile:
        writer = csv.DictWriter(
            outfile, fieldnames=fields, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
        )
        writer.writeheader()
        rows.sort(key=lambda item: item["Ordinal"])
        for row in rows:
            writer.writerow(row)


def save_mapping(rows, json_file):
    sections = sorted(list(set([x["Section"] for x in rows])))
    maps = {}
    with open(json_file, "w") as fd:
        fd.write("{")
        for s in sections:
            fd.write(f'"{s}": [')
            s = [
                {
                    "Id": x["Id"],
                    "Ordinal": x["Ordinal"],
                    "Type": x["DataType"] if "DataType" in x else None,
                    "FieldName": x["FormattedName"],
                    "X12Path": x["RelativePath"],
                    "FullPath": x["FullPath"],
                    "ParentPath": x["ParentPath"],
                    "ParentName": x["ParentName"],
                    "Usage": x["Usage"],
                    "MaxLength": x["MaxLength"],
                }
                for x in rows
                if x["Section"] == s and x["NodeType"] == "element"
            ]
            s.sort(key=lambda item: item["Ordinal"])
            for item in s:
                fitem = json.dumps(item)
                fd.write(f"\n\t{fitem},")
            fd.write("\n],\n")
        fd.write("}")


def make_dict(data):
    rows = []
    for k, v in data.items():
        if v["Id"] in ("IEA02", "GE02", "SE02", "ST03"):
            continue
        row = v
        row["FullPath"] = k
        if "2220D" in k and row["Id"].startswith("STC"):
            row["Section"] = "ServiceLineStatus"
        elif "2220D" in k:
            row["Section"] = "ServiceLine"
        elif "2200D" in k and row["Id"].startswith("STC"):
            row["Section"] = "ClaimStatus"
        elif "2200D" in k:
            row["Section"] = "Claim"
        elif "2000D" in k:
            row["Section"] = "Patient"
        elif "2000C" in k and row["Id"].startswith("STC"):
            row["Section"] = "BillingProviderStatus"
        elif "2000C" in k:
            row["Section"] = "BillingProvider"
        elif "2200B" in k and row["Id"].startswith("STC"):
            row["Section"] = "InformationReceiverStatus"
        elif "2000A" in k:
            row["Section"] = "Header"
        else:
            row["Section"] = "Batch"
        rows.append(row)
    base_paths = {}
    for row in rows:
        section = row["Section"]
        if section not in base_paths:
            base_paths[section] = row["ParentPath"]
        elif len(base_paths[section]) > len(row["ParentPath"]):
            base_paths[section] = row["ParentPath"]
    for row in rows:
        basepath = base_paths[row["Section"]]
        if row["FullPath"].startswith(basepath):
            row["RelativePath"] = row["FullPath"][len(basepath) + 1 :]
    for section in list(set([r["Section"] for r in rows])):
        fields = [r for r in rows if r["Section"] == section and r["NodeType"] == "element"]
        fieldnames = [f["FormattedName"] for f in fields]
        duplicate_fieldnames = set([f for f in fieldnames if fieldnames.count(f) > 1])
        for row in [
            r
            for r in rows
            if r["Section"] == section
            and r["NodeType"] == "element"
            and r["FormattedName"] in duplicate_fieldnames
        ]:
            row["FormattedName"] = row["ParentName"] + row["FormattedName"]
    return rows


def main():
    """
    Set up environment for processing
    """
    parser = argparse.ArgumentParser(description="Gen X12 Sepcs")
    parser.add_argument("--config-file", "-c", action="store", dest="configfile", default=None)
    parser.add_argument("--log-file", "-l", action="store", dest="logfile", default=None)
    parser.add_argument(
        "--map-path", "-m", action="store", dest="map_path", default=None, type=check_map_path_arg
    )
    parser.add_argument("--verbose", "-v", action="count", default=0)
    parser.add_argument("--debug", "-d", action="store_true")
    parser.add_argument("--quiet", "-q", action="store_true")
    parser.add_argument("--html", "-H", action="store_true")
    parser.add_argument(
        "--version",
        action="version",
        version=f"{parser.prog} {__version__}",
    )
    parser.add_argument("input_files", nargs="*")
    args = parser.parse_args()

    logger = logging.getLogger("pyx12")
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")

    stdout_hdlr = logging.StreamHandler()
    stdout_hdlr.setFormatter(formatter)
    logger.addHandler(stdout_hdlr)
    logger.setLevel(logging.INFO)

    if args.debug:
        logger.setLevel(logging.DEBUG)
    if args.verbose > 0:
        logger.setLevel(logging.DEBUG)
    if args.quiet:
        logger.setLevel(logging.ERROR)

    if args.logfile:
        try:
            hdlr = logging.FileHandler(args.logfile)
            hdlr.setFormatter(formatter)
            logger.addHandler(hdlr)
        except OSError:
            logger.exception("Could not open log file: %s" % (args.logfile))

    src_filename = args.input_files[0]
    json_file = os.path.join(os.path.dirname(os.path.abspath(src_filename)), "node_list.json")
    with open(json_file) as fd:
        res = json.load(fd)
    rows = make_dict(res)

    csv_file = os.path.join(os.path.dirname(os.path.abspath(src_filename)), "out.csv")
    save_csv(rows, csv_file)

    json_map_file = os.path.join(os.path.dirname(os.path.abspath(src_filename)), "map.json")
    save_mapping(rows, json_map_file)

    return True


if __name__ == "__main__":
    sys.exit(not main())