#####################################################################
# Copyright
# John Holland <john@zoner.org>
# All rights reserved.
#
# This software is licensed as described in the file LICENSE.txt, which
# you should have received as part of this distribution.
#
######################################################################
"""
Parse a ANSI X12N data file. Validate against a map and codeset values.
Create XML, HTML, and 997/999 documents based on the data file.
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from typing import Any, TextIO
import pyx12.errh_json
import pyx12.error_997
import pyx12.error_999
# Intrapackage imports
import pyx12.error_handler
import pyx12.error_html
import pyx12.errors
import pyx12.map_if
import pyx12.map_index
import pyx12.params
import pyx12.x12file
import pyx12.x12xml_simple
from pyx12.map_if._segment import apply_segment_errors
from pyx12.map_walker import apply_walk_errors, walk_tree
def _reset_counter_to_isa_counts(walker: walk_tree) -> None:
"""
Reset ISA instance counts
"""
walker.counter.reset_to_node("/ISA_LOOP")
walker.counter.increment("/ISA_LOOP")
walker.counter.increment("/ISA_LOOP/ISA")
def _reset_counter_to_gs_counts(walker: walk_tree) -> None:
"""
Reset GS instance counts
"""
walker.counter.reset_to_node("/ISA_LOOP/GS_LOOP")
walker.counter.increment("/ISA_LOOP/GS_LOOP")
walker.counter.increment("/ISA_LOOP/GS_LOOP/GS")
[docs]
def x12n_document(
param: pyx12.params.ParamsBase,
src_file: str | TextIO,
fd_997: TextIO | None,
fd_html: TextIO | None,
fd_xmldoc: TextIO | None = None,
fd_json: TextIO | None = None,
xslt_files: Any = None,
map_path: str | None = None,
callback: Callable[..., Any] | None = None,
) -> bool:
"""
Primary X12 validation function
:param param: pyx12.param instance
:param src_file: Source document
:type src_file: string
:param fd_997: 997/999 output document
:type fd_997: file descriptor
:param fd_html: HTML output document
:type fd_html: file descriptor
:param fd_xmldoc: XML output document
:type fd_xmldoc: file descriptor
:param fd_json: JSON error output document
:type fd_json: file descriptor
:rtype: boolean
"""
logger = logging.getLogger("pyx12")
errh = pyx12.error_handler.err_handler()
# Get X12 DATA file
try:
src = pyx12.x12file.X12Reader(src_file)
except pyx12.errors.X12Error:
logger.error('"%s" does not look like an X12 data file' % (src_file))
return False
# Get Map of Control Segments
map_file = "x12.control.00501.xml" if src.icvn == "00501" else "x12.control.00401.xml"
logger.debug("X12 control file: %s" % (map_file))
control_map = pyx12.map_if.load_map_file(map_file, param, map_path)
map_index_if = pyx12.map_index.map_index(map_path)
node = control_map.getnodebypath("/ISA_LOOP/ISA")
walker = walk_tree()
icvn: str | None = None
fic: str | None = None
vriic: str | None = None
tspc: str | None = None
cur_map: Any = None # we do not initially know the X12 transaction type
if fd_html:
html = pyx12.error_html.error_html(errh, fd_html, src.get_term())
html.header()
err_iter = pyx12.error_handler.err_iter(errh)
if fd_xmldoc:
xmldoc = pyx12.x12xml_simple.x12xml_simple(fd_xmldoc, param.get("simple_dtd"))
# basedir = os.path.dirname(src_file)
# erx = errh_xml.err_handler(basedir=basedir)
valid = True
for seg in src:
# find node
orig_node = node
if False:
print("--------------------------------------------")
print(seg)
print("--------------------------------------------")
# reset to control map for ISA and GS loops
print("------- counters before --------")
print(walker.counter._dict)
if seg.get_seg_id() == "ISA":
node = control_map.getnodebypath("/ISA_LOOP/ISA")
walker.forceWalkCounterToLoopStart("/ISA_LOOP", "/ISA_LOOP/ISA")
elif seg.get_seg_id() == "GS":
node = control_map.getnodebypath("/ISA_LOOP/GS_LOOP/GS")
walker.forceWalkCounterToLoopStart("/ISA_LOOP/GS_LOOP", "/ISA_LOOP/GS_LOOP/GS")
else:
# from the current node, find the map node matching the segment
# keep track of the loops traversed
try:
(node, pop_loops, push_loops, walk_errors) = walker.walk_errors(
node, seg, src.get_seg_count(), src.get_cur_line(), src.get_ls_id()
)
apply_walk_errors(errh, walk_errors)
except pyx12.errors.EngineError:
logger.error("Source file line %i" % (src.get_cur_line()))
raise
if False:
print("------- counters after --------")
print(walker.counter._dict)
if node is None:
node = orig_node
else:
if seg.get_seg_id() == "ISA":
errh.add_isa_loop(seg, src)
icvn = seg.get_value("ISA12")
errh.handle_errors(src.pop_errors())
elif seg.get_seg_id() == "IEA":
errh.handle_errors(src.pop_errors())
errh.close_isa_loop(node, seg, src)
# Generate 997
elif seg.get_seg_id() == "GS":
fic = seg.get_value("GS01")
vriic = seg.get_value("GS08")
map_file_new = map_index_if.get_filename(icvn, vriic, fic)
if map_file != map_file_new:
if map_file_new is None:
err_str = f"Map not found. icvn={icvn}, fic={fic}, vriic={vriic}"
raise pyx12.errors.EngineError(err_str)
map_file = map_file_new
cur_map = pyx12.map_if.load_map_file(map_file, param, map_path)
src.check_837_lx = True if cur_map.id == "837" else False
logger.debug("Map file: %s" % (map_file))
# apply_loop_count(orig_node, cur_map)
# reset_isa_counts(cur_map)
# _reset_counter_to_isa_counts(walker) # new counter
# reset_gs_counts(cur_map)
# _reset_counter_to_gs_counts(walker) # new counter
node = cur_map.getnodebypath("/ISA_LOOP/GS_LOOP/GS")
errh.add_gs_loop(seg, src)
errh.handle_errors(src.pop_errors())
elif seg.get_seg_id() == "BHT":
# special case for 4010 837P
if vriic in ("004010X094", "004010X094A1"):
tspc = seg.get_value("BHT02")
logger.debug("icvn=%s, fic=%s, vriic=%s, tspc=%s" % (icvn, fic, vriic, tspc))
map_file_new = map_index_if.get_filename(icvn, vriic, fic, tspc)
logger.debug("New map file: %s" % (map_file_new))
if map_file != map_file_new:
if map_file_new is None:
err_str = f"Map not found. icvn={icvn}, fic={fic}, vriic={vriic}, tspc={tspc}"
raise pyx12.errors.EngineError(err_str)
map_file = map_file_new
cur_map = pyx12.map_if.load_map_file(map_file, param, map_path)
src.check_837_lx = True if cur_map.id == "837" else False
logger.debug("Map file: %s" % (map_file))
# apply_loop_count(node, cur_map)
node = cur_map.getnodebypath("/ISA_LOOP/GS_LOOP/ST_LOOP/HEADER/BHT")
errh.add_seg(node, seg, src.get_seg_count(), src.get_cur_line(), src.get_ls_id())
errh.handle_errors(src.pop_errors())
elif seg.get_seg_id() == "GE":
errh.handle_errors(src.pop_errors())
errh.close_gs_loop(node, seg, src)
elif seg.get_seg_id() == "ST":
errh.add_st_loop(seg, src)
errh.handle_errors(src.pop_errors())
elif seg.get_seg_id() == "SE":
errh.handle_errors(src.pop_errors())
errh.close_st_loop(node, seg, src)
else:
errh.add_seg(node, seg, src.get_seg_count(), src.get_cur_line(), src.get_ls_id())
errh.handle_errors(src.pop_errors())
# errh.set_cur_line(src.get_cur_line())
assert isinstance(node, pyx12.map_if.segment_if)
valid &= apply_segment_errors(node, seg, errh)
# erx.handleErrors(src.pop_errors())
# erx.handleErrors(errh.get_errors())
# errh.reset()
if callback:
try:
callback(seg, src, node, valid)
except Exception:
logger.error("callback failed")
if fd_html:
if isinstance(node, pyx12.map_if.segment_if) and node.is_first_seg_in_loop():
html.loop(node.get_parent())
err_node_list: list[Any] = []
while True:
try:
next(err_iter)
err_node = err_iter.get_cur_node()
err_node_list.append(err_node)
except pyx12.errors.IterOutOfBounds:
break
html.gen_seg(seg, src, err_node_list)
if fd_xmldoc:
xmldoc.seg(node, seg)
if False:
print("\n\n")
# erx.Write(src.cur_line)
# erx.handleErrors(src.pop_errors())
src.cleanup() # Catch any skipped loop trailers
errh.handle_errors(src.pop_errors())
# erx.handleErrors(src.pop_errors())
# erx.handleErrors(errh.get_errors())
if fd_html:
html.footer()
if fd_xmldoc:
xmldoc.close()
# visit_debug = pyx12.error_debug.error_debug_visitor(sys.stdout)
# errh.accept(visit_debug)
# If this transaction is not a 997/999, generate one.
if fd_997 and fic != "FA":
if vriic and vriic[:6] == "004010":
try:
visit_997 = pyx12.error_997.error_997_visitor(fd_997, src.get_term())
errh.accept(visit_997)
del visit_997
except Exception:
logger.exception("Failed to create 997 response")
if vriic and vriic[:6] == "005010":
try:
visit_999 = pyx12.error_999.error_999_visitor(fd_997, src.get_term())
errh.accept(visit_999)
del visit_999
except Exception:
logger.exception("Failed to create 999 response")
if fd_json:
try:
visit_json = pyx12.errh_json.errh_json_visitor(fd_json)
errh.accept(visit_json)
del visit_json
except Exception:
logger.exception("Failed to write JSON error output")
src.close()
try:
if not valid or errh.get_error_count() > 0:
return False
else:
return True
except Exception:
print(errh)
return False