Source code for pyx12.error_handler

######################################################################
# Copyright
#   John Holland <john@zoner.org>
# All rights reserved.
#
# This software is licensed as described in the file LICENSE.txt, which
# you should have received as part of this distribution.
#
######################################################################

"""
Interface to X12 Errors
"""

from __future__ import annotations

import logging
from typing import Any, cast

# Intrapackage imports
import pyx12.segment

from .errors import IterOutOfBounds  # , IterDone

[docs] logger = logging.getLogger("pyx12.error_handler")
[docs] class err_iter: """ Iterate over the error tree Implements an odd iterator??? """
[docs] errh: Any
[docs] cur_node: Any
[docs] visit_stack: list[Any]
def __init__(self, errh: Any) -> None: """ :param errh: Error_handler instance :type errh: L{error_handler.err_handler} """ self.errh = errh self.cur_node = errh self.visit_stack = []
[docs] def first(self) -> None: self.cur_node = self.errh
[docs] def next(self) -> None: self.__next__()
def __next__(self) -> None: # If at previosly visited branch, do not do children if self.cur_node in self.visit_stack: node = None else: node = self.cur_node.get_first_child() if node is not None: self.visit_stack.append(self.cur_node) self.cur_node = node else: node = self.cur_node.get_next_sibling() if node is not None: self.cur_node = node else: if not self.cur_node.is_closed(): raise IterOutOfBounds node = self.cur_node.get_parent() if node is None: raise IterOutOfBounds if not node.is_closed(): raise IterOutOfBounds if self.cur_node in self.visit_stack: del self.visit_stack[-1] self.cur_node = node if node.id == "ROOT": raise IterOutOfBounds # raise IterDone
[docs] def get_cur_node(self) -> Any: return self.cur_node
[docs] class err_handler: """ The interface to the error handling structures. """
[docs] id: str
[docs] children: list[Any]
[docs] cur_node: Any
[docs] cur_isa_node: Any
[docs] cur_gs_node: Any
[docs] cur_st_node: Any
[docs] cur_seg_node: Any
[docs] seg_node_added: bool
[docs] cur_ele_node: Any
[docs] ele_node_added: bool
[docs] cur_line: int
def __init__(self) -> None: """ """ self.id = "ROOT" self.children = [] self.cur_node = self self.cur_isa_node = None self.cur_gs_node = None self.cur_st_node = None self.cur_seg_node = None self.seg_node_added = False self.cur_ele_node = None self.ele_node_added = False self.cur_line = 0
[docs] def accept(self, visitor: Any) -> None: """ Params: visitor - ref to visitor class """ visitor.visit_root_pre(self) for child in self.children: child.accept(visitor) visitor.visit_root_post(self)
[docs] def handle_errors(self, err_list: list[tuple[str, str, str, Any, int | None]]) -> None: """ :param err_list: list of errors to apply """ for err_type, err_cde, err_str, err_val, src_line in err_list: if err_type == "isa": self.isa_error(err_cde, err_str) elif err_type == "gs": self.gs_error(err_cde, err_str) elif err_type == "st": self.st_error(err_cde, err_str) elif err_type == "seg": self.seg_error(err_cde, err_str, err_val, src_line)
[docs] def get_cur_line(self) -> int: """ :return: Current file line number :rtype: int """ return self.cur_line
[docs] def get_id(self) -> str: """ """ return self.id
[docs] def add_isa_loop(self, seg_data: pyx12.segment.Segment, src: Any) -> None: """ :param seg_data: Segment object :type seg_data: L{segment<segment.Segment>} """ self.children.append(err_isa(self, seg_data, src)) self.cur_isa_node = self.children[-1] self.cur_seg_node = self.cur_isa_node self.seg_node_added = True
[docs] def add_gs_loop(self, seg_data: pyx12.segment.Segment, src: Any) -> None: """ :param seg_data: Segment object :type seg_data: L{segment<segment.Segment>} """ parent = self.cur_isa_node parent.children.append(err_gs(parent, seg_data, src)) self.cur_gs_node = parent.children[-1] self.cur_seg_node = self.cur_gs_node self.seg_node_added = True
[docs] def add_st_loop(self, seg_data: pyx12.segment.Segment, src: Any) -> None: """ :param seg_data: Segment object :type seg_data: L{segment<segment.Segment>} """ parent = self.cur_gs_node parent.children.append(err_st(parent, seg_data, src)) self.cur_st_node = parent.children[-1] self.cur_seg_node = self.cur_st_node self.seg_node_added = True
[docs] def add_seg( self, map_node: Any, seg_data: pyx12.segment.Segment, seg_count: int, cur_line: int, ls_id: str | None, ) -> None: """ :param map_node: current segment node :type map_node: L{node<map_if.segment_if>} :param seg_data: Segment object :type seg_data: L{segment<segment.Segment>} :param seg_count: Count of current segment in the ST Loop :type seg_count: int :param cur_line: Current line number in the file :type cur_line: int :param ls_id: The current LS loop identifier :type ls_id: string """ parent = self.cur_st_node self.cur_seg_node = err_seg(parent, map_node, seg_data, seg_count, cur_line, ls_id) self.seg_node_added = False
def _add_cur_seg(self) -> None: """ """ if not self.seg_node_added: self.cur_st_node.children.append(self.cur_seg_node) self.seg_node_added = True
[docs] def add_ele(self, map_node: Any) -> None: """ """ if self.cur_seg_node.id == "ISA": self.cur_ele_node = err_ele(self.cur_isa_node, map_node) elif self.cur_seg_node.id == "GS": self.cur_ele_node = err_ele(self.cur_gs_node, map_node) elif self.cur_seg_node.id == "ST": self.cur_ele_node = err_ele(self.cur_st_node, map_node) else: self.cur_ele_node = err_ele(self.cur_seg_node, map_node) self.ele_node_added = False
def _add_cur_ele(self) -> None: """ """ self._add_cur_seg() if not self.ele_node_added and self.cur_seg_node is not None: self.cur_seg_node.elements.append(self.cur_ele_node) self.ele_node_added = True
[docs] def isa_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: ISA level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ sout = "" sout += "Line:%i " % (self.cur_isa_node.get_cur_line()) sout += "ISA:%s - %s" % (err_cde, err_str) logger.error(sout) self.cur_isa_node.add_error(err_cde, err_str)
[docs] def gs_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: GS level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ sout = "" sout += "Line:%i " % (self.cur_gs_node.get_cur_line()) sout += "GS:%s - %s" % (err_cde, err_str) logger.error(sout) self.cur_gs_node.add_error(err_cde, err_str)
[docs] def st_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: Segment level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ sout = "" sout += "Line:%i " % (self.cur_st_node.get_cur_line()) sout += "ST:%s - %s" % (err_cde, err_str) logger.error(sout) self.cur_st_node.add_error(err_cde, err_str)
[docs] def seg_error( self, err_cde: str, err_str: str, err_value: str | None = None, src_line: int | None = None, ) -> None: """ :param err_cde: Segment level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ sout = "" try: self._add_cur_seg() self.cur_seg_node.add_error(err_cde, err_str, err_value) except Exception: sout += "No current segment in error_handler. " if src_line: sout += "Line:%i " % (src_line) else: if self.cur_seg_node is not None: sout += "Line:%i " % (self.cur_seg_node.get_cur_line()) sout += "SEG:%s - %s" % (err_cde, err_str) if err_value: sout += " (%s)" % err_value logger.error(sout)
[docs] def ele_error( self, err_cde: str, err_str: str, bad_value: str | None, refdes: str | None = None, ) -> None: """ :param err_cde: Element level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self._add_cur_ele() self.cur_ele_node.add_error(err_cde, err_str, bad_value) sout = "" sout += "Line:%i " % (self.cur_seg_node.get_cur_line()) sout += "ELE:%s - %s" % (err_cde, err_str) if bad_value: sout += " (%s)" % (bad_value) logger.error(sout)
[docs] def close_isa_loop(self, node: Any, seg: Any, src: Any) -> None: """ """ self.cur_isa_node.close(node, seg, src) self.cur_seg_node = self.cur_isa_node self.seg_node_added = True
[docs] def close_gs_loop(self, node: Any, seg: Any, src: Any) -> None: """ """ self.cur_gs_node.close(node, seg, src) self.cur_seg_node = self.cur_gs_node self.seg_node_added = True
[docs] def close_st_loop(self, node: Any, seg: Any, src: Any) -> None: """ """ self.cur_st_node.close(node, seg, src) self.cur_seg_node = self.cur_st_node self.seg_node_added = True
[docs] def find_node(self, type: str) -> None: """ Find the last node of a type """ new_node = self.cur_node node_order = {"ROOT": 1, "ISA": 2, "GS": 3, "ST": 4, "SEG": 5, "ELE": 6} while node_order[type] > new_node[new_node.get_id()]: new_node = new_node.get_parent()
def _get_last_child(self) -> Any: """ """ if len(self.children) != 0: return self.children[-1] else: return None
[docs] def get_parent(self) -> None: return None
[docs] def get_error_count(self) -> int: """ """ count = 0 for child in self.children: count += child.get_error_count() return count
[docs] def get_first_child(self) -> Any: """ """ if len(self.children) > 0: return self.children[0] else: return None
[docs] def get_next_sibling(self) -> None: """ """ return None
def __next__(self) -> Any: """ Return the next error node """ yield from self.children
[docs] def is_closed(self) -> bool: """ :rtype: boolean """ return True
def __repr__(self) -> str: """ """ return "%i: %s" % (-1, self.id)
[docs] class err_node:
[docs] parent: Any
[docs] id: str | None
[docs] children: list[Any]
[docs] cur_line: int
[docs] errors: list[Any]
def __init__(self, parent: Any) -> None: """ """ self.parent = parent self.id = None self.children = [] self.cur_line = -1 self.errors = []
[docs] def accept(self, visitor: Any) -> None: """ """ pass
[docs] def get_cur_line(self) -> int: """ :return: Current file line number :rtype: int """ return self.cur_line
[docs] def get_id(self) -> str | None: """ """ return self.id
[docs] def get_parent(self) -> Any: """ """ return self.parent
def _get_last_child(self) -> Any: """ """ if len(self.children) != 0: return self.children[-1] else: return None
[docs] def get_next_sibling(self) -> Any: """ """ bFound = False for sibling in self.parent.children: if bFound: return sibling if sibling is self: bFound = True return None
[docs] def get_first_child(self) -> Any: """ """ if len(self.children) > 0: return self.children[0] else: return None
[docs] def get_error_count(self) -> int: """ """ count = 0 for child in self.children: count += child.get_error_count() return count
[docs] def get_error_list(self, seg_id: str | None, pre: bool = False) -> list[Any]: """ """ return self.errors
[docs] def is_closed(self) -> bool: """ :rtype: boolean """ return True
[docs] class err_isa(err_node): """ Holds source ISA loop errors """
[docs] seg_data: pyx12.segment.Segment
[docs] isa_id: str | None
[docs] cur_line_isa: int
[docs] cur_line_iea: int | None
[docs] isa_trn_set_id: str | None
[docs] ta1_req: str | None
[docs] orig_date: str | None
[docs] orig_time: str | None
[docs] elements: list[Any]
def __init__(self, parent: Any, seg_data: pyx12.segment.Segment, src: Any) -> None: """ :param seg_data: Segment object :type seg_data: L{segment<segment.Segment>} :param src: X12file source :type src: L{X12file<x12file.X12Reader>} """ self.seg_data = seg_data self.isa_id = src.get_isa_id() self.cur_line_isa = src.get_cur_line() self.cur_line_iea = None self.isa_trn_set_id = seg_data.get_value("ISA13") self.ta1_req = seg_data.get_value("ISA14") self.orig_date = seg_data.get_value("ISA09") self.orig_time = seg_data.get_value("ISA10")
[docs] self.id = "ISA"
[docs] self.parent = parent
[docs] self.children = []
[docs] self.errors = []
self.elements = []
[docs] def is_closed(self) -> bool: """ :rtype: boolean """ if self.cur_line_iea: return True else: return False
[docs] def accept(self, visitor: Any) -> None: """ Params: visitor - ref to visitor class """ visitor.visit_isa_pre(self) for child in self.children: child.accept(visitor) visitor.visit_isa_post(self)
[docs] def add_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: Error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.errors.append((err_cde, err_str))
[docs] def close(self, node: Any, seg: Any, src: Any) -> None: self.cur_line_iea = src.get_cur_line()
[docs] def get_cur_line(self) -> int: """ :return: Current file line number :rtype: int """ if self.cur_line_iea: return self.cur_line_iea else: return self.cur_line_isa
[docs] def get_error_count(self) -> int: """ """ count = 0 for ele in self.elements: count += ele.get_error_count() for child in self.children: count += child.get_error_count() return count + len(self.errors)
[docs] def get_error_list(self, seg_id: str | None, pre: bool = False) -> list[Any]: """ """ if seg_id == "ISA": return [err for err in self.errors if "ISA" in err[0]] elif seg_id == "IEA": return [err for err in self.errors if "IEA" in err[0]] else: return []
def __next__(self) -> Any: """ Return the next error node """ yield from self.children next(self.parent) def __repr__(self) -> str: return "%i: %s" % (self.get_cur_line(), self.id)
[docs] class err_gs(err_node): """ Holds source GS loop information """
[docs] seg_data: pyx12.segment.Segment
[docs] isa_id: str | None
[docs] cur_line_gs: int
[docs] cur_line_ge: int | None
[docs] gs_control_num: str | None
[docs] fic: str | None
[docs] vriic: str | None
[docs] st_loops: list[Any]
[docs] ack_code: str | None
[docs] st_count_orig: int
[docs] st_count_recv: int
[docs] elements: list[Any]
def __init__(self, parent: Any, seg_data: pyx12.segment.Segment, src: Any) -> None: """ :param seg_data: Segment object :type seg_data: L{segment<segment.Segment>} :param src: X12file source :type src: L{X12file<x12file.X12Reader>} """ self.seg_data = seg_data self.isa_id = src.get_isa_id() self.cur_line_gs = src.get_cur_line() self.cur_line_ge = None self.gs_control_num = src.get_gs_id() self.fic = self.seg_data.get_value("GS01") self.vriic = self.seg_data.get_value("GS08")
[docs] self.id = "GS"
self.st_loops = [] # From GE loop self.ack_code = None # AK901 self.st_count_orig = 0 # AK902 self.st_count_recv = 0 # AK903
[docs] self.parent = parent
[docs] self.children = []
[docs] self.errors = []
self.elements = []
[docs] def accept(self, visitor: Any) -> None: """ Params: visitor - ref to visitor class """ visitor.visit_gs_pre(self) for child in self.children: child.accept(visitor) visitor.visit_gs_post(self)
[docs] def add_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: Error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.errors.append((err_cde, err_str))
[docs] def close(self, node: Any, seg_data: pyx12.segment.Segment | None, src: Any) -> None: """ """ self.cur_line_ge = src.get_cur_line() self.ack_code = self._get_ack_code() if seg_data is None: self.st_count_orig = 0 else: self.st_count_orig = int(cast(str, seg_data.get_value("GE01"))) self.st_count_recv = src.st_count # AK903
def _get_ack_code(self) -> str: for child in self.children: if child.get_error_count() > 0: return "R" if len(self.errors) > 0: return "R" return "A"
[docs] def count_failed_st(self) -> int: ct = 0 for child in self.children: if child.ack_code not in ["A", "E"]: ct += 1 return ct
[docs] def get_cur_line(self) -> int: """ :return: Current file line number :rtype: int """ if self.cur_line_ge: return self.cur_line_ge else: return self.cur_line_gs
[docs] def get_error_count(self) -> int: """ """ count = 0 for ele in self.elements: count += ele.get_error_count() for child in self.children: count += child.get_error_count() return count + len(self.errors)
[docs] def get_error_list(self, seg_id: str | None, pre: bool = False) -> list[Any]: """ """ if seg_id == "GS": return [err for err in self.errors if err[0] in ("6")] elif seg_id == "GE": return [err for err in self.errors if err[0] not in ("6")] else: return []
[docs] def is_closed(self) -> bool: """ :rtype: boolean """ if self.cur_line_ge: return True else: return False
def __next__(self) -> Any: """ Return the next error node """ yield from self.children next(self.parent) def __repr__(self) -> str: return "%i: %s" % (self.get_cur_line(), self.id)
[docs] class err_st(err_node): """ ST loops Needs: 1. Transaction set id code (837, 834) 2. Transaction set control number 3. trn set error codes 4. At SE, Determine final ack code """
[docs] seg_data: pyx12.segment.Segment
[docs] trn_set_control_num: str | None
[docs] cur_line_st: int
[docs] cur_line_se: int | None
[docs] trn_set_id: str | None
[docs] vriic: str | None
[docs] ack_code: str
[docs] elements: list[Any]
def __init__(self, parent: Any, seg_data: pyx12.segment.Segment, src: Any) -> None: """ :param seg_data: Segment object :type seg_data: L{segment<segment.Segment>} :param src: X12file source :type src: L{X12file<x12file.X12Reader>} """ self.seg_data = seg_data self.trn_set_control_num = src.get_st_id() self.cur_line_st = src.get_cur_line() self.cur_line_se = None self.trn_set_id = seg_data.get_value("ST01") self.vriic = seg_data.get_value("ST03")
[docs] self.id = "ST"
self.ack_code = "R"
[docs] self.parent = parent
[docs] self.children = []
[docs] self.errors = []
self.elements = []
[docs] def accept(self, visitor: Any) -> None: """ Params: visitor - ref to visitor class """ visitor.visit_st_pre(self) for child in self.children: child.accept(visitor) visitor.visit_st_post(self)
[docs] def add_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: Error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.errors.append((err_cde, err_str))
[docs] def close(self, node: Any, seg_data: pyx12.segment.Segment | None, src: Any) -> None: """ Close ST loop :param node: SE node :type node: L{node<map_if.x12_node>} :param seg_data: Segment object :type seg_data: L{segment<segment.Segment>} :param src: X12file source :type src: L{X12file<x12file.X12Reader>} """ self.cur_line_se = src.get_cur_line() if self.err_count() > 0: self.ack_code = "R" else: self.ack_code = "A"
[docs] def err_count(self) -> int: """ :return: Count of ST/SE loop errors :rtype: int """ seg_err_ct = 0 if self.child_err_count() > 0: seg_err_ct = 1 return len(self.errors) + seg_err_ct
[docs] def get_error_count(self) -> int: return self.err_count()
[docs] def get_error_list(self, seg_id: str | None, pre: bool = False) -> list[Any]: """ """ if seg_id == "ST": return [err for err in self.errors if err[0] in ("1", "6", "7", "23")] elif seg_id == "SE": return [err for err in self.errors if err[0] not in ("1", "6", "7", "23")] else: return []
[docs] def child_err_count(self) -> int: ct = 0 for child in self.children: if child.err_count() > 0: ct += 1 return ct
[docs] def get_cur_line(self) -> int: """ :return: Current file line number :rtype: int """ if self.cur_line_se: return self.cur_line_se else: return self.cur_line_st
[docs] def is_closed(self) -> bool: """ :rtype: boolean """ if self.cur_line_se: return True else: return False
def __next__(self) -> Any: """ Return the next error node """ yield from self.children return def __repr__(self) -> str: return "%i: %s" % (self.get_cur_line(), self.id)
[docs] class err_seg(err_node): """ Segment Errors """
[docs] name: str
[docs] pos: int
[docs] seg_id: str | None
[docs] seg_count: int
[docs] ls_id: str | None
[docs] elements: list[Any]
def __init__( self, parent: Any, map_node: Any, seg_data: pyx12.segment.Segment, seg_count: int, cur_line: int, ls_id: str | None, ) -> None: """ Needs: 1. seg_id_code 2. seg_pos - pos in ST loop 3. loop_id - LS loop id 4. seg_count - in parent """
[docs] self.parent = parent
if map_node is None: self.name = "Unknown" self.pos = -1 else: self.name = map_node.name self.pos = map_node.pos self.seg_id = seg_data.get_seg_id() self.seg_count = seg_count
[docs] self.cur_line = cur_line
self.ls_id = ls_id
[docs] self.id = "SEG"
self.elements = []
[docs] self.errors = []
[docs] def accept(self, visitor: Any) -> None: """ Params: visitor - ref to visitor class """ visitor.visit_seg(self) for elem in self.elements: elem.accept(visitor)
[docs] def add_error(self, err_cde: str, err_str: str, err_value: str | None = None) -> None: """ :param err_cde: Error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.errors.append((err_cde, err_str, err_value))
[docs] def err_count(self) -> int: """ Returns: count of errors """ ele_err_ct = 0 if self.child_err_count() > 0: ele_err_ct = 1 return len(self.errors) + ele_err_ct
[docs] def get_error_count(self) -> int: return self.err_count()
[docs] def child_err_count(self) -> int: ct = 0 for ele in self.elements: if ele.err_count() > 0: ct += 1 return ct
def __next__(self) -> Any: """ Desc: Return the next error node """ return self def __repr__(self) -> str: return "%i: %s %s" % (self.get_cur_line(), self.id, self.seg_id)
[docs] def get_first_child(self) -> None: return None
[docs] class err_ele(err_node): """ Element Errors - Holds and generates output for element and composite/sub-element errors Each element with an error creates a new err_ele instance. """
[docs] ele_ref_num: str | None
[docs] name: str | None
[docs] ele_pos: int
[docs] subele_pos: int | None
[docs] repeat_pos: int | None
def __init__(self, parent: Any, map_node: Any) -> None: """ """ self.ele_ref_num = map_node.data_ele self.name = map_node.name if map_node.parent.is_composite(): self.ele_pos = map_node.parent.seq self.subele_pos = map_node.seq else: self.ele_pos = map_node.seq self.subele_pos = None self.repeat_pos = None
[docs] self.id = "ELE"
[docs] self.parent = parent
[docs] self.errors = []
[docs] def accept(self, visitor: Any) -> None: """ Params: visitor - ref to visitor class """ visitor.visit_ele(self)
[docs] def add_error(self, err_cde: str, err_str: str, bad_value: str | None) -> None: """ :param err_cde: Error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.errors.append((err_cde, err_str, bad_value))
[docs] def err_count(self) -> int: return len(self.errors)
[docs] def get_error_count(self) -> int: return len(self.errors)
[docs] class ErrorErrhNull(Exception): """Class for errh_null errors."""
[docs] class errh_null: """ A null error object - used for testing. Stores the current error in simple variables. """
[docs] id: str
[docs] cur_node: Any
[docs] cur_line: int
[docs] err_cde: str | None
[docs] err_str: str | None
def __init__(self) -> None: self.id = "ROOT" self.cur_node = self self.cur_line = 0 self.err_cde = None self.err_str = None
[docs] def reset(self) -> None: """ Clear any errors """ self.err_cde = None self.err_str = None
[docs] def get_cur_line(self) -> int: """ :return: Current file line number :rtype: int """ return self.cur_line
[docs] def get_id(self) -> str: """ :return: Error node type :rtype: string """ return self.id
[docs] def add_isa_loop(self, seg: Any, src: Any) -> None: """ """ pass
[docs] def add_gs_loop(self, seg: Any, src: Any) -> None: """ """ pass
[docs] def add_st_loop(self, seg: Any, src: Any) -> None: """ """ pass
[docs] def add_seg( self, map_node: Any, seg: Any, seg_count: int, cur_line: int, ls_id: str | None, ) -> None: """ """ pass
[docs] def add_ele(self, map_node: Any) -> None: """ """ pass
[docs] def isa_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: ISA level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.err_cde = err_cde self.err_str = err_str
[docs] def gs_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: GS level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.err_cde = err_cde self.err_str = err_str
[docs] def st_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: Segment level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.err_cde = err_cde self.err_str = err_str
[docs] def seg_error( self, err_cde: str, err_str: str, err_value: str | None = None, src_line: int | None = None, ) -> None: """ :param err_cde: Segment level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.err_cde = err_cde self.err_str = err_str
[docs] def ele_error( self, err_cde: str, err_str: str, bad_value: str | None, refdes: str | None = None, ) -> None: """ :param err_cde: Element level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.err_cde = err_cde self.err_str = err_str
[docs] def close_isa_loop(self, node: Any, seg: Any, src: Any) -> None: """ """ pass
[docs] def close_gs_loop(self, node: Any, seg: Any, src: Any) -> None: """ """ pass
[docs] def close_st_loop(self, node: Any, seg: Any, src: Any) -> None: """ """ pass
[docs] def find_node(self, type: str) -> None: """ Find the last node of a type """ pass
[docs] def get_parent(self) -> None: return None
[docs] def get_next_sibling(self) -> None: """ """ return None
[docs] def get_error_count(self) -> int: """ """ if self.err_cde is not None: return 1 else: return 0
[docs] def handle_errors(self, err_list: list[Any]) -> None: pass
[docs] def is_closed(self) -> bool: """ :rtype: boolean """ return True
def __repr__(self) -> str: """ """ return "%i: %s" % (-1, self.id)
[docs] class errh_list: """ Capture validation errors in a list Used to refactor away from error_handler """
[docs] id: str
[docs] cur_node: Any
[docs] cur_line: int
[docs] err_isa: list[tuple[str, str]]
[docs] err_gs: list[tuple[str, str]]
[docs] err_st: list[tuple[str, str]]
[docs] err_seg: list[tuple[str, str, str | None]]
[docs] err_ele: list[tuple[str, str, str | None, str | None]]
def __init__(self) -> None: self.id = "ROOT" self.cur_node = self self.cur_line = 0 self.err_isa = [] self.err_gs = [] self.err_st = [] self.err_seg = [] self.err_ele = []
[docs] def reset(self) -> None: """ Clear any errors """ self.err_isa = [] self.err_gs = [] self.err_st = [] self.err_seg = [] self.err_ele = []
[docs] def get_cur_line(self) -> int: """ :return: Current file line number :rtype: int """ return self.cur_line
[docs] def get_id(self) -> str: """ :return: Error node type :rtype: string """ return self.id
[docs] def add_isa_loop(self, seg: Any, src: Any) -> None: pass
[docs] def add_gs_loop(self, seg: Any, src: Any) -> None: pass
[docs] def add_st_loop(self, seg: Any, src: Any) -> None: pass
[docs] def add_seg( self, map_node: Any, seg: Any, seg_count: int, cur_line: int, ls_id: str | None, ) -> None: pass
[docs] def add_ele(self, map_node: Any) -> None: pass
[docs] def isa_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: ISA level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.err_isa.append((err_cde, err_str))
[docs] def gs_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: GS level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.err_gs.append((err_cde, err_str))
[docs] def st_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: Segment level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.err_st.append((err_cde, err_str))
[docs] def seg_error( self, err_cde: str, err_str: str, err_value: str | None = None, src_line: int | None = None, ) -> None: """ :param err_cde: Segment level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.err_seg.append((err_cde, err_str, err_value))
[docs] def ele_error( self, err_cde: str, err_str: str, bad_value: str | None, refdes: str | None = None, ) -> None: """ :param err_cde: Element level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.err_ele.append((err_cde, err_str, bad_value, refdes))
[docs] def close_isa_loop(self, node: Any, seg: Any, src: Any) -> None: pass
[docs] def close_gs_loop(self, node: Any, seg: Any, src: Any) -> None: pass
[docs] def close_st_loop(self, node: Any, seg: Any, src: Any) -> None: pass
[docs] def find_node(self, type: str) -> None: pass
[docs] def get_parent(self) -> None: return None
[docs] def get_next_sibling(self) -> None: return None
[docs] def get_error_count(self) -> int: return ( len(self.err_isa) + len(self.err_gs) + len(self.err_st) + len(self.err_seg) + len(self.err_ele) )
[docs] def handle_errors(self, err_list: list[tuple[str, str, str, Any, int | None]]) -> None: """ Handles errors generated by X12Reader :param err_list: List of errors :type err_list: [(type, error code, error string)] """ for etype, err_cde, err_str, err_value, src_line in err_list: if etype == "isa": self.isa_error(err_cde, err_str) elif etype == "gs": self.gs_error(err_cde, err_str) elif etype == "st": self.st_error(err_cde, err_str) elif etype == "seg": self.seg_error(err_cde, err_str, err_value, src_line)
[docs] def is_closed(self) -> bool: """ :rtype: boolean """ return True
def __repr__(self) -> str: """ """ return "%i: %s" % (-1, self.id)