Source code for pyx12.error_997

######################################################################
# Copyright
#   John Holland <john@zoner.org>
# All rights reserved.
#
# This software is licensed as described in the file LICENSE.txt, which
# you should have received as part of this distribution.
#
######################################################################

"""
Generates a 997 Response
Visitor - Visits an error_handler composite
"""

from __future__ import annotations

import logging
import time
from typing import Any, TextIO

import pyx12.error_codes
import pyx12.segment

from . import error_visitor

# Intrapackage imports
from .errors import EngineError

[docs] logger = logging.getLogger("pyx12.error_997")
logger.setLevel(logging.DEBUG)
[docs] class error_997_visitor(error_visitor.error_visitor): """ Visit an error_handler composite. Generate a 997. """
[docs] fd: TextIO
[docs] seg_term: str
[docs] ele_term: str
[docs] subele_term: str
[docs] eol: str
[docs] seg_count: int
[docs] isa_control_num: str | None
[docs] isa_seg: pyx12.segment.Segment | None
[docs] gs_loop_count: int
[docs] gs_id: str | None
[docs] gs_seg: pyx12.segment.Segment | None
[docs] st_control_num: int
[docs] st_loop_count: int
def __init__(self, fd: TextIO, term: tuple[Any, ...] = ("~", "*", "~", "\n")) -> None: """ :param fd: target file :type fd: file descriptor :param term: tuple of x12 terminators used :type term: tuple(string, string, string, string) """ self.fd = fd self.seg_term = "~" self.ele_term = "*" self.subele_term = ":" # self.seg_term = term[0] # self.ele_term = term[1] # self.subele_term = term[2] # self.eol = term[3] self.eol = "\n" self.seg_count = 0 self.isa_control_num = None self.isa_seg = None self.gs_loop_count = 0 self.gs_id = None self.gs_seg = None self.st_control_num = 0 self.st_loop_count = 0
[docs] def visit_root_pre(self, errh: Any) -> None: """ :param errh: Error handler :type errh: L{error_handler.err_handler} """ # now = time.localtime() seg = errh.cur_isa_node.seg_data # ISA*00* *00* *ZZ*ENCOUNTER *ZZ*00GR *030425*1501*U*00401*000065350*0*T*:~ self.isa_control_num = ("%s%s" % (time.strftime("%y%m%d"), time.strftime("%H%M")))[1:] icvn = seg.get_value("ISA12") isa_seg = pyx12.segment.Segment( "ISA*00* *00* ", self.seg_term, self.ele_term, self.subele_term ) isa_seg.append(seg.get_value("ISA07")) isa_seg.append(seg.get_value("ISA08")) isa_seg.append(seg.get_value("ISA05")) isa_seg.append(seg.get_value("ISA06")) isa_seg.append(time.strftime("%y%m%d")) # Date isa_seg.append(time.strftime("%H%M")) # Time isa_seg.append(seg.get_value("ISA11")) isa_seg.append(icvn) isa_seg.append(self.isa_control_num) # ISA Interchange Control Number isa_seg.append("0") # No need for TA1 response to 997 isa_seg.append(seg.get_value("ISA15")) isa_seg.append(self.subele_term) self._write(isa_seg) self.isa_seg = isa_seg self.gs_loop_count = 0 # GS*FA*ENCOUNTER*00GR*20030425*150153*653500001*X*004010 seg = errh.cur_gs_node.seg_data gs_seg = pyx12.segment.Segment("GS", "~", "*", ":") gs_seg.append("FA") gs_seg.append(seg.get_value("GS03").rstrip()) gs_seg.append(seg.get_value("GS02").rstrip()) gs_seg.append(time.strftime("%Y%m%d")) gs_seg.append(time.strftime("%H%M%S")) gs_seg.append(seg.get_value("GS06")) gs_seg.append(seg.get_value("GS07")) gs_seg.append(icvn) self._write(gs_seg) self.gs_seg = gs_seg self.gs_id = seg.get_value("GS06") # self.gs_997_count = 0 self.st_loop_count = 0 self.gs_loop_count += 1
def __get_isa_errors(self, err_isa: Any) -> list[str]: """ Build list of TA1 level errors Only the first error is used """ isa_ele_err_map = { 1: "010", 2: "011", 3: "012", 4: "013", 5: "005", 6: "006", 7: "007", 8: "008", 9: "014", 10: "015", 11: "016", 12: "017", 13: "018", 14: "019", 15: "020", 16: "027", } iea_ele_err_map = {1: "021", 2: "018"} err_codes = [err[0] for err in err_isa.errors] for elem in err_isa.elements: for err_cde, err_str, bad_value in elem.errors: # Ugly if "ISA" in err_str: err_codes.append(isa_ele_err_map[elem.ele_pos]) elif "IEA" in err_str: err_codes.append(iea_ele_err_map[elem.ele_pos]) # Codes extracted from https://msdn.microsoft.com/en-us/library/bb246074.aspx reject_suspend_codes = set( [ "004", "005", "007", "010", "011", "012", "013", "014", "015", "016", "017", "018", "022", "023", "024", "025", "026", "027", ] ) # return unique codes, while trying to keep the order of the list # this is to ensure that test result is deterministic on Python 3 uniq_codes: list[str] = [] for err in err_codes: if err in uniq_codes: continue # priotize reject codes, rather than accept with errors codes if err in reject_suspend_codes: uniq_codes.insert(0, err) else: uniq_codes.append(err) return uniq_codes
[docs] def visit_root_post(self, errh: Any) -> None: """ :param errh: Error handler :type errh: L{error_handler.err_handler} """ gs06 = self.gs_seg.get_value("GS06") if self.gs_seg is not None else "" self._write(pyx12.segment.Segment("GE*%i*%s" % (self.st_loop_count, gs06), "~", "*", ":")) self.gs_loop_count = 1 # pdb.set_trace() # TA1 segment err_isa = errh.cur_isa_node if err_isa.ta1_req == "1": # seg = ['TA1', err_isa.isa_trn_set_id, err_isa.orig_date, \ # err_isa.orig_time] ta1_seg = pyx12.segment.Segment("TA1", "~", "*", ":") ta1_seg.append(err_isa.isa_trn_set_id) ta1_seg.append(err_isa.orig_date) ta1_seg.append(err_isa.orig_time) err_codes = self.__get_isa_errors(err_isa) if err_codes: err_cde = err_codes[0] ta1_seg.append("R") ta1_seg.append(err_cde) else: ta1_seg.append("A") ta1_seg.append("000") self._write(ta1_seg) self._write( pyx12.segment.Segment( "IEA*%i*%s" % (self.gs_loop_count, self.isa_control_num), "~", "*", ":" ) )
[docs] def visit_isa_pre(self, err_isa: Any) -> None: """ :param err_isa: ISA Loop error handler :type err_isa: L{error_handler.err_isa} """
[docs] def visit_isa_post(self, err_isa: Any) -> None: """ :param err_isa: ISA Loop error handler :type err_isa: L{error_handler.err_isa} """
[docs] def visit_gs_pre(self, err_gs: Any) -> None: """ :param err_gs: GS Loop error handler :type err_gs: L{error_handler.err_gs} """ # ST self.st_control_num += 1 # seg = ['ST', '997', '%04i' % self.st_control_num] # self._write(seg) self._write(pyx12.segment.Segment("ST*997*%04i" % (self.st_control_num), "~", "*", ":")) self.seg_count = 0 self.seg_count = 1 self.st_loop_count += 1 # AK1 # seg = ['AK1', err_gs.fic, err_gs.gs_control_num] # self._write(seg) self._write( pyx12.segment.Segment("AK1*%s*%s" % (err_gs.fic, err_gs.gs_control_num), "~", "*", ":") )
def __get_gs_errors(self, err_gs: Any) -> list[str]: """ Build list of GS level errors """ gs_ele_err_map = {6: "6", 8: "2"} ge_ele_err_map = {2: "6"} err_codes = [err[0] for err in err_gs.errors] for elem in err_gs.elements: for err_cde, err_str, bad_value in elem.errors: # Ugly if "GS" in err_str: # if elem.ele_pos in gs_ele_err_map.keys(): if elem.ele_pos in gs_ele_err_map: err_codes.append(gs_ele_err_map[elem.ele_pos]) else: err_codes.append("1") elif "GE" in err_str: if elem.ele_pos in ge_ele_err_map: err_codes.append(ge_ele_err_map[elem.ele_pos]) else: err_codes.append("1") # return unique codes ret = list(set(err_codes)) ret.sort() return ret
[docs] def visit_gs_post(self, err_gs: Any) -> None: """ :param err_gs: GS Loop error handler :type err_gs: L{error_handler.err_gs} """ if not (err_gs.ack_code and err_gs.st_count_orig and err_gs.st_count_recv): # raise EngineError, 'err_gs variables not set' if not err_gs.ack_code: err_gs.ack_code = "R" if not err_gs.st_count_orig: err_gs.st_count_orig = 0 if not err_gs.st_count_recv: err_gs.st_count_recv = 0 # .st_count_orig = None # AK902 # .st_count_recv = None # AK903 # .st_count_accept = None # AK904 # .err_cde = [] # AK905-9 # seg.append(err_st.ack_code) # if '1' in err_gs.errors: ack_code = 'R' # elif '2' in err_gs.errors: ack_code = 'R' # elif '3' in err_gs.errors: ack_code = 'R' # elif '4' in err_gs.errors: ack_code = 'R' # elif '5' in err_gs.errors: ack_code = 'E' # elif '6' in err_gs.errors: ack_code = 'E' # else: ack_code = 'A' seg_data = pyx12.segment.Segment("AK9", "~", "*", ":") seg_data.append(err_gs.ack_code) seg_data.append("%i" % err_gs.st_count_orig) seg_data.append("%i" % err_gs.st_count_recv) count_ok = max(err_gs.st_count_recv - err_gs.count_failed_st(), 0) seg_data.append("%i" % (count_ok)) # seg = ['AK9', err_gs.ack_code, '%i' % err_gs.st_count_orig, \ # '%i' % err_gs.st_count_recv, \ # '%i' % (err_gs.st_count_recv - err_gs.count_failed_st())] err_codes = self.__get_gs_errors(err_gs) for err_cde in err_codes: seg_data.append("%s" % err_cde) # seg.append('%s' % err_cde) self._write(seg_data) # for child in err_gs.children: # print(child.cur_line, child.seg) # logger.info('err_gs has %i children' % len(self.children)) # SE seg_count = self.seg_count + 1 seg_data = pyx12.segment.Segment("SE", "~", "*", ":") seg_data.append("%i" % seg_count) seg_data.append("%04i" % self.st_control_num) # seg = ['SE', '%i' % seg_count, '%04i' % self.st_control_num] self._write(seg_data)
[docs] def visit_st_pre(self, err_st: Any) -> None: """ :param err_st: ST Loop error handler :type err_st: L{error_handler.err_st} """ seg_data = pyx12.segment.Segment("AK2", "~", "*", ":") seg_data.append(err_st.trn_set_id) seg_data.append(err_st.trn_set_control_num.strip()) self._write(seg_data)
def __get_st_errors(self, err_st: Any) -> list[str]: """ Build list of ST level errors """ st_ele_err_map = {1: "6", 2: "7"} se_ele_err_map = {1: "6", 2: "7"} err_codes = [err[0] for err in err_st.errors] if err_st.child_err_count() > 0: err_codes.append("5") for elem in err_st.elements: for err_cde, err_str, bad_value in elem.errors: # Ugly if "ST" in err_str: err_codes.append(st_ele_err_map[elem.ele_pos]) elif "SE" in err_str: err_codes.append(se_ele_err_map[elem.ele_pos]) # return unique codes ret = list(set(err_codes)) ret.sort() return ret
[docs] def visit_st_post(self, err_st: Any) -> None: """ :param err_st: ST Loop error handler :type err_st: L{error_handler.err_st} """ if err_st.ack_code is None: raise EngineError("err_st.ack_cde variable not set") # self.ack_code = None # AK501 seg_data = pyx12.segment.Segment("AK5", "~", "*", ":") # self.err_cde = [] # AK502-6 # seg.append(err_st.ack_code) seg_data.append(err_st.ack_code) err_codes = self.__get_st_errors(err_st) for i in range(min(len(err_codes), 5)): seg_data.append(err_codes[i]) # seg.append(err_codes[i]) self._write(seg_data)
[docs] def visit_seg(self, err_seg: Any) -> None: """ :param err_seg: Segment error handler :type err_seg: L{error_handler.err_seg} """ seg_base = pyx12.segment.Segment("AK3", "~", "*", ":") seg_base.append(err_seg.seg_id) seg_base.append("%i" % err_seg.seg_count) if err_seg.ls_id: seg_base.append(err_seg.ls_id) else: seg_base.append("") seg_str = seg_base.format("~", "*", ":") # Every err_cde reaching this point is a pyx12 code from # pyx12.error_codes.ERROR_CODES (PRs #175-#178 migrated all # producers). None spec or None ak_code means the code does not # surface in the 4010 AK3 channel (e.g. parser HL1/HL2/LX). emitted: set[str] = set() for err_cde in [x[0] for x in err_seg.errors]: spec = pyx12.error_codes.ERROR_CODES.get(err_cde) if spec is None or spec.ak_code is None: continue ak_code = spec.ak_code if ak_code in emitted: continue seg_data = pyx12.segment.Segment(seg_str, "~", "*", ":") seg_data.set("AK304", ak_code) self._write(seg_data) emitted.add(ak_code) if err_seg.child_err_count() > 0 and "8" not in emitted: seg_data = pyx12.segment.Segment(seg_str, "~", "*", ":") seg_data.set("AK304", "8") self._write(seg_data)
[docs] def visit_ele(self, err_ele: Any) -> None: """ :param err_ele: Segment error handler :type err_ele: L{error_handler.err_ele} """ seg_base = pyx12.segment.Segment("AK4", "~", "*", ":") if err_ele.subele_pos: seg_base.append("%i:%i" % (err_ele.ele_pos, err_ele.subele_pos)) else: seg_base.append("%i" % (err_ele.ele_pos)) if err_ele.ele_ref_num: seg_base.append(err_ele.ele_ref_num) seg_str = seg_base.format("~", "*", ":") for err_cde, err_str, bad_value in err_ele.errors: spec = pyx12.error_codes.ERROR_CODES.get(err_cde) if spec is None or spec.ak_code is None: continue seg_data = pyx12.segment.Segment(seg_str, "~", "*", ":") seg_data.set("AK403", spec.ak_code) if bad_value: seg_data.set("AK404", error_visitor.ascii_only(bad_value)) self._write(seg_data)
def _write(self, seg_data: pyx12.segment.Segment) -> None: """ Params: seg_data - :param seg_data: Data segment instance :type seg_data: L{segment.Segment} """ sout = seg_data.format(self.seg_term, self.ele_term, self.subele_term) if seg_data.get_seg_id() == "ISA": sout = sout[:-1] + self.ele_term + self.subele_term + self.seg_term self.fd.write("%s\n" % (sout)) self.seg_count += 1