Examples
This page collects worked examples that ship in pyx12.examples.
Each demonstrates a different pattern for working with X12 documents,
ordered roughly from low-level stream handling up to a full transaction-set
to-JSON pipeline. Run any of them directly from a checkout — every script
has a __main__ entry point — or copy the techniques into your own
code.
Splitting on ST loops with X12Reader
st_iterator.py splits an
X12 interchange that contains multiple ST/SE transaction sets into one
X12 file per ST loop. It uses the streaming pyx12.x12file.X12Reader
together with itertools.groupby() to chunk segments by their owning
ST envelope, then writes each chunk through pyx12.x12file.X12Writer
which auto-closes the ST / GS / ISA loops on Close(). The example also
shows how to renumber the ISA control number (ISA13) and GS group
control number (GS06) per output file.
This is the lowest-level approach — useful when you don’t need loop-aware parsing and want to keep memory flat over very large interchanges.
#! /usr/bin/env python
"""Split an X12 interchange that contains multiple ST/SE transaction sets into
one X12 file per ST loop, using ``pyx12.x12file.X12Reader`` and
``itertools.groupby``."""
import random
import sys
import tempfile
from itertools import groupby
import pyx12
import pyx12.error_handler
import pyx12.params
import pyx12.x12context
import pyx12.x12file
def x12_split_on_st(source_filename, isa_id=11, gs_id=21):
    """Split an interchange into one temporary X12 file per ST loop.

    Args:
        source_filename: Path of the X12 file, handed to ``X12Reader``.
        isa_id: ISA13 control number written to the first output; each
            following ST loop gets the next integer.
        gs_id: GS06 control number written to the first output; incremented
            the same way.

    Yields:
        ``(isa_id, gs_id, st_id, fd_temp)`` for each ST loop, where ``st_id``
        is ``int(ST02)`` and ``fd_temp`` is an open text-mode temporary file
        holding that loop.  Seek to the start before reading it; closing
        the file is the caller's job.
    """
    src = pyx12.x12file.X12Reader(source_filename)
    by_envelope = groupby(get_headers_stream(src), lambda pair: pair[0])
    for offset, (envelope, pairs) in enumerate(by_envelope):
        cur_isa_id = isa_id + offset
        cur_gs_id = gs_id + offset
        st_id = int(envelope["st_seg"].get_value("ST02"))
        fd_temp = tempfile.TemporaryFile(mode="w+", encoding="ascii")
        try:
            wr = pyx12.x12file.X12Writer(fd_temp)
            wr.Write(update_isa_id(envelope["isa_seg"], cur_isa_id))
            wr.Write(update_gs_id(envelope["gs_seg"], cur_gs_id))
            for pair in pairs:
                wr.Write(pair[1])
            wr.Close()  # Auto close ST, GS and ISA loops
        except BaseException:
            # Do not leak the temporary file when writing fails part-way.
            fd_temp.close()
            raise
        yield (cur_isa_id, cur_gs_id, st_id, fd_temp)
def save_many(src_filename, targetformat=None):
    """Write every ST loop of *src_filename* to its own file.

    Random starting ISA / GS control numbers are picked once per call, and
    a ``(filename, isa_id, gs_id, st_id)`` tuple is printed per file written.

    Args:
        src_filename: Path of the source X12 interchange.
        targetformat: Optional ``str.format`` template for the output name;
            it may reference ``{isa_id}``, ``{gs_id}`` and ``{st_id}``.
            When omitted, files are named ``newfile_<isa_id>.txt``.
    """
    base_isa_id = random.randint(1000, 999999999)
    base_gs_id = random.randint(100, 999999999)
    pieces = x12_split_on_st(src_filename, base_isa_id, base_gs_id)
    for isa_id, gs_id, st_id, fd_temp in pieces:
        if targetformat is None:
            newname = f"newfile_{isa_id}.txt"
        else:
            newname = targetformat.format(isa_id=isa_id, gs_id=gs_id, st_id=st_id)
        # The temporary file is ours once yielded; close it after copying so
        # a large interchange does not accumulate open descriptors.
        with fd_temp, open(newname, "w", encoding="ascii") as fd_out:
            fd_temp.seek(0)
            fd_out.write(fd_temp.read())
        print((newname, isa_id, gs_id, st_id))
def update_isa_id(seg_data, isa_id):
    """Set ISA13 on *seg_data* to *isa_id*, zero-padded to nine characters.

    The segment is modified in place and returned.
    """
    control_number = str(int(isa_id)).rjust(9, "0")
    seg_data.set("ISA13", control_number)
    return seg_data
def update_gs_id(seg_data, gs_id):
    """Set GS06 on *seg_data* to the integer form of *gs_id* (no padding).

    The segment is modified in place and returned.
    """
    group_number = str(int(gs_id))
    seg_data.set("GS06", group_number)
    return seg_data
def get_headers_stream(segments):
    """Pair each transaction-set segment with its current envelope headers.

    Args:
        segments: Iterable of segment objects exposing ``get_seg_id()``.

    Yields:
        ``(headers, segment)`` tuples where ``headers`` is a new dict with
        the keys ``"isa_seg"``, ``"gs_seg"`` and ``"st_seg"`` holding the
        most recently seen ISA, GS and ST segments (``None`` until seen).
        ISA and GS segments are only remembered, IEA and GE are dropped,
        and every other segment -- ST included -- is yielded.
    """
    current = {"isa_seg": None, "gs_seg": None, "st_seg": None}
    remembered = {"ISA": "isa_seg", "GS": "gs_seg"}
    for segment in segments:
        seg_id = segment.get_seg_id()
        if seg_id in remembered:
            current[remembered[seg_id]] = segment
            continue
        if seg_id in ("IEA", "GE"):
            continue
        if seg_id == "ST":
            current["st_seg"] = segment
        # Hand out a copy so later header changes do not alter earlier keys.
        yield (dict(current), segment)
def iterate_2000(fd_in):
    """Yield ``(headers, segment)`` pairs using the loop-aware context reader.

    Args:
        fd_in: Open file object with the X12 data, passed to
            ``X12ContextReader``.

    Yields:
        ``(headers, segment)`` for every segment of each data tree that is
        not an ISA, GS, IEA or GE node.  ``headers`` is a new dict with the
        keys ``"isa_seg"`` and ``"gs_seg"`` (first segment of the latest
        ISA / GS tree) and ``"st_id"`` (ST02 of the latest ST tree).
    """
    reader = pyx12.x12context.X12ContextReader(
        pyx12.params.params(), pyx12.error_handler.errh_null(), fd_in
    )
    isa_seg = gs_seg = st_id = None
    for tree in reader.iter_segments("2000"):
        if tree.id in ("IEA", "GE"):
            continue
        if tree.id == "ISA":
            # Only the first segment of the envelope tree is of interest.
            for node in tree.iterate_segments():
                isa_seg = node["segment"]
                break
            continue
        if tree.id == "GS":
            for node in tree.iterate_segments():
                gs_seg = node["segment"]
                break
            continue
        if tree.id == "ST":
            st_id = tree.get_value("ST02")
        for node in tree.iterate_segments():
            # do something with loop 2000
            headers = {"isa_seg": isa_seg, "gs_seg": gs_seg, "st_id": st_id}
            yield (headers, node["segment"])
def main(testfile="834_multiple_st_loops.txt", targetformat=None):
    """Split *testfile* into one output file per ST loop.

    Args:
        testfile: Path of the interchange to split; defaults to the sample
            file name the example originally hard-coded.
        targetformat: Optional output-name template forwarded to
            ``save_many``.

    Returns:
        True, so that ``sys.exit(not main())`` exits with status 0.
    """
    save_many(testfile, targetformat)
    return True
# Script entry point: exit status is 0 when main() returns True, 1 otherwise.
if __name__ == "__main__":
    sys.exit(not main())
Splitting on ST loops with X12ContextReader
st_context_iterator.py
solves the same problem as st_iterator.py but parses the input
through pyx12.x12context.X12ContextReader and its loop-aware
iter_segments interface. Use this style when you also need to inspect
or modify segments inside specific loops (here the 834’s 2000 member
loops) while you split — the context reader keeps parent / child
relationships and the implementation-guide map node attached to each
yielded X12DataNode.
#! /usr/bin/env python
"""Split an X12 interchange into one file per ST/SE transaction set, the same
goal as ``st_iterator.py`` but demonstrating ``X12ContextReader`` for loop-aware
iteration over 834 enrollment 2000 loops."""
import random
import sys
import tempfile
from itertools import groupby
import pyx12
import pyx12.error_handler
import pyx12.params
import pyx12.x12context
import pyx12.x12file
def st_generator(testfile="834_multiple_st_loops.txt"):
    """Yield ``iterate_2000`` pairs from *testfile*, printing group markers.

    Consecutive pairs are grouped by their header dict.  For each group a
    dashed rule and the header dict are printed, the group's
    ``(headers, segment)`` pairs are yielded, and a closing rule is printed.

    Args:
        testfile: Path of the X12 file to read; defaults to the sample 834
            file name the example originally hard-coded.
    """
    rule = "-----------------------------------------------------------"
    with open(testfile, encoding="ascii") as fd_in:
        grouped = groupby(iterate_2000(fd_in), lambda pair: pair[0])
        for headers, members in grouped:
            print(rule)
            print(headers)
            yield from members
            print(rule)
def simple_reader():
    """Print every header group of the sample 834 file and its segments.

    Reads ``834_multiple_st_loops.txt`` with ``X12Reader``, groups the
    ``get_headers_stream`` pairs by header dict, and prints a dashed rule,
    the header dict, each ``(headers, segment)`` pair and a closing rule.
    """
    rule = "-----------------------------------------------------------"
    reader = pyx12.x12file.X12Reader("834_multiple_st_loops.txt")
    grouped = groupby(get_headers_stream(reader), lambda pair: pair[0])
    for headers, members in grouped:
        print(rule)
        print(headers)
        for pair in members:
            print(pair)
        print(rule)
def x12_split_on_st(source_filename, isa_id=11, gs_id=21):
    """Split an interchange into one temporary X12 file per ST loop.

    Args:
        source_filename: Path of the X12 file, handed to ``X12Reader``.
        isa_id: ISA13 control number written to the first output; each
            following ST loop gets the next integer.
        gs_id: GS06 control number written to the first output; incremented
            the same way.

    Yields:
        ``(isa_id, gs_id, st_id, fd_temp)`` for each ST loop, where ``st_id``
        is ``int(ST02)`` and ``fd_temp`` is an open text-mode temporary file
        holding that loop.  Seek to the start before reading it; closing
        the file is the caller's job.
    """
    src = pyx12.x12file.X12Reader(source_filename)
    by_envelope = groupby(get_headers_stream(src), lambda pair: pair[0])
    for offset, (envelope, pairs) in enumerate(by_envelope):
        cur_isa_id = isa_id + offset
        cur_gs_id = gs_id + offset
        st_id = int(envelope["st_seg"].get_value("ST02"))
        fd_temp = tempfile.TemporaryFile(mode="w+", encoding="ascii")
        try:
            # The writer is given its delimiter arguments explicitly here.
            wr = pyx12.x12file.X12Writer(fd_temp, "~", "*", ":", "\n", "^")
            wr.Write(update_isa_id(envelope["isa_seg"], cur_isa_id))
            wr.Write(update_gs_id(envelope["gs_seg"], cur_gs_id))
            for pair in pairs:
                wr.Write(pair[1])
            wr.Close()
        except BaseException:
            # Do not leak the temporary file when writing fails part-way.
            fd_temp.close()
            raise
        yield (cur_isa_id, cur_gs_id, st_id, fd_temp)
def save_many(src_filename, targetformat=None):
    """Write every ST loop of *src_filename* to its own file.

    Random starting ISA / GS control numbers are picked once per call, and
    a ``(filename, isa_id, gs_id, st_id)`` tuple is printed per file written.

    Args:
        src_filename: Path of the source X12 interchange.
        targetformat: Optional ``str.format`` template for the output name;
            it may reference ``{isa_id}``, ``{gs_id}`` and ``{st_id}``.
            When omitted, files are named ``newfile_<isa_id>.txt``.
    """
    base_isa_id = random.randint(1000, 999999999)
    base_gs_id = random.randint(100, 999999999)
    pieces = x12_split_on_st(src_filename, base_isa_id, base_gs_id)
    for isa_id, gs_id, st_id, fd_temp in pieces:
        if targetformat is None:
            newname = f"newfile_{isa_id}.txt"
        else:
            newname = targetformat.format(isa_id=isa_id, gs_id=gs_id, st_id=st_id)
        # The temporary file is ours once yielded; close it after copying so
        # a large interchange does not accumulate open descriptors.
        with fd_temp, open(newname, "w", encoding="ascii") as fd_out:
            fd_temp.seek(0)
            fd_out.write(fd_temp.read())
        print((newname, isa_id, gs_id, st_id))
def update_isa_id(seg_data, isa_id):
    """Set ISA13 on *seg_data* to *isa_id*, zero-padded to nine characters.

    The segment is modified in place and returned.
    """
    control_number = str(int(isa_id)).rjust(9, "0")
    seg_data.set("ISA13", control_number)
    return seg_data
def update_gs_id(seg_data, gs_id):
    """Set GS06 on *seg_data* to the integer form of *gs_id* (no padding).

    The segment is modified in place and returned.
    """
    group_number = str(int(gs_id))
    seg_data.set("GS06", group_number)
    return seg_data
def get_headers_stream(segments):
    """Pair each transaction-set segment with its current envelope headers.

    Args:
        segments: Iterable of segment objects exposing ``get_seg_id()``.

    Yields:
        ``(headers, segment)`` tuples where ``headers`` is a new dict with
        the keys ``"isa_seg"``, ``"gs_seg"`` and ``"st_seg"`` holding the
        most recently seen ISA, GS and ST segments (``None`` until seen).
        ISA and GS segments are only remembered, IEA and GE are dropped,
        and every other segment -- ST included -- is yielded.
    """
    current = {"isa_seg": None, "gs_seg": None, "st_seg": None}
    remembered = {"ISA": "isa_seg", "GS": "gs_seg"}
    for segment in segments:
        seg_id = segment.get_seg_id()
        if seg_id in remembered:
            current[remembered[seg_id]] = segment
            continue
        if seg_id in ("IEA", "GE"):
            continue
        if seg_id == "ST":
            current["st_seg"] = segment
        # Hand out a copy so later header changes do not alter earlier keys.
        yield (dict(current), segment)
def iterate_2000(fd_in):
    """Yield ``(headers, segment)`` pairs using the loop-aware context reader.

    Args:
        fd_in: Open file object with the X12 data, passed to
            ``X12ContextReader``.

    Yields:
        ``(headers, segment)`` for every segment of each data tree that is
        not an ISA, GS, IEA or GE node.  ``headers`` is a new dict with the
        keys ``"isa_seg"`` and ``"gs_seg"`` (first segment of the latest
        ISA / GS tree) and ``"st_id"`` (ST02 of the latest ST tree).
    """
    reader = pyx12.x12context.X12ContextReader(
        pyx12.params.params(), pyx12.error_handler.errh_null(), fd_in
    )
    isa_seg = gs_seg = st_id = None
    for tree in reader.iter_segments("2000"):
        if tree.id in ("IEA", "GE"):
            continue
        if tree.id == "ISA":
            # Only the first segment of the envelope tree is of interest.
            for node in tree.iterate_segments():
                isa_seg = node["segment"]
                break
            continue
        if tree.id == "GS":
            for node in tree.iterate_segments():
                gs_seg = node["segment"]
                break
            continue
        if tree.id == "ST":
            st_id = tree.get_value("ST02")
        for node in tree.iterate_segments():
            # do something with loop 2000
            headers = {"isa_seg": isa_seg, "gs_seg": gs_seg, "st_id": st_id}
            yield (headers, node["segment"])
def _get_unique_isa_id():
"""
Generate a random, 4 to 9 character ISA ID
"""
return f"{random.randint(1000, 999999999):0>9}"
def _get_unique_gs_id():
"""
Generate a random, 3 to 9 character GS ID
"""
return f"{random.randint(100, 999999999)}"
def _get_unique_st_id():
"""
Generate a random, 4 to 9 character ST ID
"""
# return '%04i' % (random.randint(10, 999999999))
return f"{random.randint(100, 999999999):0>4}"
def main(testfile="834_multiple_st_loops.txt", targetformat=None):
    """Split *testfile* into one output file per ST loop.

    Args:
        testfile: Path of the interchange to split; defaults to the sample
            file name the example originally hard-coded.
        targetformat: Optional output-name template forwarded to
            ``save_many``.

    Returns:
        True, so that ``sys.exit(not main())`` exits with status 0.
    """
    save_many(testfile, targetformat)
    return True
# Script entry point: exit status is 0 when main() returns True, 1 otherwise.
if __name__ == "__main__":
    sys.exit(not main())
De-identifying an 834 enrollment file
deident834.py walks an 834
Benefit Enrollment file and replaces every member’s PHI (name, SSN,
Medicaid ID, DOB, address) with synthetic values, writing the rewritten
X12 stream back out. It demonstrates the mutation pattern:
pyx12.x12context.X12DataNode.set_value() lets you patch element
values on a parsed loop in place, addressed by a path like 2100A/NM103
(member last name) or REF[0F]02 (subscriber identifier). Combined
with a pyx12.x12file.X12Writer you can round-trip a transformed copy
of the input back to disk.
This is a demo and is not production-ready de-identification — the substitution policy is illustrative only.
#! /usr/bin/env python
"""De-identify an 834 Enrollment file by replacing member demographics with
synthetic values. Demo / not production-ready."""
import getopt
import logging
import os.path
import random
import sys
# Intrapackage imports
# If the directory two levels above this file exists, put it first on
# sys.path -- presumably so a source checkout's pyx12 wins over an
# installed copy (TODO confirm).
libpath = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
if os.path.isdir(libpath):
    sys.path.insert(0, libpath)
from collections import namedtuple
import pyx12
import pyx12.params
import pyx12.segment
import pyx12.x12context
import pyx12.x12file
# Module metadata; usage() prints __version__ and __date__.
__author__ = "John Holland"
__version__ = "1.0"
__date__ = "2015-02-12"
# NOTE(review): VERBOSE and sub_idx are not referenced by the visible code.
VERBOSE = 0
# Root logger; main() attaches a stream handler and sets its level.
logger = logging.getLogger()
sub_idx = 0
# Replacement demographic values for one member, as returned by the
# de-identifier classes and consumed by scrub2000().
Demographic = namedtuple(
    "Demographic",
    [
        "primaryId",
        "ssn",
        "medicaidId",
        "dob",
        "dod",
        "firstname",
        "lastname",
        "middlename",
        "street",
        "street2",
        "county",
    ],
)
class FakeDeidentify:
    """De-identifier that maps every member to one fixed synthetic identity."""

    def getDeidentified(self, primaryId, datatree):
        """Return the fixed record, carrying *primaryId* over unchanged.

        *datatree* is accepted but not used.
        """
        return Demographic(
            primaryId=primaryId,
            ssn="99999999",
            medicaidId="009999999",
            dob="19500101",
            dod="",
            firstname="Joe",
            lastname="Smith",
            middlename="",
            street="123 Elm",
            street2="",
            county="99",
        )
class RandomDeidentify:
    """De-identifier that invents one random identity per original member id.

    Identities are cached in ``self.identities`` so the same *primaryId*
    always maps to the same synthetic record within one instance.
    """

    def __init__(self):
        self.identities = {}

    def getDeidentified(self, primaryId, datatree):
        """Return the cached or newly generated record for *primaryId*.

        *datatree* is not used.  Only the three ids and the street number
        are random; names, dates and county are fixed placeholder values.
        """
        cached = self.identities.get(primaryId)
        if cached is not None:
            return cached
        # Draw the random parts in a fixed order: id, SSN, Medicaid id, street.
        new_primary = str(random.randint(1000, 99999999999)).rjust(10, "0")
        new_ssn = str(random.randint(10000, 999999999)).rjust(9, "0")
        new_medicaid = str(random.randint(1000, 99999999999)).rjust(10, "0")
        street_number = random.randint(10, 9999)
        demo = Demographic(
            new_primary,
            new_ssn,
            new_medicaid,
            "19520101",
            "",
            "AA",
            "Smith",
            "",
            f"{street_number} Oak",
            "",
            "98",
        )
        self.identities[primaryId] = demo
        return demo
def deidentify_file(fd_in):
    """Print a de-identified copy of the 834 data read from *fd_in*.

    Every 2000 loop is scrubbed with a ``RandomDeidentify`` instance, and
    the formatted text of every segment of every data tree is printed to
    standard output.

    Args:
        fd_in: Open file object containing the 834 X12 data.
    """
    reader = pyx12.x12context.X12ContextReader(
        pyx12.params.params(), pyx12.error_handler.errh_null(), fd_in
    )
    deident = RandomDeidentify()
    with open("newfile.txt", "w", encoding="ascii") as fd_out:
        # NOTE(review): the writer is created but nothing is written through
        # it, so "newfile.txt" is left empty; output goes to stdout instead.
        wr = pyx12.x12file.X12Writer(fd_out)
        for tree in reader.iter_segments("2000"):
            if tree.id == "2000":
                scrub2000(tree, deident)
            for node in tree.iterate_segments():
                print(node["segment"].format())
def scrub2000(loop_sub, deident):
    """Overwrite the member demographics of one 2000 loop in place.

    The loop's current ``2100A/NM109`` value is passed to
    ``deident.getDeidentified`` as the lookup key, and the returned record
    supplies the replacement values.

    Args:
        loop_sub: Loop node offering ``get_value(path)`` and
            ``set_value(path, value)``.
        deident: Object offering ``getDeidentified(primaryId, datatree)``.
    """
    original_id = loop_sub.get_value("2100A/NM109")
    demo = deident.getDeidentified(original_id, loop_sub)
    # (element path, attribute of the replacement record), applied in order.
    # NOTE(review): demo.ssn is not written to any element here.
    replacements = (
        ("INS12", "dod"),
        ("REF[0F]02", "primaryId"),
        ("2100A/NM103", "lastname"),
        ("2100A/NM104", "firstname"),
        ("2100A/NM105", "middlename"),
        ("2100A/NM109", "medicaidId"),
        ("2100A/N301", "street"),
        ("2100A/N302", "street2"),
        ("2100A/N406", "county"),
        ("2100A/DMG02", "dob"),
    )
    for path, attr in replacements:
        loop_sub.set_value(path, getattr(demo, attr))
def usage():
    """Write the command-line help text to standard output.

    The option list mirrors what ``main`` passes to ``getopt`` ("dhv"):
    the previously advertised ``-o`` is not accepted there, while ``-v``
    is accepted and was missing from the help.
    """
    pgm_nme = os.path.basename(sys.argv[0])
    sys.stdout.write(f"{pgm_nme} {__version__} ({__date__})\n")
    sys.stdout.write(f"usage: {pgm_nme} [options] source_file\n")
    sys.stdout.write("\noptions:\n")
    sys.stdout.write(" -h Help\n")
    sys.stdout.write(" -d Debug mode\n")
    sys.stdout.write(" -v Verbose mode (same effect as -d)\n")
def main():
    """Parse the command line and de-identify each named file.

    Options ``-d`` and ``-v`` both switch the root logger to DEBUG; ``-h``
    prints the help text and stops.

    Returns:
        True on success or after showing help; False when the options are
        invalid or an input file does not exist.
    """
    try:
        opts, args = getopt.getopt(sys.argv[1:], "dhv")
    except getopt.error:
        usage()
        return False
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    stdout_hdlr = logging.StreamHandler()
    stdout_hdlr.setFormatter(formatter)
    logger.addHandler(stdout_hdlr)
    logger.setLevel(logging.INFO)
    for opt, _value in opts:
        if opt == "-h":
            usage()
            return True
        if opt in ("-d", "-v"):
            logger.setLevel(logging.DEBUG)
    for file_in in args:
        if not os.path.isfile(file_in):
            logger.error("File %s was not found", file_in)
            usage()
            return False
        # Close each input as soon as it has been processed.
        with open(file_in, encoding="ascii") as fd_in:
            deidentify_file(fd_in)
    return True
# Script entry point: exit status is 0 when main() returns True, 1 otherwise.
if __name__ == "__main__":
    sys.exit(not main())
Parsing 834 enrollment into JSON
834_x12_json.py is a
fuller pipeline: it parses an 834 file and emits one JSON document per
ST transaction set, with the envelope headers, members, providers, and
payers flattened into a structured object. The Enrollment834Parser
class wraps pyx12.x12context.X12ContextReader, dispatches on
datatree.id to capture envelope state (ISA / GS / ST / BGN), and
descends into each 2000 member loop to extract a member record
along with its nested 2100A (member name), 2100G (responsible
person), 2300 (coverage), 2310 (provider), and 2320 (payer)
loops.
It demonstrates the transformation pattern: how to use X12
path expressions like 2100A/NM109, DTP[356]03, or
REF[0F]02 against a context node to pull values out of a deeply
nested loop without writing per-segment scaffolding, and how to
accumulate the results into nested OrderedDicts suitable for
json.dumps.
"""Parse 834 Benefit Enrollment & Maintenance files into JSON, emitting one
JSON document per ST transaction set with header, members, providers, and
payers flattened into a structured object."""
import collections
import glob
import json
import logging
import os.path
import sys
import tempfile
import pyx12.error_handler
import pyx12.errors
import pyx12.params
import pyx12.x12context
from pyx12.examples.jsonTable import JsonInterface
class Enrollment834Parser:
"""Import an 834 eligibility file"""
def __init__(self, fullpath, fd_in, run_type_code="P"):
    """Open *fd_in* as an X12 context reader.

    Args:
        fullpath: Path of the source file; its base name without the
            extension is stored as ``outfile_base``.
        fd_in: Open file object containing the X12 data.
        run_type_code: Accepted but not used by this constructor.

    Raises:
        pyx12.errors.X12Error: If creating the context reader raises
            ``X12Error``; the original error is chained as the cause.
    """
    self.logger = logging.getLogger()
    param = pyx12.params.params()
    errh = pyx12.error_handler.errh_null()
    self.outfile_base = os.path.splitext(os.path.basename(fullpath))[0]
    try:
        self.src = pyx12.x12context.X12ContextReader(param, errh, fd_in, [])
    except pyx12.errors.X12Error as exc:
        err_msg = "This does not look like a X12 data file"
        self.logger.error(err_msg)
        raise pyx12.errors.X12Error(err_msg) from exc
def parse_file(self, out_dir):
    """Convert an 834 X12 file into JSON files, one per ST transaction set.

    Walks the context reader while tracking the current ISA / GS / ST / BGN
    state, collects one member record per 2000 loop, and at every SE writes
    ``<outfile_base>.<NNNN>.v1.json`` into *out_dir*.

    Args:
        out_dir: Directory that receives the JSON files.

    Returns:
        Always True.
    """
    my_count = 0
    subscribers = []
    trn_set_idx = 0
    isa = gs = st = header = None
    for datatree in self.src.iter_segments("2000"):
        node_id = datatree.id
        if node_id == "2000":
            my_count += 1
            # get_2000 reads ``st.line_number`` for the member's parent, so
            # it needs the ST record rather than the BGN header.
            subscribers.append(self.get_2000(datatree, st, my_count))
        elif node_id == "ISA":
            isa = JsonInterface()
            isa.add_string("SenderCode", datatree.get_value("ISA06"))
            isa.add_string("ReceiverCode", datatree.get_value("ISA08"))
            isa.add_int("ControlNumber", datatree.get_value("ISA13"))
            # ISA09 carries a two-digit year; the century "20" is assumed.
            isa.add_datetime(
                "InterchangeDate",
                "20" + datatree.get_value("ISA09"),
                datatree.get_value("ISA10"),
            )
            isa.add_string("Version", datatree.get_value("ISA12"))
            isa.add_string("UsageIndicator", datatree.get_value("ISA15"))
            isa.seg_count = datatree.seg_count
            isa.line_number = datatree.cur_line_number
            isa.parent_line_number = None
        elif node_id == "IEA":
            isa = None
        elif node_id == "GS":
            gs = JsonInterface()
            gs.add_string("SenderCode", datatree.get_value("GS02"))
            gs.add_string("ReceiverCode", datatree.get_value("GS03"))
            gs.add_datetime(
                "CreationDate", datatree.get_value("GS04"), datatree.get_value("GS05")
            )
            gs.add_int("GroupControlNumber", datatree.get_value("GS06"))
            gs.add_string("Version", datatree.get_value("GS08"))
            gs.seg_count = datatree.seg_count
            gs.line_number = datatree.cur_line_number
            gs.parent_line_number = isa.line_number
        elif node_id == "GE":
            gs = None
        elif node_id == "ST":
            st = JsonInterface()
            st.add_string("IdentifierCode", datatree.get_value("ST01"))
            st.add_int("ControlNumber", datatree.get_value("ST02"))
            st.add_string("ImplementationGuideVersion", datatree.get_value("ST03"))
            st.seg_count = datatree.seg_count
            st.line_number = datatree.cur_line_number
            st.parent_line_number = gs.line_number
            # A new transaction set starts with an empty member list.
            subscribers = []
            my_count = 0
            trn_set_idx += 1
        elif node_id == "SE":
            document = collections.OrderedDict()
            document["Header"] = self._make_headers(isa, gs, st, header)
            document["Members"] = subscribers
            outfile = f"{self.outfile_base}.{trn_set_idx:04d}.v1.json"
            full_outfile = os.path.join(out_dir, outfile)
            if self._write_json(document, full_outfile):
                self.logger.info("Save JSON output to %s", full_outfile)
            header = None
        elif node_id == "BGN":
            header = JsonInterface()
            header.add_string("TransactionSetPurposeCode", datatree.get_value("BGN01"))
            header.add_string("TransactionSetReferenceNumber", datatree.get_value("BGN02"))
            header.add_datetime(
                "TransactionSetCreationDate",
                datatree.get_value("BGN03"),
                datatree.get_value("BGN04"),
            )
            header.add_string("TimeZoneCode", datatree.get_value("BGN05"))
            header.add_string(
                "OriginalTransactionSetReferenceNumber", datatree.get_value("BGN06")
            )
            header.add_string("ActionCode", datatree.get_value("BGN08"))
        elif node_id == "DTP":
            qualifier = datatree.get_value("DTP01")
            # Qualifiers 007 and 303 are both stored as the file effective date.
            if qualifier in ("007", "303"):
                header.add_string("FileEffectiveDateQualifier", qualifier)
                header.add_date(
                    "FileEffectiveDate", datatree.get_value(f"DTP[{qualifier}]03")
                )
        elif node_id == "QTY":
            header.add_int("MemberCount", datatree.get_value("QTY[TO]02"))
        elif "1000A" in datatree.cur_path and node_id == "N1":
            header.add_string("SponsorEntityIdentifierCode", datatree.get_value("N101"))
            header.add_string("SponsorName", datatree.get_value("N102"))
            header.add_string("SponsorIdentificationCodeQualifier", datatree.get_value("N103"))
            header.add_string("SponsorIdentifier", datatree.get_value("N104"))
        elif "1000B" in datatree.cur_path and node_id == "N1":
            header.add_string("InsurerEntityIdentifierCode", datatree.get_value("N101"))
            header.add_string("InsurerName", datatree.get_value("N102"))
            header.add_string("InsurerIdentificationCodeQualifier", datatree.get_value("N103"))
            header.add_string("InsurerIdentificationCode", datatree.get_value("N104"))
    return True
def get_2000(self, loop_sub, st, idx):
    """Build the member record for one 2000 loop.

    Args:
        loop_sub: Context node of the 2000 loop.
        st: Parent record; only its ``line_number`` is read.
        idx: Ordinal stored as ``MemberOrdinal``.

    Returns:
        The member's fields as returned by ``JsonInterface._asdict()``, plus
        ``IncorrectMember`` (only when a 2100B loop exists),
        ``ResponsiblePerson`` (one entry per 2100G loop) and ``Coverages``
        (one entry per 2300 loop).
    """
    sub = JsonInterface()
    sub.seg_count = loop_sub.seg_count
    sub.line_number = loop_sub.cur_line_number
    sub.parent_line_number = st.line_number
    sub.add_int("MemberOrdinal", idx)

    # (JsonInterface adder, output field, X12 path), applied in this order.
    before_contact = (
        ("add_string", "MemberIndicator", "INS01"),
        ("add_string", "IndividualRelationshipCode", "INS02"),
        ("add_string", "MaintenanceTypeCode", "INS03"),
        ("add_string", "MaintenanceReasonCode", "INS04"),
        ("add_string", "BenefitStatusCode", "INS05"),
        ("add_string", "MedicarePlanCode", "INS06-1"),
        ("add_string", "MedicareEligibilityReasonCode", "INS06-2"),
        ("add_string", "CobraQualifyingEventCode", "INS07"),
        ("add_string", "EmploymentStatusCode", "INS08"),
        ("add_string", "StudentStatusCode", "INS09"),
        ("add_string", "HandicapIndicator", "INS10"),
        ("add_date", "DateOfDeath", "INS12"),
        ("add_int", "BirthSequenceNumber", "INS17"),
        ("add_string", "SubscriberIdentifier", "REF[0F]02"),
        ("add_string", "PolicyNumber", "REF[1L]02"),
        ("add_string", "CaseNumber", "REF[3H]02"),
        ("add_date", "EligibilityBeginDate", "DTP[356]03"),
        ("add_date", "EligibilityEndDate", "DTP[357]03"),
        ("add_date", "MedicaidEndDate", "DTP[474]03"),
        ("add_string", "MemberEntityIdentifierCode", "2100A/NM101"),
        ("add_string", "MemberLastName", "2100A/NM103"),
        ("add_string", "MemberFirstName", "2100A/NM104"),
        ("add_string", "MemberMiddleName", "2100A/NM105"),
        ("add_string", "MemberNamePrefix", "2100A/NM106"),
        ("add_string", "MemberNameSuffix", "2100A/NM107"),
        ("add_string", "MemberIdentificationCodeQualifier", "2100A/NM108"),
        ("add_string", "MemberIdentifier", "2100A/NM109"),
    )
    after_contact = (
        ("add_string", "MemberAddressLine1", "2100A/N301"),
        ("add_string", "MemberAddressLine2", "2100A/N302"),
        ("add_string", "MemberCity", "2100A/N401"),
        ("add_string", "MemberState", "2100A/N402"),
        ("add_string", "MemberZipCode", "2100A/N403"),
        ("add_string", "MemberCountry", "2100A/N404"),
        ("add_string", "MemberLocationQualifier", "2100A/N405"),
        ("add_string", "MemberLocationIdentifier", "2100A/N406"),
        ("add_date", "DateOfBirth", "2100A/DMG02"),
        ("add_string", "Gender", "2100A/DMG03"),
        ("add_string", "MaritalStatusCode", "2100A/DMG04"),
        ("add_string", "EthnicityCode", "2100A/DMG05-01"),
        ("add_string", "ClassificationOfRaceOrEthnicity", "2100A/DMG05-02"),
        ("add_string", "RaceOrEthnicityCode", "2100A/DMG05-03"),
        ("add_string", "CitizenshipStatusCode", "2100A/DMG06"),
        ("add_string", "IncomeFrequencyCode", "2100A/ICM01"),
        ("add_number", "IncomeWageAmount", "2100A/ICM02"),
        ("add_number", "IncomeWorkHoursCount", "2100A/ICM03"),
        ("add_string", "IncomeLocationIdentificationCode", "2100A/ICM04"),
        ("add_string", "IncomeSalaryGradeCode", "2100A/ICM05"),
        ("add_string", "LanguageCodeQualifier", "2100A/LUI01"),
        ("add_string", "LanguageCode", "2100A/LUI02"),
        ("add_string", "LanguageDescription", "2100A/LUI03"),
        ("add_string", "LanguageUseIndicator", "2100A/LUI04"),
    )
    # (key in the PER contact dict, output field)
    contact_fields = (
        ("EmailAddress", "MemberEmail"),
        ("FaxNumber", "MemberFaxNumber"),
        ("TelephoneNumber", "MemberTelephoneNumber"),
        ("TelephoneExtension", "MemberTelephoneExtension"),
        ("URI", "MemberURI"),
    )

    for adder, field, path in before_contact:
        getattr(sub, adder)(field, loop_sub.get_value(path))
    if loop_sub.exists("2100A/PER"):
        per = self._get_per_contact(loop_sub, "2100A")
        for key, field in contact_fields:
            if key in per:
                sub.add_string(field, per[key])
    for adder, field, path in after_contact:
        getattr(sub, adder)(field, loop_sub.get_value(path))

    ret = sub._asdict()
    if loop_sub.exists("2100B"):
        ret["IncorrectMember"] = self.get_incorrect_member(loop_sub.first("2100B"))
    ret["ResponsiblePerson"] = [
        self.get_responsible_person(resp_loop) for resp_loop in loop_sub.select("2100G")
    ]
    ret["Coverages"] = [
        self.get_2300(loop_node, sub) for loop_node in loop_sub.select("2300")
    ]
    return ret
def get_2300(self, loop_2300, parent):
    """Build the coverage record for one 2300 loop.

    Args:
        loop_2300: Context node of the 2300 loop.
        parent: Parent record; only its ``line_number`` is read.

    Returns:
        The coverage fields as a dict, plus ``Providers`` (one entry per
        2310 loop) and ``Payers`` (one entry per 2320 loop, numbered from 1).
    """
    clsub = JsonInterface()
    clsub.seg_count = loop_2300.seg_count
    clsub.line_number = loop_2300.cur_line_number
    clsub.parent_line_number = parent.line_number

    # (JsonInterface adder, output field, X12 path), applied in this order.
    coverage_fields = (
        ("add_string", "MaintenanceTypeCode", "HD01"),
        ("add_string", "InsuranceLineCode", "HD03"),
        ("add_string", "PlanCoverageDescription", "HD04"),
        ("add_string", "CoverageLevelCode", "HD05"),
        ("add_date", "BenefitBeginDate", "DTP[348]03"),
        ("add_date", "BenefitEndDate", "DTP[349]03"),
        ("add_date", "EnrollmentSignatureDate", "DTP[300]03"),
        ("add_date", "MaintenanceEffectiveDate", "DTP[303]03"),
        ("add_date", "PremiumPaidToEndDate", "DTP[343]03"),
        ("add_date", "LastPremiumPaidDate", "DTP[543]03"),
        ("add_string", "ClientReportingCategory", "REF[17]02"),
    )
    for adder, field, path in coverage_fields:
        getattr(clsub, adder)(field, loop_2300.get_value(path))

    ret = clsub._asdict()
    ret["Providers"] = [
        self.get_2310_fields(loop_node) for loop_node in loop_2300.select("2310")
    ]
    ret["Payers"] = [
        self.get_2320_fields(loop_node, ordinal)
        for ordinal, loop_node in enumerate(loop_2300.select("2320"), start=1)
    ]
    return ret
def get_2310_fields(self, loop):
    """Build the provider record for one 2310 loop.

    Args:
        loop: Context node of the 2310 loop.

    Returns:
        The provider fields as returned by ``JsonInterface._asdict()``.
    """
    prov = JsonInterface()
    prov.seg_count = loop.seg_count
    prov.line_number = loop.cur_line_number
    prov.add_int("LineCounter", int(loop.get_value("LX01")))

    # (output field, X12 path) for the plain string elements, in order.
    string_fields = (
        ("EntityIdentifierCode", "NM101"),
        ("EntityTypeQualifier", "NM102"),
        ("LastName", "NM103"),
        ("FirstName", "NM104"),
        ("MiddleName", "NM105"),
        ("NamePrefix", "NM106"),
        ("NameSuffix", "NM107"),
        ("IdentifierCodeQualifier", "NM108"),
        ("IdentificationCode", "NM109"),
        ("EntityRelationshipCode", "NM110"),
        ("Address1", "N301"),
        ("Address2", "N302"),
        ("City", "N401"),
        ("State", "N402"),
        ("ZipCode", "N403"),
        ("Country", "N404"),
    )
    # (key in the PER contact dict, output field)
    contact_fields = (
        ("EmailAddress", "Email"),
        ("FaxNumber", "FaxNumber"),
        ("TelephoneNumber", "TelephoneNumber"),
        ("TelephoneExtension", "TelephoneExtension"),
        ("URI", "URI"),
    )

    for field, path in string_fields:
        prov.add_string(field, loop.get_value(path))
    if loop.exists("PER"):
        per = self._get_per_contact(loop, None)
        for key, field in contact_fields:
            if key in per:
                prov.add_string(field, per[key])
    prov.add_date("EffectiveDate", loop.get_value("PLA03"))
    prov.add_string("ChangeReason", loop.get_value("PLA04"))
    return prov._asdict()
def get_2320_fields(self, loop, idx):
    """Build the payer record for one 2320 loop.

    Args:
        loop: Context node of the 2320 loop.
        idx: Ordinal stored as ``PayerOrdinal``.

    Returns:
        The payer fields as returned by ``JsonInterface._asdict()``; payer
        name, identifier, address and contact details come from the first
        nested 2330 loop whose NM101 is ``"IN"``.
    """
    pay = JsonInterface()
    pay.seg_count = loop.seg_count
    pay.line_number = loop.cur_line_number
    pay.add_int("PayerOrdinal", idx)

    # (JsonInterface adder, output field, X12 path), applied in this order.
    cob_fields = (
        ("add_string", "ResponsibilitySequenceNumberCode", "COB01"),
        ("add_string", "MemberGroupPolicyNumber", "COB02"),
        ("add_string", "CoordinationOfBenefitsCode", "COB03"),
        ("add_string", "SocialSecurityNumber", "REF[SY]02"),
        ("add_string", "PolicyNumberRefZz", "REF[ZZ]02"),
        ("add_string", "GroupNumber", "REF[6P]02"),
        ("add_string", "AccountSuffixCode", "REF[60]02"),
        ("add_date", "CoordinationOfBenefitsBeginDate", "DTP[344]03"),
        ("add_date", "CoordinationOfBenefitsEndDate", "DTP[345]03"),
    )
    # NM108 qualifier -> output field that receives NM109
    id_fields = {"FI": "PayerFederalTaxIdNumber", "XV": "PayerPlanID"}
    address_fields = (
        ("PayerAddress1", "N301"),
        ("PayerAddress2", "N302"),
        ("PayerCity", "N401"),
        ("PayerState", "N402"),
        ("PayerZipCode", "N403"),
        ("PayerCountry", "N404"),
    )
    # (key in the PER contact dict, output field)
    contact_fields = (
        ("EmailAddress", "Email"),
        ("FaxNumber", "FaxNumber"),
        ("TelephoneNumber", "TelephoneNumber"),
        ("TelephoneExtension", "TelephoneExtension"),
        ("URI", "URI"),
    )

    for adder, field, path in cob_fields:
        getattr(pay, adder)(field, loop.get_value(path))

    for ins_node in loop.select("2330"):
        if ins_node.get_value("NM101") != "IN":
            continue
        pay.add_string("PayerName", ins_node.get_value("NM103"))
        id_field = id_fields.get(ins_node.get_value("NM108"))
        if id_field is not None:
            pay.add_string(id_field, ins_node.get_value("NM109"))
        for field, path in address_fields:
            pay.add_string(field, ins_node.get_value(path))
        if ins_node.exists("PER"):
            per = self._get_per_contact(ins_node, None)
            for key, field in contact_fields:
                if key in per:
                    pay.add_string(field, per[key])
        # Stop after the first insurer node.
        break
    return pay._asdict()
def get_incorrect_member(self, loop_sub):
    """Build the record for a 2100B (incorrect member name) loop.

    Args:
        loop_sub: Context node of the loop.

    Returns:
        The fields as returned by ``JsonInterface._asdict()``.
        ``SocialSecurityNumber`` is added only when NM108 is ``"34"``, and
        each DMG05 component only when its value is not the empty string.
    """
    sub = JsonInterface()
    name_fields = (
        ("EntityIdentifierCode", "NM101"),
        ("EntityTypeQualifier", "NM102"),
        ("LastName", "NM103"),
        ("FirstName", "NM104"),
        ("MiddleName", "NM105"),
        ("NamePrefix", "NM106"),
        ("NameSuffix", "NM107"),
        ("IdentificationCodeQualifier", "NM108"),
        ("InsuredIdentifier", "NM109"),
    )
    optional_fields = (
        ("EthnicityCode", "DMG05-01"),
        ("ClassificationOfRaceOrEthnicity", "DMG05-02"),
        ("RaceOrEthnicityCode", "DMG05-03"),
    )

    for field, path in name_fields:
        sub.add_string(field, loop_sub.get_value(path))
    if loop_sub.get_value("NM108") == "34":
        sub.add_string("SocialSecurityNumber", loop_sub.get_value("NM109"))
    sub.add_date("DateOfBirth", loop_sub.get_value("DMG02"))
    sub.add_string("Gender", loop_sub.get_value("DMG03"))
    sub.add_string("MaritalStatusCode", loop_sub.get_value("DMG04"))
    for field, path in optional_fields:
        component = loop_sub.get_value(path)
        if component != "":
            sub.add_string(field, component)
    sub.add_string("CitizenshipStatusCode", loop_sub.get_value("DMG06"))
    return sub._asdict()
def get_responsible_person(self, loop_sub):
    """Build the record for one 2100G (responsible person) loop.

    Args:
        loop_sub: Context node of the loop.

    Returns:
        The fields as returned by ``JsonInterface._asdict()``.
        ``SocialSecurityNumber`` is added only when NM108 is ``"34"``, and
        contact fields only when the loop has a PER segment.
    """
    sub = JsonInterface()
    name_fields = (
        ("EntityIdentifierCode", "NM101"),
        ("EntityTypeQualifier", "NM102"),
        ("LastName", "NM103"),
        ("FirstName", "NM104"),
        ("MiddleName", "NM105"),
        ("NamePrefix", "NM106"),
        ("NameSuffix", "NM107"),
        ("IdentificationCodeQualifier", "NM108"),
        ("ResponsiblePartyIdentifier", "NM109"),
    )
    # (key in the PER contact dict, output field)
    contact_fields = (
        ("EmailAddress", "Email"),
        ("FaxNumber", "FaxNumber"),
        ("TelephoneNumber", "TelephoneNumber"),
        ("TelephoneExtension", "TelephoneExtension"),
        ("URI", "URI"),
    )
    address_fields = (
        ("Address1", "N301"),
        ("Address2", "N302"),
        ("City", "N401"),
        ("State", "N402"),
        ("ZipCode", "N403"),
        ("Country", "N404"),
    )

    for field, path in name_fields:
        sub.add_string(field, loop_sub.get_value(path))
    if loop_sub.get_value("NM108") == "34":
        sub.add_string("SocialSecurityNumber", loop_sub.get_value("NM109"))
    if loop_sub.exists("PER"):
        per = self._get_per_contact(loop_sub, None)
        for key, field in contact_fields:
            if key in per:
                sub.add_string(field, per[key])
    for field, path in address_fields:
        sub.add_string(field, loop_sub.get_value(path))
    return sub._asdict()
def _get_per_contact(self, per_loop, loop_id):
"Get list of PER contact info"
per = {}
prefix = f"{loop_id}/PER" if loop_id is not None else "PER"
for idx in range(1, 4):
per_qual = per_loop.get_value(f"{prefix}0{idx * 2 + 1}")
per_value = per_loop.get_value(f"{prefix}0{idx * 2 + 2}")
if per_qual is not None and per_value != "":
if per_qual == "EM":
per["EmailAddress"] = per_value
elif per_qual == "FX":
per["FaxNumber"] = per_value
elif per_qual == "EX":
per["TelephoneExtension"] = per_value
elif per_qual == "TE":
per["TelephoneNumber"] = per_value
elif per_qual == "UR":
per["URI"] = per_value
return per
def _make_headers(self, isa, gs, st, header):
ret = collections.OrderedDict()
for k, v in isa.fields.items():
if not k.startswith("Interchange"):
ret["Interchange" + k] = v["value"]
else:
ret[k] = v["value"]
for k, v in gs.fields.items():
if not k.startswith("Functional"):
ret["Functional" + k] = v["value"]
else:
ret[k] = v["value"]
for k, v in st.fields.items():
if not k.startswith("TransactionSet"):
ret["TransactionSet" + k] = v["value"]
else:
ret[k] = v["value"]
for k, v in header.fields.items():
ret[k] = v["value"]
return ret
def _write_json(self, header, outfile):
try:
with open(outfile, "w") as fd_final:
fd_final.write(
json.dumps(header, indent=2, sort_keys=False, separators=(",", ": "))
)
return True
except OSError:
self.logger.warning(f"Failed to write {outfile}")
return False
def parse_file(fullname, out_dir="."):
    """
    Parse one 834 file with ``Enrollment834Parser`` and write its output
    under ``out_dir``.

    The input is read as ASCII and copied into a temporary file, which is
    handed to the parser together with the original file name.

    Raises:
        FileNotFoundError: ``fullname`` is not an existing file.

    A missing ``out_dir`` is only logged as an error; parsing is still
    attempted.  Any exception raised by the parser's ``parse_file`` is logged
    with its traceback and suppressed.
    """
    logger = logging.getLogger()
    if not os.path.isfile(fullname):
        raise FileNotFoundError(f"File {fullname} was not found")
    if not os.path.isdir(out_dir):
        # No exception is active here, so logger.exception() would append a
        # meaningless "NoneType: None" traceback; log a plain error instead.
        logger.error("outdir does not exist")
    with tempfile.TemporaryFile(mode="w+", encoding="ascii") as fd_src:
        with open(fullname, encoding="ascii") as fd_dx:
            fd_src.write(fd_dx.read())
        fd_src.seek(0)  # rewind so the parser reads from the start
        my834 = Enrollment834Parser(fullname, fd_src)
        print(fullname)
        try:
            my834.parse_file(out_dir)
        except Exception:
            logger.exception("Failed to parse file")
def main():
    """
    Command-line entry point: configure root logging, then parse every file
    matched by the glob patterns given on the command line (or the bundled
    sample file name when none are given).  Always returns True.
    """
    import argparse

    parser = argparse.ArgumentParser(description="834 eligibility to JSON parser")
    parser.add_argument("--verbose", "-v", action="count", default=0)
    parser.add_argument("--debug", "-d", action="store_true")
    parser.add_argument(
        "--out-dir",
        "-o",
        default=".",
        help="directory to write JSON output files (default: current directory)",
    )
    parser.add_argument("files", metavar="N", nargs="*", help="input files")
    args = parser.parse_args()

    logger = logging.getLogger()
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(console)
    want_debug = args.debug or args.verbose > 0
    logger.setLevel(logging.DEBUG if want_debug else logging.INFO)

    if args.files:
        # Expanded lazily: each pattern is globbed only after the files of the
        # previous pattern have been parsed.
        batches = (glob.glob(pattern) for pattern in args.files)
    else:
        batches = iter([["834_multiple_st_loops.txt"]])

    for batch in batches:
        print(batch)
        for full_name in batch:
            parse_file(full_name, args.out_dir)
    return True


if __name__ == "__main__":
    sys.exit(not main())
Generating a field reference from a sample file¶
The next two scripts form a small pipeline that derives a human-readable field reference for whatever transaction type a sample X12 file contains. This is useful when you’re integrating a new transaction (or a new vendor’s flavor of an existing one) and want to see exactly which loops, segments, and elements show up — together with the implementation-guide metadata pyx12 has for each.
node_iterator.py
walks the input file with pyx12.x12file.X12Reader plus a
pyx12.map_walker.walk_tree, manually driving the walker the
same way pyx12.x12n_document.x12n_document() does internally,
and records every distinct map node it encounters along with its
base_name, id, name, usage, data_type, and
length bounds. The result is written to node_list.json next to
the input file. This is also a worked example of using the map
walker directly, picking the right map via
pyx12.map_index.map_index, and switching maps mid-stream
when a 4010 837 reveals its tspc in the BHT segment.
#!/usr/bin/env python
"""Walk an X12 file with the map walker and write a ``node_list.json`` catalog
of every loop / segment / element node encountered, with type, usage, and
length metadata. Output feeds ``generate_spec.py``."""
import argparse
import json
import logging
import os
import os.path
import sys
libpath = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
if os.path.isdir(libpath):
sys.path.insert(0, libpath)
# Intrapackage imports
import pyx12.error_handler
import pyx12.errors
import pyx12.map_if
import pyx12.map_index
import pyx12.params
import pyx12.x12file
from pyx12.map_walker import apply_walk_errors, walk_tree
__version__ = "1.0.0"
def x12n_iterator(param, src_file, map_path=None):
    """
    Walk an X12 file segment by segment and catalog the map nodes it touches.

    The control-segment map is loaded first.  The transaction map is looked up
    through ``pyx12.map_index.map_index`` once a GS segment supplies GS01/GS08,
    and looked up again on BHT (using BHT02) for the 004010X094 and
    004010X094A1 versions.

    Args:
        param: parameter object handed to ``pyx12.map_if.load_map_file``.
        src_file: source handed to ``pyx12.x12file.X12Reader``.
        map_path: optional map directory handed to the map loader and index.

    Returns:
        False when ``X12Reader`` raises ``X12Error``.  Otherwise a dict keyed
        by path.  Entries for matched nodes are keyed by ``node.get_path()``
        and carry Ordinal, Count, NodeType, Id, Name, FormattedName,
        ParentName, LoopMaxUse, ParentPath and prefix_nodes (the distinct
        paths recorded immediately before this one).  Entries for elements are
        keyed by ``node.parent.get_path() + "/" + refdes`` and carry Ordinal,
        Count, NodeType, Id, Name, FormattedName, ParentName, ParentPath,
        Usage, DataType, MinLength and MaxLength.

    Raises:
        pyx12.errors.EngineError: re-raised from the walker, or raised here
            when the map index has no file for the identifiers seen.
    """
    logger = logging.getLogger("pyx12")
    errh = pyx12.error_handler.errh_null()
    # Get X12 DATA file
    try:
        src = pyx12.x12file.X12Reader(src_file)
    except pyx12.errors.X12Error:
        logger.error('"%s" does not look like an X12 data file' % (src_file))
        return False
    # Get Map of Control Segments
    map_file = "x12.control.00501.xml" if src.icvn == "00501" else "x12.control.00401.xml"
    logger.debug("X12 control file: %s" % (map_file))
    control_map = pyx12.map_if.load_map_file(map_file, param, map_path)
    map_index_if = pyx12.map_index.map_index(map_path)
    node = control_map.getnodebypath("/ISA_LOOP/ISA")
    walker = walk_tree()
    icvn = fic = vriic = tspc = None
    cur_map = None  # we do not initially know the X12 transaction type
    res = {}  # path -> catalog entry
    res_ordinal = 0  # order of first appearance, shared by all entry kinds
    last_x12_segment_path = None  # path recorded for the previous segment
    for seg in src:
        # find node
        orig_node = node
        if seg.get_seg_id() == "ISA":
            # ISA and GS are positioned directly on the control map rather
            # than being found by the walker.
            node = control_map.getnodebypath("/ISA_LOOP/ISA")
            walker.forceWalkCounterToLoopStart("/ISA_LOOP", "/ISA_LOOP/ISA")
        elif seg.get_seg_id() == "GS":
            node = control_map.getnodebypath("/ISA_LOOP/GS_LOOP/GS")
            walker.forceWalkCounterToLoopStart("/ISA_LOOP/GS_LOOP", "/ISA_LOOP/GS_LOOP/GS")
        else:
            # from the current node, find the map node matching the segment
            # keep track of the loops traversed
            try:
                # pop_loops / push_loops are returned by the walker but not used here
                (node, pop_loops, push_loops, walk_errors) = walker.walk_errors(
                    node, seg, src.get_seg_count(), src.get_cur_line(), src.get_ls_id()
                )
                apply_walk_errors(errh, walk_errors)
            except pyx12.errors.EngineError:
                logger.error("Source file line %i" % (src.get_cur_line()))
                raise
        if node is None:
            # No node for this segment: stay on the previous node and record nothing
            node = orig_node
        else:
            if seg.get_seg_id() == "ISA":
                icvn = seg.get_value("ISA12")
            elif seg.get_seg_id() == "IEA":
                pass
            elif seg.get_seg_id() == "GS":
                fic = seg.get_value("GS01")
                vriic = seg.get_value("GS08")
                map_file_new = map_index_if.get_filename(icvn, vriic, fic)
                if map_file != map_file_new:
                    # Load a map only when the file name differs from the current one
                    map_file = map_file_new
                    if map_file is None:
                        err_str = f"Map not found. icvn={icvn}, fic={fic}, vriic={vriic}"
                        raise pyx12.errors.EngineError(err_str)
                    cur_map = pyx12.map_if.load_map_file(map_file, param, map_path)
                    src.check_837_lx = True if cur_map.id == "837" else False
                    logger.debug("Map file: %s" % (map_file))
                # Continue from the GS node of the transaction map, not the control map
                node = cur_map.getnodebypath("/ISA_LOOP/GS_LOOP/GS")
                pass
            elif seg.get_seg_id() == "BHT":
                # special case for 4010 837P
                if vriic in ("004010X094", "004010X094A1"):
                    tspc = seg.get_value("BHT02")
                    logger.debug("icvn=%s, fic=%s, vriic=%s, tspc=%s" % (icvn, fic, vriic, tspc))
                    map_file_new = map_index_if.get_filename(icvn, vriic, fic, tspc)
                    logger.debug("New map file: %s" % (map_file_new))
                    if map_file != map_file_new:
                        map_file = map_file_new
                        if map_file is None:
                            err_str = f"Map not found. icvn={icvn}, fic={fic}, vriic={vriic}, tspc={tspc}"
                            raise pyx12.errors.EngineError(err_str)
                        cur_map = pyx12.map_if.load_map_file(map_file, param, map_path)
                        src.check_837_lx = True if cur_map.id == "837" else False
                        logger.debug("Map file: %s" % (map_file))
                        # apply_loop_count(node, cur_map)
                        node = cur_map.getnodebypath("/ISA_LOOP/GS_LOOP/ST_LOOP/HEADER/BHT")
            # elif seg.get_seg_id() == 'GE':
            #    pass
            # elif seg.get_seg_id() == 'ST':
            #    pass
            # elif seg.get_seg_id() == 'SE':
            #    pass
            else:
                pass
            x12path = node.get_path()
            # parent
            if x12path in res:
                # Seen before: bump the count and remember a new predecessor path
                res[x12path]["Count"] += 1
                if last_x12_segment_path not in res[x12path]["prefix_nodes"]:
                    res[x12path]["prefix_nodes"].append(last_x12_segment_path)
            else:
                res[x12path] = {
                    "Ordinal": res_ordinal,
                    "Count": 1,
                    "NodeType": node.base_name,
                    "Id": node.id,
                    "Name": node.name,
                    "FormattedName": clean_name(node.name),
                    "ParentName": clean_name(node.parent.name),
                    "LoopMaxUse": node.max_use,
                    "ParentPath": node.parent.get_path(),
                    "prefix_nodes": [last_x12_segment_path],
                }
                res_ordinal += 1
            # Only refdes is used from each tuple yielded by values_iterator()
            for refdes, ele_ord, comp_ord, val in seg.values_iterator():
                # Element entries are keyed under the path of the matched node's parent
                elepath = node.parent.get_path() + "/" + refdes
                if elepath in res:
                    res[elepath]["Count"] += 1
                else:
                    ele_node = node.getnodebypath2(refdes)
                    # node.get_child_node_by_ordinal(
                    res[elepath] = {
                        "Ordinal": res_ordinal,
                        "Count": 1,
                        "NodeType": ele_node.base_name,
                        "Id": ele_node.id,
                        "Name": ele_node.name,
                        "FormattedName": clean_name(ele_node.name),
                        "ParentName": clean_name(ele_node.parent.name),
                        #'max_use': ele_node.max_use,
                        "ParentPath": ele_node.parent.get_path(),
                        "Usage": ele_node.usage,
                        "DataType": ele_node.data_type,
                        "MinLength": ele_node.min_len,
                        "MaxLength": ele_node.max_len,
                    }
                    res_ordinal += 1
                # print (refdes, val)
            last_x12_segment_path = x12path
    del node
    del src
    del control_map
    try:
        # cur_map is always bound (it is set to None above), so the except
        # branch is not expected to trigger.
        del cur_map
    except UnboundLocalError:
        pass
    return res
def clean_name(name):
    """Return ``name`` with every space, slash and apostrophe removed."""
    return name.translate(str.maketrans("", "", " /'"))
def check_map_path_arg(map_path):
    """
    argparse ``type=`` callback for --map-path.

    Returns ``map_path`` unchanged when it is a directory that contains the
    map index file ``maps.xml``; otherwise raises ``argparse.ArgumentError``
    describing which of the two checks failed.
    """
    index_file = "maps.xml"
    if not os.path.isdir(map_path):
        problem = f"The MAP_PATH '{map_path}' is not a valid directory"
    elif not os.path.isfile(os.path.join(map_path, index_file)):
        problem = f"The MAP_PATH '{map_path}' does not contain the map index file '{index_file}'"
    else:
        return map_path
    raise argparse.ArgumentError(None, problem)
def main():
    """
    Parse the command line, configure logging and catalog each input file.

    For every existing input file, ``x12n_iterator`` is run and its result is
    written as ``node_list.json`` in the directory of that input file.  Inputs
    that do not exist, or that ``x12n_iterator`` rejects by returning False,
    are logged and skipped without touching ``node_list.json``.

    Returns False as soon as an OSError occurs while processing a file,
    otherwise True.
    """
    parser = argparse.ArgumentParser(description="X12 Validation")
    parser.add_argument("--config-file", "-c", action="store", dest="configfile", default=None)
    parser.add_argument("--log-file", "-l", action="store", dest="logfile", default=None)
    parser.add_argument(
        "--map-path", "-m", action="store", dest="map_path", default=None, type=check_map_path_arg
    )
    parser.add_argument("--verbose", "-v", action="count", default=0)
    parser.add_argument("--debug", "-d", action="store_true")
    parser.add_argument("--quiet", "-q", action="store_true")
    parser.add_argument("--html", "-H", action="store_true")
    parser.add_argument(
        "--version",
        action="version",
        version=f"{parser.prog} {__version__}",
    )
    parser.add_argument("input_files", nargs="*")
    args = parser.parse_args()

    logger = logging.getLogger("pyx12")
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    stdout_hdlr = logging.StreamHandler()
    stdout_hdlr.setFormatter(formatter)
    logger.addHandler(stdout_hdlr)
    logger.setLevel(logging.INFO)
    param = pyx12.params.params(args.configfile)
    # Later flags win: --quiet overrides --debug / --verbose
    if args.debug:
        logger.setLevel(logging.DEBUG)
        param.set("debug", True)
    if args.verbose > 0:
        logger.setLevel(logging.DEBUG)
    if args.quiet:
        logger.setLevel(logging.ERROR)
    if args.map_path:
        param.set("map_path", args.map_path)
    if args.logfile:
        try:
            hdlr = logging.FileHandler(args.logfile)
            hdlr.setFormatter(formatter)
            logger.addHandler(hdlr)
        except OSError:
            logger.exception("Could not open log file: %s", args.logfile)

    for src_filename in args.input_files:
        try:
            if not os.path.isfile(src_filename):
                logger.error('Could not open file "%s"', src_filename)
                continue
            res = x12n_iterator(param=param, src_file=src_filename, map_path=args.map_path)
            if res is False:
                # x12n_iterator has already logged that the input is not X12;
                # do not overwrite an existing node_list.json with "false".
                continue
            json_file = os.path.join(
                os.path.dirname(os.path.abspath(src_filename)), "node_list.json"
            )
            with open(json_file, "w") as fd:
                json.dump(res, fd, indent=4)
        except OSError:
            logger.exception("Could not open files")
            return False
        except KeyboardInterrupt:
            print("\n[interrupt]")
    return True


if __name__ == "__main__":
    sys.exit(not main())
generate_spec.py
consumes the node_list.json and emits two artifacts:
- out.csv — one row per node with all of the metadata fields, suitable for review in a spreadsheet.
- map.json — element nodes grouped by the section of the transaction they belong to (Header, Patient, Claim, ServiceLine, ProviderStatus, …), with the section heuristics applied to 277CA loop names like 2200D/2220D. Within each section, elements whose FormattedName collides with another element's are disambiguated by prefixing the name of their parent node.
#!/usr/bin/env python
"""Convert the ``node_list.json`` produced by ``node_iterator.py`` into a
``out.csv`` field reference and a sectioned ``map.json`` field-mapping file."""
import argparse
import json
import logging
import os.path
import sys
libpath = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
if os.path.isdir(libpath):
sys.path.insert(0, libpath)
__version__ = "1.0.0"
def clean_name(name):
    """Return ``name`` with every space, slash and apostrophe removed."""
    return name.translate(str.maketrans("", "", " /'"))
def check_map_path_arg(map_path):
    """
    argparse ``type=`` callback for --map-path.

    Returns ``map_path`` unchanged when it is a directory that contains the
    map index file ``maps.xml``; otherwise raises ``argparse.ArgumentError``
    describing which of the two checks failed.
    """
    index_file = "maps.xml"
    if not os.path.isdir(map_path):
        problem = f"The MAP_PATH '{map_path}' is not a valid directory"
    elif not os.path.isfile(os.path.join(map_path, index_file)):
        problem = f"The MAP_PATH '{map_path}' does not contain the map index file '{index_file}'"
    else:
        return map_path
    raise argparse.ArgumentError(None, problem)
def save_csv(rows, csv_file):
    """
    Write ``rows`` to ``csv_file`` as UTF-8 CSV, ordered by their "Ordinal".

    Args:
        rows: list of dicts; each must have an "Ordinal" key.  Keys that are
            not among the output columns (for example "prefix_nodes") are
            ignored, and columns a row does not have are written empty.
        csv_file: path of the CSV file to create or overwrite.

    The caller's list is not reordered.
    """
    import csv

    fields = [
        "Ordinal",
        "Id",
        "NodeType",
        "Name",
        "FormattedName",
        "Count",
        "Section",
        "RelativePath",
        "FullPath",
        "ParentPath",
        "ParentName",
        "LoopMaxUse",
        "Usage",
        "DataType",
        "MinLength",
        "MaxLength",
    ]
    with open(csv_file, "w", newline="", encoding="utf-8") as outfile:
        writer = csv.DictWriter(
            outfile,
            fieldnames=fields,
            delimiter=",",
            quotechar='"',
            quoting=csv.QUOTE_MINIMAL,
            # Rows carry bookkeeping keys that are not CSV columns; the
            # default ("raise") would abort with ValueError on the first one.
            extrasaction="ignore",
        )
        writer.writeheader()
        writer.writerows(sorted(rows, key=lambda item: item["Ordinal"]))
def save_mapping(rows, json_file):
    """
    Write the per-section element mapping to ``json_file`` as valid JSON.

    The output is one object whose keys are the section names in sorted
    order.  Each value is the list of that section's element rows (rows whose
    "NodeType" is "element"), ordered by "Ordinal" and written one per line.

    Args:
        rows: list of row dicts as produced by ``make_dict``.
        json_file: path of the file to create or overwrite.

    "Type" and "X12Path" are null when a row has no "DataType" or
    "RelativePath"; the other copied keys must be present on element rows.
    """
    section_names = sorted({row["Section"] for row in rows})
    blocks = []
    for section in section_names:
        entries = [
            {
                "Id": x["Id"],
                "Ordinal": x["Ordinal"],
                "Type": x.get("DataType"),
                "FieldName": x["FormattedName"],
                "X12Path": x.get("RelativePath"),
                "FullPath": x["FullPath"],
                "ParentPath": x["ParentPath"],
                "ParentName": x["ParentName"],
                "Usage": x["Usage"],
                "MaxLength": x["MaxLength"],
            }
            for x in rows
            if x["Section"] == section and x["NodeType"] == "element"
        ]
        entries.sort(key=lambda item: item["Ordinal"])
        # Join with commas *between* items: a trailing comma is not JSON.
        body = ",".join(f"\n\t{json.dumps(entry)}" for entry in entries)
        blocks.append(f"{json.dumps(section)}: [{body}\n]")
    # Everything is serialized before the file is opened, so a bad row cannot
    # leave a half-written file behind.
    with open(json_file, "w") as fd:
        fd.write("{" + ",\n".join(blocks) + "\n}")
def make_dict(data):
    """
    Turn the ``node_list.json`` mapping into a list of annotated row dicts.

    Args:
        data: dict of full node path -> node metadata dict.  Every value needs
            "Id", "ParentPath" and "NodeType"; element values also need
            "FormattedName" and "ParentName".

    Returns:
        A list of new dicts (the input dicts are left untouched), in input
        order, skipping the ids IEA02, GE02, SE02 and ST03.  Each row gains:

        * "FullPath": its key in ``data``.
        * "Section": chosen by the first matching loop marker in the path
          (see ``section_rules``), or "Batch" when none matches.
        * "RelativePath": the path below the section's base path, set only
          when the full path starts with that base path.  The base path is
          the shortest "ParentPath" among the section's rows.

        Element rows whose "FormattedName" occurs more than once within their
        section get it prefixed with their "ParentName".
    """
    from collections import Counter

    skipped_ids = ("IEA02", "GE02", "SE02", "ST03")
    # (marker that must occur in the path, STC ids only?, section); first hit wins
    section_rules = (
        ("2220D", True, "ServiceLineStatus"),
        ("2220D", False, "ServiceLine"),
        ("2200D", True, "ClaimStatus"),
        ("2200D", False, "Claim"),
        ("2000D", False, "Patient"),
        ("2000C", True, "BillingProviderStatus"),
        ("2000C", False, "BillingProvider"),
        ("2200B", True, "InformationReceiverStatus"),
        ("2000A", False, "Header"),
    )

    rows = []
    for path, node in data.items():
        if node["Id"] in skipped_ids:
            continue
        row = dict(node)  # copy so the caller's data is not modified
        row["FullPath"] = path
        row["Section"] = next(
            (
                section
                for marker, stc_only, section in section_rules
                if marker in path and (not stc_only or row["Id"].startswith("STC"))
            ),
            "Batch",
        )
        rows.append(row)

    # Shortest parent path per section; the first one seen wins on ties.
    base_paths = {}
    for row in rows:
        section = row["Section"]
        if section not in base_paths or len(base_paths[section]) > len(row["ParentPath"]):
            base_paths[section] = row["ParentPath"]

    for row in rows:
        base_path = base_paths[row["Section"]]
        if row["FullPath"].startswith(base_path):
            # +1 drops the "/" that separates the base path from the rest
            row["RelativePath"] = row["FullPath"][len(base_path) + 1 :]

    # Count names once up front instead of rescanning the list per name.
    elements = [row for row in rows if row["NodeType"] == "element"]
    name_counts = Counter((row["Section"], row["FormattedName"]) for row in elements)
    for row in elements:
        if name_counts[(row["Section"], row["FormattedName"])] > 1:
            row["FormattedName"] = row["ParentName"] + row["FormattedName"]
    return rows
def main():
    """
    Parse the command line, configure logging and generate the spec files.

    The first positional argument names the sample X12 file; only its
    directory is used.  ``node_list.json`` is read from that directory and
    ``out.csv`` and ``map.json`` are written next to it.

    Exits through ``parser.error`` (status 2) when no input file is given.
    Returns True on completion.
    """
    parser = argparse.ArgumentParser(description="Gen X12 Specs")
    parser.add_argument("--config-file", "-c", action="store", dest="configfile", default=None)
    parser.add_argument("--log-file", "-l", action="store", dest="logfile", default=None)
    parser.add_argument(
        "--map-path", "-m", action="store", dest="map_path", default=None, type=check_map_path_arg
    )
    parser.add_argument("--verbose", "-v", action="count", default=0)
    parser.add_argument("--debug", "-d", action="store_true")
    parser.add_argument("--quiet", "-q", action="store_true")
    parser.add_argument("--html", "-H", action="store_true")
    parser.add_argument(
        "--version",
        action="version",
        version=f"{parser.prog} {__version__}",
    )
    parser.add_argument("input_files", nargs="*")
    args = parser.parse_args()
    if not args.input_files:
        # nargs="*" accepts an empty list; fail with a usage message rather
        # than an IndexError further down.
        parser.error("an input file is required")

    logger = logging.getLogger("pyx12")
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    stdout_hdlr = logging.StreamHandler()
    stdout_hdlr.setFormatter(formatter)
    logger.addHandler(stdout_hdlr)
    logger.setLevel(logging.INFO)
    # Later flags win: --quiet overrides --debug / --verbose
    if args.debug:
        logger.setLevel(logging.DEBUG)
    if args.verbose > 0:
        logger.setLevel(logging.DEBUG)
    if args.quiet:
        logger.setLevel(logging.ERROR)
    if args.logfile:
        try:
            hdlr = logging.FileHandler(args.logfile)
            hdlr.setFormatter(formatter)
            logger.addHandler(hdlr)
        except OSError:
            logger.exception("Could not open log file: %s", args.logfile)

    # All artifacts live in the directory of the first input file
    work_dir = os.path.dirname(os.path.abspath(args.input_files[0]))
    json_file = os.path.join(work_dir, "node_list.json")
    with open(json_file) as fd:
        res = json.load(fd)
    rows = make_dict(res)
    csv_file = os.path.join(work_dir, "out.csv")
    save_csv(rows, csv_file)
    json_map_file = os.path.join(work_dir, "map.json")
    save_mapping(rows, json_map_file)
    return True


if __name__ == "__main__":
    sys.exit(not main())