Source code for pyx12.x12metadata

from __future__ import annotations

import logging
from typing import Any, TextIO

# Intrapackage imports
import pyx12.error_handler
import pyx12.errors
import pyx12.map_if
import pyx12.map_index
import pyx12.params
import pyx12.x12file
from pyx12.map_walker import apply_walk_errors, walk_tree


[docs] def get_x12file_metadata( param: pyx12.params.ParamsBase, src_file: str | TextIO, map_path: str | None = None, do_node_summary: bool = False, ) -> tuple[bool, dict[str, Any] | None, dict[str, Any] | None]: logger = logging.getLogger("pyx12") errh = pyx12.error_handler.errh_null() # Get X12 DATA file try: src = pyx12.x12file.X12Reader(src_file) except pyx12.errors.X12Error: logger.error('"%s" does not look like an X12 data file' % (src_file)) return (False, None, None) # Get Map of Control Segments map_file = "x12.control.00501.xml" if src.icvn == "00501" else "x12.control.00401.xml" logger.debug("X12 control file: %s" % (map_file)) control_map = pyx12.map_if.load_map_file(map_file, param, map_path) map_index_if = pyx12.map_index.map_index(map_path) node = control_map.getnodebypath("/ISA_LOOP/ISA") walker = walk_tree() icvn: str | None = None fic: str | None = None vriic: str | None = None tspc: str | None = None cur_map: Any = None # we do not initially know the X12 transaction type isa_data: dict[str, Any] = {} node_summary: dict[str, Any] | None node_ordinal = 0 last_x12_segment_path: str | None = None if do_node_summary: node_summary = {} else: node_summary = None for seg in src: if seg.get_seg_id() == "ISA": node = control_map.getnodebypath("/ISA_LOOP/ISA") walker.forceWalkCounterToLoopStart("/ISA_LOOP", "/ISA_LOOP/ISA") elif seg.get_seg_id() == "GS": node = control_map.getnodebypath("/ISA_LOOP/GS_LOOP/GS") walker.forceWalkCounterToLoopStart("/ISA_LOOP/GS_LOOP", "/ISA_LOOP/GS_LOOP/GS") else: # from the current node, find the map node matching the segment # keep track of the loops traversed try: (node, pop_loops, push_loops, walk_errors) = walker.walk_errors( node, seg, src.get_seg_count(), src.get_cur_line(), src.get_ls_id() ) apply_walk_errors(errh, walk_errors) except pyx12.errors.EngineError: logger.error("Source file line %i" % (src.get_cur_line())) raise if node is None: raise pyx12.errors.EngineError("Node not found") if seg.get_seg_id() == "ISA": icvn = seg.get_value("ISA12") elif seg.get_seg_id() == "IEA": pass elif seg.get_seg_id() == "GS": fic = seg.get_value("GS01") vriic = seg.get_value("GS08") map_file_new = map_index_if.get_filename(icvn, vriic, fic) if map_file != map_file_new: if map_file_new is None: err_str = f"Map not found. icvn={icvn}, fic={fic}, vriic={vriic}" raise pyx12.errors.EngineError(err_str) map_file = map_file_new cur_map = pyx12.map_if.load_map_file(map_file, param, map_path) src.check_837_lx = True if cur_map.id == "837" else False logger.debug("Map file: %s" % (map_file)) node = cur_map.getnodebypath("/ISA_LOOP/GS_LOOP/GS") elif seg.get_seg_id() == "BHT": # special case for 4010 837P if vriic in ("004010X094", "004010X094A1"): tspc = seg.get_value("BHT02") logger.debug("icvn=%s, fic=%s, vriic=%s, tspc=%s" % (icvn, fic, vriic, tspc)) map_file_new = map_index_if.get_filename(icvn, vriic, fic, tspc) logger.debug("New map file: %s" % (map_file_new)) if map_file != map_file_new: if map_file_new is None: err_str = ( f"Map not found. icvn={icvn}, fic={fic}, vriic={vriic}, tspc={tspc}" ) raise pyx12.errors.EngineError(err_str) map_file = map_file_new cur_map = pyx12.map_if.load_map_file(map_file, param, map_path) src.check_837_lx = True if cur_map.id == "837" else False logger.debug("Map file: %s" % (map_file)) node = cur_map.getnodebypath("/ISA_LOOP/GS_LOOP/ST_LOOP/HEADER/BHT") if seg.get_seg_id() == "ISA": isa_data = { "InterchangeSenderIDQualifier": seg.get_value("ISA05"), "InterchangeSenderID": seg.get_value("ISA06"), "InterchangeReceiverIDQualifier": seg.get_value("ISA07"), "InterchangeReceiverID": seg.get_value("ISA08"), "InterchangeDate": seg.get_value("ISA09"), "InterchangeTime": seg.get_value("ISA10"), "InterchangeControlStandardsIdentifier": seg.get_value("ISA11"), "InterchangeControlVersionNumber": seg.get_value("ISA12"), "InterchangeControlNumber": seg.get_value("ISA13"), "AcknowledgmentRequested": seg.get_value("ISA14"), "UsageIndicator": seg.get_value("ISA15"), "GSLoops": [], } icvn = isa_data["InterchangeControlVersionNumber"] elif seg.get_seg_id() == "IEA": isa_data["NumberofIncludedFunctionalGroups"] = seg.get_value("IEA01") elif seg.get_seg_id() == "GS": gs_data: dict[str, Any] = { "FunctionalGroupHeader": seg.get_value("GS01"), "ApplicationSendersCode": seg.get_value("GS02"), "ApplicationReceiversCode": seg.get_value("GS03"), "FunctionalGroupDate": seg.get_value("GS04"), "FunctionalGroupTime": seg.get_value("GS05"), "GroupControlNumber": seg.get_value("GS06"), "ResponsibleAgencyCode": seg.get_value("GS07"), "VersionReleaseIndustryIdentifierCode": seg.get_value("GS08"), "STLoops": [], } elif seg.get_seg_id() == "GE": gs_data["NumberofTransactionSetsIncluded"] = seg.get_value("GE01") isa_data["GSLoops"].append(gs_data) elif seg.get_seg_id() == "ST": st_data = { "TransactionSetIdentifierCode": seg.get_value("ST01"), "TransactionSetControlNumber": seg.get_value("ST02"), "ImplementationConventionReference": seg.get_value("ST03"), } elif seg.get_seg_id() == "SE": st_data["TransactionSegmentCount"] = seg.get_value("SE01") gs_data["STLoops"].append(st_data) elif seg.get_seg_id() == "BHT": st_data["HierarchicalStructureCode"] = seg.get_value("BHT01") st_data["TransactionSetPurposeCode"] = seg.get_value("BHT02") st_data["OriginatorApplicationTransactionIdentifier"] = seg.get_value("BHT03") st_data["TransactionSetCreationDate"] = seg.get_value("BHT04") st_data["TransactionSetCreationTime"] = seg.get_value("BHT05") st_data["ClaimorEncounterIdentifier"] = seg.get_value("BHT06") assert isinstance(node, pyx12.map_if.segment_if) assert node.parent is not None x12path = node.get_path() # parent if do_node_summary and node_summary is not None: if x12path in node_summary: node_summary[x12path]["Count"] += 1 if last_x12_segment_path not in node_summary[x12path]["prefix_nodes"]: node_summary[x12path]["prefix_nodes"].append(last_x12_segment_path) else: node_summary[x12path] = { "Ordinal": node_ordinal, "Count": 1, "NodeType": node.base_name, "Id": node.id, "Name": node.name, "ParentName": node.parent.name, "LoopMaxUse": node.max_use, "ParentPath": node.parent.get_path(), "prefix_nodes": [last_x12_segment_path], } node_ordinal += 1 for refdes, ele_ord, comp_ord, val in seg.values_iterator(): ele_node = node.getnodebypath2(refdes) if isinstance(ele_node, pyx12.map_if.composite_if): ele_node = ele_node.get_child_node_by_ordinal(1) assert isinstance(ele_node, pyx12.map_if.element_if) assert ele_node.parent is not None elepath = ele_node.get_path() if elepath in node_summary: node_summary[elepath]["Count"] += 1 else: node_summary[elepath] = { "Ordinal": node_ordinal, "Count": 1, "NodeType": ele_node.base_name, "Id": ele_node.id, "Name": ele_node.name, "ParentName": ele_node.parent.name, "ParentPath": ele_node.parent.get_path(), "Usage": ele_node.usage, "DataType": ele_node.data_type, "MinLength": ele_node.min_len, "MaxLength": ele_node.max_len, } node_ordinal += 1 last_x12_segment_path = x12path src.close() return (True, isa_data, node_summary)
[docs] def get_x12file_metadata_headers( param: pyx12.params.ParamsBase, src_file: str | TextIO, map_path: str | None = None, ) -> tuple[bool, dict[str, Any] | None]: logger = logging.getLogger("pyx12") # Get X12 DATA file try: src = pyx12.x12file.X12Reader(src_file) except pyx12.errors.X12Error: logger.error('"%s" does not look like an X12 data file' % (src_file)) return (False, None) isa_data: dict[str, Any] | None = None for seg in src: if seg.get_seg_id() == "ISA": isa_data = { "InterchangeSenderIDQualifier": seg.get_value("ISA05"), "InterchangeSenderID": seg.get_value("ISA06"), "InterchangeReceiverIDQualifier": seg.get_value("ISA07"), "InterchangeReceiverID": seg.get_value("ISA08"), "InterchangeDate": seg.get_value("ISA09"), "InterchangeTime": seg.get_value("ISA10"), "InterchangeControlStandardsIdentifier": seg.get_value("ISA11"), "InterchangeControlVersionNumber": seg.get_value("ISA12"), "InterchangeControlNumber": seg.get_value("ISA13"), "AcknowledgmentRequested": seg.get_value("ISA14"), "UsageIndicator": seg.get_value("ISA15"), "GSLoops": [], } elif seg.get_seg_id() == "IEA" and isa_data is not None: isa_data["NumberofIncludedFunctionalGroups"] = seg.get_value("IEA01") elif seg.get_seg_id() == "GS": gs_data: dict[str, Any] = { "FunctionalGroupHeader": seg.get_value("GS01"), "ApplicationSendersCode": seg.get_value("GS02"), "ApplicationReceiversCode": seg.get_value("GS03"), "FunctionalGroupDate": seg.get_value("GS04"), "FunctionalGroupTime": seg.get_value("GS05"), "GroupControlNumber": seg.get_value("GS06"), "ResponsibleAgencyCode": seg.get_value("GS07"), "VersionReleaseIndustryIdentifierCode": seg.get_value("GS08"), "STLoops": [], } elif seg.get_seg_id() == "GE" and isa_data is not None: gs_data["NumberofTransactionSetsIncluded"] = seg.get_value("GE01") isa_data["GSLoops"].append(gs_data) elif seg.get_seg_id() == "ST": st_data = { "TransactionSetIdentifierCode": seg.get_value("ST01"), "TransactionSetControlNumber": seg.get_value("ST02"), "ImplementationConventionReference": seg.get_value("ST03"), } elif seg.get_seg_id() == "SE": st_data["TransactionSegmentCount"] = seg.get_value("SE01") gs_data["STLoops"].append(st_data) elif seg.get_seg_id() == "BHT": st_data["HierarchicalStructureCode"] = seg.get_value("BHT01") st_data["TransactionSetPurposeCode"] = seg.get_value("BHT02") st_data["OriginatorApplicationTransactionIdentifier"] = seg.get_value("BHT03") st_data["TransactionSetCreationDate"] = seg.get_value("BHT04") st_data["TransactionSetCreationTime"] = seg.get_value("BHT05") st_data["ClaimorEncounterIdentifier"] = seg.get_value("BHT06") src.close() return (True, isa_data)