######################################################################
# Copyright
# John Holland <john@zoner.org>
# All rights reserved.
#
# This software is licensed as described in the file LICENSE.txt, which
# you should have received as part of this distribution.
#
######################################################################
"""
Interface to X12 Errors
"""
from __future__ import annotations
import logging
from typing import Any, cast
# Intrapackage imports
import pyx12.segment
from .errors import IterOutOfBounds # , IterDone
[docs]
# Module-level logger; err_handler writes every reported error to it.
logger = logging.getLogger("pyx12.error_handler")
[docs]
class err_iter:
    """
    Cursor that walks the error tree one node per step.

    This is not a regular Python iterator: ``next()`` returns nothing and
    only moves the cursor.  Read the current position with
    ``get_cur_node()``.  ``IterOutOfBounds`` is raised when the cursor
    cannot advance.
    """

    def __init__(self, errh: Any) -> None:
        """
        :param errh: Error_handler instance used as the root of the walk
        :type errh: L{error_handler.err_handler}
        """
        self.errh = errh
        self.cur_node = errh
        self.visit_stack = []

    def first(self) -> None:
        """Move the cursor back to the root handler."""
        self.cur_node = self.errh

    def next(self) -> None:
        """Advance the cursor by one node (alias for ``__next__``)."""
        self.__next__()

    def __next__(self) -> None:
        """
        Advance the cursor: first child, else next sibling, else parent.

        :raises IterOutOfBounds: if the current node or its parent is not
            closed, if there is no parent, or once the cursor climbs back
            to the node whose id is ``"ROOT"``
        """
        current = self.cur_node

        # A node already on the visit stack has had its children walked,
        # so do not descend into it again.
        if current in self.visit_stack:
            child = None
        else:
            child = current.get_first_child()
        if child is not None:
            self.visit_stack.append(current)
            self.cur_node = child
            return

        sibling = current.get_next_sibling()
        if sibling is not None:
            self.cur_node = sibling
            return

        # No child and no sibling: climb to the parent, which is only
        # allowed when both this node and the parent are closed.
        if not current.is_closed():
            raise IterOutOfBounds
        parent = current.get_parent()
        if parent is None:
            raise IterOutOfBounds
        if not parent.is_closed():
            raise IterOutOfBounds
        if current in self.visit_stack:
            del self.visit_stack[-1]
        self.cur_node = parent
        if parent.id == "ROOT":
            raise IterOutOfBounds

    def get_cur_node(self) -> Any:
        """
        :return: the node the cursor currently points at
        """
        return self.cur_node
[docs]
class err_handler:
    """
    Root of the error tree and the interface used to record X12 errors.

    The handler tracks the ISA, GS, ST, segment and element nodes currently
    being processed.  Segment and element nodes are created by ``add_seg``
    and ``add_ele`` but are only attached to the tree once an error is
    reported against them (see ``_add_cur_seg`` and ``_add_cur_ele``).
    """

    def __init__(self) -> None:
        """Create an empty tree whose only node is this root."""
        self.id = "ROOT"
        self.children = []
        self.cur_node = self
        self.cur_isa_node = None
        self.cur_gs_node = None
        self.cur_st_node = None
        self.cur_seg_node = None
        self.seg_node_added = False
        self.cur_ele_node = None
        self.ele_node_added = False
        self.cur_line = 0

    def accept(self, visitor: Any) -> None:
        """
        Visit the root, then every child, then the root again.

        :param visitor: object providing ``visit_root_pre`` and
            ``visit_root_post``; it is also passed to each child's ``accept``
        """
        visitor.visit_root_pre(self)
        for child in self.children:
            child.accept(visitor)
        visitor.visit_root_post(self)

    def handle_errors(self, err_list: list[tuple[str, str, str, Any, int | None]]) -> None:
        """
        Dispatch a batch of errors to the matching ``*_error`` method.

        Entries whose type is not "isa", "gs", "st" or "seg" are ignored.

        :param err_list: list of errors to apply, each a tuple of
            (type, error code, error string, error value, source line)
        """
        for err_type, err_cde, err_str, err_val, src_line in err_list:
            if err_type == "isa":
                self.isa_error(err_cde, err_str)
            elif err_type == "gs":
                self.gs_error(err_cde, err_str)
            elif err_type == "st":
                self.st_error(err_cde, err_str)
            elif err_type == "seg":
                self.seg_error(err_cde, err_str, err_val, src_line)

    def get_cur_line(self) -> int:
        """
        :return: Current file line number
        :rtype: int
        """
        return self.cur_line

    def get_id(self) -> str:
        """
        :return: the node id, ``"ROOT"`` for the handler
        :rtype: string
        """
        return self.id

    def add_isa_loop(self, seg_data: pyx12.segment.Segment, src: Any) -> None:
        """
        Start a new ISA loop and make it the current ISA and segment node.

        :param seg_data: Segment object
        :type seg_data: L{segment<segment.Segment>}
        :param src: source object handed on to ``err_isa``
        """
        isa_node = err_isa(self, seg_data, src)
        self.children.append(isa_node)
        self.cur_isa_node = isa_node
        self.cur_seg_node = isa_node
        self.seg_node_added = True

    def add_gs_loop(self, seg_data: pyx12.segment.Segment, src: Any) -> None:
        """
        Start a new GS loop under the current ISA node.

        :param seg_data: Segment object
        :type seg_data: L{segment<segment.Segment>}
        :param src: source object handed on to ``err_gs``
        """
        parent = self.cur_isa_node
        gs_node = err_gs(parent, seg_data, src)
        parent.children.append(gs_node)
        self.cur_gs_node = gs_node
        self.cur_seg_node = gs_node
        self.seg_node_added = True

    def add_st_loop(self, seg_data: pyx12.segment.Segment, src: Any) -> None:
        """
        Start a new ST loop under the current GS node.

        :param seg_data: Segment object
        :type seg_data: L{segment<segment.Segment>}
        :param src: source object handed on to ``err_st``
        """
        parent = self.cur_gs_node
        st_node = err_st(parent, seg_data, src)
        parent.children.append(st_node)
        self.cur_st_node = st_node
        self.cur_seg_node = st_node
        self.seg_node_added = True

    def add_seg(
        self,
        map_node: Any,
        seg_data: pyx12.segment.Segment,
        seg_count: int,
        cur_line: int,
        ls_id: str | None,
    ) -> None:
        """
        Create the node for the current segment without attaching it yet.

        :param map_node: current segment node
        :type map_node: L{node<map_if.segment_if>}
        :param seg_data: Segment object
        :type seg_data: L{segment<segment.Segment>}
        :param seg_count: Count of current segment in the ST Loop
        :type seg_count: int
        :param cur_line: Current line number in the file
        :type cur_line: int
        :param ls_id: The current LS loop identifier
        :type ls_id: string
        """
        parent = self.cur_st_node
        self.cur_seg_node = err_seg(parent, map_node, seg_data, seg_count, cur_line, ls_id)
        # Attached to the ST node only when an error is reported.
        self.seg_node_added = False

    def _add_cur_seg(self) -> None:
        """Attach the current segment node to the current ST node, once."""
        if not self.seg_node_added:
            self.cur_st_node.children.append(self.cur_seg_node)
            self.seg_node_added = True

    def add_ele(self, map_node: Any) -> None:
        """
        Create the node for the current element without attaching it yet.

        The element's parent is the current ISA, GS or ST node when the
        current segment node has that id, otherwise the current segment node.

        :param map_node: map node describing the element
        """
        seg_id = self.cur_seg_node.id
        if seg_id == "ISA":
            parent = self.cur_isa_node
        elif seg_id == "GS":
            parent = self.cur_gs_node
        elif seg_id == "ST":
            parent = self.cur_st_node
        else:
            parent = self.cur_seg_node
        self.cur_ele_node = err_ele(parent, map_node)
        self.ele_node_added = False

    def _add_cur_ele(self) -> None:
        """Attach the current segment node, then the current element node."""
        self._add_cur_seg()
        if not self.ele_node_added and self.cur_seg_node is not None:
            self.cur_seg_node.elements.append(self.cur_ele_node)
            self.ele_node_added = True

    def isa_error(self, err_cde: str, err_str: str) -> None:
        """
        Log an error and record it on the current ISA node.

        :param err_cde: ISA level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        """
        sout = "Line:%i " % (self.cur_isa_node.get_cur_line())
        sout += "ISA:%s - %s" % (err_cde, err_str)
        logger.error(sout)
        self.cur_isa_node.add_error(err_cde, err_str)

    def gs_error(self, err_cde: str, err_str: str) -> None:
        """
        Log an error and record it on the current GS node.

        :param err_cde: GS level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        """
        sout = "Line:%i " % (self.cur_gs_node.get_cur_line())
        sout += "GS:%s - %s" % (err_cde, err_str)
        logger.error(sout)
        self.cur_gs_node.add_error(err_cde, err_str)

    def st_error(self, err_cde: str, err_str: str) -> None:
        """
        Log an error and record it on the current ST node.

        :param err_cde: ST level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        """
        sout = "Line:%i " % (self.cur_st_node.get_cur_line())
        sout += "ST:%s - %s" % (err_cde, err_str)
        logger.error(sout)
        self.cur_st_node.add_error(err_cde, err_str)

    def seg_error(
        self,
        err_cde: str,
        err_str: str,
        err_value: str | None = None,
        src_line: int | None = None,
    ) -> None:
        """
        Record an error on the current segment node and log it.

        If the error cannot be stored on a segment node, it is still logged,
        prefixed with a note that no current segment exists.

        :param err_cde: Segment level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        :param err_value: offending value, appended to the log line if truthy
        :type err_value: string
        :param src_line: line number to log; when falsy the current segment
            node's line is used instead
        :type src_line: int
        """
        sout = ""
        try:
            self._add_cur_seg()
            self.cur_seg_node.add_error(err_cde, err_str, err_value)
        except Exception:
            # Deliberately best-effort: the error must still reach the log.
            sout += "No current segment in error_handler. "
        if src_line:
            sout += "Line:%i " % (src_line)
        elif self.cur_seg_node is not None:
            sout += "Line:%i " % (self.cur_seg_node.get_cur_line())
        sout += "SEG:%s - %s" % (err_cde, err_str)
        if err_value:
            sout += " (%s)" % err_value
        logger.error(sout)

    def ele_error(
        self,
        err_cde: str,
        err_str: str,
        bad_value: str | None,
        refdes: str | None = None,
    ) -> None:
        """
        Record an error on the current element node and log it.

        :param err_cde: Element level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        :param bad_value: offending value, appended to the log line if truthy
        :type bad_value: string
        :param refdes: accepted but not used by this handler
        :type refdes: string
        """
        self._add_cur_ele()
        self.cur_ele_node.add_error(err_cde, err_str, bad_value)
        sout = "Line:%i " % (self.cur_seg_node.get_cur_line())
        sout += "ELE:%s - %s" % (err_cde, err_str)
        if bad_value:
            sout += " (%s)" % (bad_value)
        logger.error(sout)

    def close_isa_loop(self, node: Any, seg: Any, src: Any) -> None:
        """Close the current ISA node and make it the current segment node."""
        self.cur_isa_node.close(node, seg, src)
        self.cur_seg_node = self.cur_isa_node
        self.seg_node_added = True

    def close_gs_loop(self, node: Any, seg: Any, src: Any) -> None:
        """Close the current GS node and make it the current segment node."""
        self.cur_gs_node.close(node, seg, src)
        self.cur_seg_node = self.cur_gs_node
        self.seg_node_added = True

    def close_st_loop(self, node: Any, seg: Any, src: Any) -> None:
        """Close the current ST node and make it the current segment node."""
        self.cur_st_node.close(node, seg, src)
        self.cur_seg_node = self.cur_st_node
        self.seg_node_added = True

    def find_node(self, type: str) -> None:
        """
        Find the last node of a type

        Walks from ``self.cur_node`` towards the root while the requested
        type ranks deeper than the node being looked at.  The node reached is
        not returned or stored.

        :param type: one of "ROOT", "ISA", "GS", "ST", "SEG", "ELE"
        :type type: string
        :raises KeyError: if ``type`` is not one of the known node ids
        """
        node_order = {"ROOT": 1, "ISA": 2, "GS": 3, "ST": 4, "SEG": 5, "ELE": 6}
        target_rank = node_order[type]
        new_node = self.cur_node
        # Rank the node through node_order (the node itself is not
        # subscriptable) and stop once the walk runs past the root.
        # NOTE(review): the ">" comparison is kept from the original;
        # confirm the intended walk direction.
        while new_node is not None and target_rank > node_order[new_node.get_id()]:
            new_node = new_node.get_parent()

    def _get_last_child(self) -> Any:
        """
        :return: the most recently added child, or None when there is none
        """
        return self.children[-1] if self.children else None

    def get_parent(self) -> None:
        """
        :return: always None; the root has no parent
        """
        return None

    def get_error_count(self) -> int:
        """
        :return: sum of ``get_error_count()`` over all children
        :rtype: int
        """
        return sum(child.get_error_count() for child in self.children)

    def get_first_child(self) -> Any:
        """
        :return: the first child, or None when there is none
        """
        return self.children[0] if self.children else None

    def get_next_sibling(self) -> None:
        """
        :return: always None; the root has no siblings
        """
        return None

    def __next__(self) -> Any:
        """
        Return the next error node

        Because the body uses ``yield``, calling this returns a generator
        over ``self.children`` rather than a single node.
        """
        yield from self.children

    def is_closed(self) -> bool:
        """
        :return: always True for the root
        :rtype: boolean
        """
        return True

    def __repr__(self) -> str:
        """
        :return: ``"-1: "`` followed by the node id
        """
        return "%i: %s" % (-1, self.id)
[docs]
class err_node:
    """Base class for the nodes of the error tree."""

    def __init__(self, parent: Any) -> None:
        """
        :param parent: node this one hangs under
        """
        self.parent = parent
        self.id = None
        self.children = []
        self.cur_line = -1
        self.errors = []

    def accept(self, visitor: Any) -> None:
        """Accept a visitor; the base node does nothing."""
        return None

    def get_cur_line(self) -> int:
        """
        :return: Current file line number
        :rtype: int
        """
        return self.cur_line

    def get_id(self) -> str | None:
        """
        :return: the node id, None on the base class
        """
        return self.id

    def get_parent(self) -> Any:
        """
        :return: the parent node
        """
        return self.parent

    def _get_last_child(self) -> Any:
        """
        :return: the last child, or None when there are no children
        """
        if self.children:
            return self.children[-1]
        return None

    def get_next_sibling(self) -> Any:
        """
        :return: the entry that follows this node in the parent's children,
            or None when this node is last or is not found
        """
        remaining = iter(self.parent.children)
        for candidate in remaining:
            if candidate is self:
                # Whatever comes right after this node is the sibling.
                return next(remaining, None)
        return None

    def get_first_child(self) -> Any:
        """
        :return: the first child, or None when there are no children
        """
        if self.children:
            return self.children[0]
        return None

    def get_error_count(self) -> int:
        """
        :return: sum of ``get_error_count()`` over all children
        :rtype: int
        """
        total = 0
        for node in self.children:
            total += node.get_error_count()
        return total

    def get_error_list(self, seg_id: str | None, pre: bool = False) -> list[Any]:
        """
        :param seg_id: ignored by the base class
        :param pre: ignored by the base class
        :return: the node's own error list (not a copy)
        """
        return self.errors

    def is_closed(self) -> bool:
        """
        :return: always True on the base class
        :rtype: boolean
        """
        return True
[docs]
class err_isa(err_node):
    """
    Holds source ISA loop errors
    """

    seg_data: pyx12.segment.Segment
    cur_line_iea: int | None
    isa_trn_set_id: str | None

    def __init__(self, parent: Any, seg_data: pyx12.segment.Segment, src: Any) -> None:
        """
        :param parent: node this ISA loop hangs under
        :param seg_data: Segment object
        :type seg_data: L{segment<segment.Segment>}
        :param src: X12file source
        :type src: L{X12file<x12file.X12Reader>}
        """
        # The base initialiser provides parent, children and errors, which
        # add_error(), accept() and the tree walkers depend on.
        super().__init__(parent)
        self.id = "ISA"
        self.seg_data = seg_data
        self.isa_id = src.get_isa_id()
        self.cur_line_isa = src.get_cur_line()
        self.cur_line_iea = None  # set by close()
        self.isa_trn_set_id = seg_data.get_value("ISA13")
        self.ta1_req = seg_data.get_value("ISA14")
        self.orig_date = seg_data.get_value("ISA09")
        self.orig_time = seg_data.get_value("ISA10")
        self.elements = []

    def is_closed(self) -> bool:
        """
        :return: True once ``close()`` has recorded a truthy IEA line number
        :rtype: boolean
        """
        return bool(self.cur_line_iea)

    def accept(self, visitor: Any) -> None:
        """
        Visit this node, then every child, then this node again.

        :param visitor: object providing ``visit_isa_pre`` and
            ``visit_isa_post``; it is also passed to each child's ``accept``
        """
        visitor.visit_isa_pre(self)
        for child in self.children:
            child.accept(visitor)
        visitor.visit_isa_post(self)

    def add_error(self, err_cde: str, err_str: str) -> None:
        """
        :param err_cde: Error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        """
        self.errors.append((err_cde, err_str))

    def close(self, node: Any, seg: Any, src: Any) -> None:
        """
        Record the source's current line as the IEA line.

        :param node: unused
        :param seg: unused
        :param src: source object providing ``get_cur_line()``
        """
        self.cur_line_iea = src.get_cur_line()

    def get_cur_line(self) -> int:
        """
        :return: the IEA line once it is set and truthy, else the ISA line
        :rtype: int
        """
        if self.cur_line_iea:
            return self.cur_line_iea
        return self.cur_line_isa

    def get_error_count(self) -> int:
        """
        :return: errors on this node plus those of its elements and children
        :rtype: int
        """
        count = sum(ele.get_error_count() for ele in self.elements)
        count += sum(child.get_error_count() for child in self.children)
        return count + len(self.errors)

    def get_error_list(self, seg_id: str | None, pre: bool = False) -> list[Any]:
        """
        Select the errors whose code contains the given segment id.

        :param seg_id: "ISA" or "IEA"; any other value gives an empty list
        :param pre: unused
        :return: matching (code, description) tuples
        """
        if seg_id in ("ISA", "IEA"):
            return [err for err in self.errors if seg_id in err[0]]
        return []

    def __next__(self) -> Any:
        """
        Return the next error node

        Because the body uses ``yield``, calling this returns a generator
        over ``self.children``; ``next(self.parent)`` runs once they are
        exhausted.
        """
        yield from self.children
        next(self.parent)

    def __repr__(self) -> str:
        """
        :return: current line number and node id
        """
        return "%i: %s" % (self.get_cur_line(), self.id)
[docs]
class err_gs(err_node):
    """
    Holds source GS loop information
    """

    seg_data: pyx12.segment.Segment
    cur_line_ge: int | None
    gs_control_num: str | None

    def __init__(self, parent: Any, seg_data: pyx12.segment.Segment, src: Any) -> None:
        """
        :param parent: node this GS loop hangs under
        :param seg_data: Segment object
        :type seg_data: L{segment<segment.Segment>}
        :param src: X12file source
        :type src: L{X12file<x12file.X12Reader>}
        """
        # The base initialiser provides parent, children and errors, which
        # add_error(), accept() and the tree walkers depend on.
        super().__init__(parent)
        self.id = "GS"
        self.seg_data = seg_data
        self.isa_id = src.get_isa_id()
        self.cur_line_gs = src.get_cur_line()
        self.cur_line_ge = None  # set by close()
        self.gs_control_num = src.get_gs_id()
        self.fic = self.seg_data.get_value("GS01")
        self.vriic = self.seg_data.get_value("GS08")
        self.st_loops = []
        # From GE loop
        self.ack_code = None  # AK901
        self.st_count_orig = 0  # AK902
        self.st_count_recv = 0  # AK903
        self.elements = []

    def accept(self, visitor: Any) -> None:
        """
        Visit this node, then every child, then this node again.

        :param visitor: object providing ``visit_gs_pre`` and
            ``visit_gs_post``; it is also passed to each child's ``accept``
        """
        visitor.visit_gs_pre(self)
        for child in self.children:
            child.accept(visitor)
        visitor.visit_gs_post(self)

    def add_error(self, err_cde: str, err_str: str) -> None:
        """
        :param err_cde: Error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        """
        self.errors.append((err_cde, err_str))

    def close(self, node: Any, seg_data: pyx12.segment.Segment | None, src: Any) -> None:
        """
        Close the GS loop: record the GE line, ack code and ST counts.

        :param node: unused
        :param seg_data: segment whose "GE01" value gives the original ST
            count; None leaves that count at 0
        :param src: source object providing ``get_cur_line()`` and
            ``st_count``
        """
        self.cur_line_ge = src.get_cur_line()
        self.ack_code = self._get_ack_code()
        if seg_data is None:
            self.st_count_orig = 0
        else:
            self.st_count_orig = int(cast(str, seg_data.get_value("GE01")))
        self.st_count_recv = src.st_count  # AK903

    def _get_ack_code(self) -> str:
        """
        :return: "R" if any child or this node has errors, otherwise "A"
        """
        if any(child.get_error_count() > 0 for child in self.children):
            return "R"
        if self.errors:
            return "R"
        return "A"

    def count_failed_st(self) -> int:
        """
        :return: number of children whose ack code is neither "A" nor "E"
        :rtype: int
        """
        return sum(1 for child in self.children if child.ack_code not in ["A", "E"])

    def get_cur_line(self) -> int:
        """
        :return: the GE line once it is set and truthy, else the GS line
        :rtype: int
        """
        if self.cur_line_ge:
            return self.cur_line_ge
        return self.cur_line_gs

    def get_error_count(self) -> int:
        """
        :return: errors on this node plus those of its elements and children
        :rtype: int
        """
        count = sum(ele.get_error_count() for ele in self.elements)
        count += sum(child.get_error_count() for child in self.children)
        return count + len(self.errors)

    def get_error_list(self, seg_id: str | None, pre: bool = False) -> list[Any]:
        """
        Split the node's errors between the GS and GE segments by code.

        :param seg_id: "GS" selects errors with code "6", "GE" selects all
            others; any other value gives an empty list
        :param pre: unused
        :return: matching (code, description) tuples
        """
        # A real one-element tuple: ("6") is just the string "6", which
        # would turn "in" into a substring test.
        gs_codes = ("6",)
        if seg_id == "GS":
            return [err for err in self.errors if err[0] in gs_codes]
        if seg_id == "GE":
            return [err for err in self.errors if err[0] not in gs_codes]
        return []

    def is_closed(self) -> bool:
        """
        :return: True once ``close()`` has recorded a truthy GE line number
        :rtype: boolean
        """
        return bool(self.cur_line_ge)

    def __next__(self) -> Any:
        """
        Return the next error node

        Because the body uses ``yield``, calling this returns a generator
        over ``self.children``; ``next(self.parent)`` runs once they are
        exhausted.
        """
        yield from self.children
        next(self.parent)

    def __repr__(self) -> str:
        """
        :return: current line number and node id
        """
        return "%i: %s" % (self.get_cur_line(), self.id)
[docs]
class err_st(err_node):
    """
    ST loops

    Needs:
        1. Transaction set id code (837, 834)
        2. Transaction set control number
        3. trn set error codes
        4. At SE, Determine final ack code
    """

    seg_data: pyx12.segment.Segment
    trn_set_control_num: str | None
    cur_line_se: int | None

    def __init__(self, parent: Any, seg_data: pyx12.segment.Segment, src: Any) -> None:
        """
        :param parent: node this ST loop hangs under
        :param seg_data: Segment object
        :type seg_data: L{segment<segment.Segment>}
        :param src: X12file source
        :type src: L{X12file<x12file.X12Reader>}
        """
        # The base initialiser provides parent, children and errors, which
        # add_error(), accept() and the tree walkers depend on.
        super().__init__(parent)
        self.id = "ST"
        self.seg_data = seg_data
        self.trn_set_control_num = src.get_st_id()
        self.cur_line_st = src.get_cur_line()
        self.cur_line_se = None  # set by close()
        self.trn_set_id = seg_data.get_value("ST01")
        self.vriic = seg_data.get_value("ST03")
        # Stays "R" until close() decides otherwise.
        self.ack_code = "R"
        self.elements = []

    def accept(self, visitor: Any) -> None:
        """
        Visit this node, then every child, then this node again.

        :param visitor: object providing ``visit_st_pre`` and
            ``visit_st_post``; it is also passed to each child's ``accept``
        """
        visitor.visit_st_pre(self)
        for child in self.children:
            child.accept(visitor)
        visitor.visit_st_post(self)

    def add_error(self, err_cde: str, err_str: str) -> None:
        """
        :param err_cde: Error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        """
        self.errors.append((err_cde, err_str))

    def close(self, node: Any, seg_data: pyx12.segment.Segment | None, src: Any) -> None:
        """
        Close ST loop

        Records the SE line and sets the ack code to "R" when the loop has
        errors, "A" otherwise.

        :param node: SE node (unused)
        :type node: L{node<map_if.x12_node>}
        :param seg_data: Segment object (unused)
        :type seg_data: L{segment<segment.Segment>}
        :param src: X12file source
        :type src: L{X12file<x12file.X12Reader>}
        """
        self.cur_line_se = src.get_cur_line()
        self.ack_code = "R" if self.err_count() > 0 else "A"

    def err_count(self) -> int:
        """
        :return: Count of ST/SE loop errors; all child errors together add
            at most one to the total
        :rtype: int
        """
        seg_err_ct = 1 if self.child_err_count() > 0 else 0
        return len(self.errors) + seg_err_ct

    def get_error_count(self) -> int:
        """
        :return: same value as ``err_count()``
        :rtype: int
        """
        return self.err_count()

    def get_error_list(self, seg_id: str | None, pre: bool = False) -> list[Any]:
        """
        Split the node's errors between the ST and SE segments by code.

        :param seg_id: "ST" selects codes "1", "6", "7" and "23", "SE"
            selects all others; any other value gives an empty list
        :param pre: unused
        :return: matching (code, description) tuples
        """
        st_codes = ("1", "6", "7", "23")
        if seg_id == "ST":
            return [err for err in self.errors if err[0] in st_codes]
        if seg_id == "SE":
            return [err for err in self.errors if err[0] not in st_codes]
        return []

    def child_err_count(self) -> int:
        """
        :return: number of children that report at least one error
        :rtype: int
        """
        return sum(1 for child in self.children if child.err_count() > 0)

    def get_cur_line(self) -> int:
        """
        :return: the SE line once it is set and truthy, else the ST line
        :rtype: int
        """
        if self.cur_line_se:
            return self.cur_line_se
        return self.cur_line_st

    def is_closed(self) -> bool:
        """
        :return: True once ``close()`` has recorded a truthy SE line number
        :rtype: boolean
        """
        return bool(self.cur_line_se)

    def __next__(self) -> Any:
        """
        Return the next error node

        Because the body uses ``yield``, calling this returns a generator
        over ``self.children``.
        """
        yield from self.children
        return

    def __repr__(self) -> str:
        """
        :return: current line number and node id
        """
        return "%i: %s" % (self.get_cur_line(), self.id)
[docs]
class err_seg(err_node):
    """
    Segment Errors
    """

    def __init__(
        self,
        parent: Any,
        map_node: Any,
        seg_data: pyx12.segment.Segment,
        seg_count: int,
        cur_line: int,
        ls_id: str | None,
    ) -> None:
        """
        Needs:
            1. seg_id_code
            2. seg_pos - pos in ST loop
            3. loop_id - LS loop id
            4. seg_count - in parent

        :param parent: node this segment hangs under
        :param map_node: map node supplying ``name`` and ``pos``; None gives
            the name "Unknown" and position -1
        :param seg_data: Segment object supplying the segment id
        :type seg_data: L{segment<segment.Segment>}
        :param seg_count: stored as ``seg_count``
        :type seg_count: int
        :param cur_line: stored as ``cur_line``
        :type cur_line: int
        :param ls_id: stored as ``ls_id``
        :type ls_id: string
        """
        # The base initialiser provides parent, children and errors; it must
        # run first because it also resets cur_line, which is set below.
        super().__init__(parent)
        self.id = "SEG"
        if map_node is None:
            self.name = "Unknown"
            self.pos = -1
        else:
            self.name = map_node.name
            self.pos = map_node.pos
        self.seg_id = seg_data.get_seg_id()
        self.seg_count = seg_count
        self.cur_line = cur_line
        self.ls_id = ls_id
        self.elements = []

    def accept(self, visitor: Any) -> None:
        """
        Visit this segment, then each of its elements.

        :param visitor: object providing ``visit_seg``; it is also passed to
            each element's ``accept``
        """
        visitor.visit_seg(self)
        for elem in self.elements:
            elem.accept(visitor)

    def add_error(self, err_cde: str, err_str: str, err_value: str | None = None) -> None:
        """
        :param err_cde: Error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        :param err_value: offending value, stored with the error
        :type err_value: string
        """
        self.errors.append((err_cde, err_str, err_value))

    def err_count(self) -> int:
        """
        :return: count of segment errors; all element errors together add
            at most one to the total
        :rtype: int
        """
        ele_err_ct = 1 if self.child_err_count() > 0 else 0
        return len(self.errors) + ele_err_ct

    def get_error_count(self) -> int:
        """
        :return: same value as ``err_count()``
        :rtype: int
        """
        return self.err_count()

    def child_err_count(self) -> int:
        """
        :return: number of elements that report at least one error
        :rtype: int
        """
        return sum(1 for ele in self.elements if ele.err_count() > 0)

    def __next__(self) -> Any:
        """
        Desc: Return the next error node

        :return: this node itself
        """
        return self

    def __repr__(self) -> str:
        """
        :return: current line number, node id and segment id
        """
        return "%i: %s %s" % (self.get_cur_line(), self.id, self.seg_id)

    def get_first_child(self) -> None:
        """
        :return: always None; segment nodes expose no children to walkers
        """
        return None
[docs]
class err_ele(err_node):
    """
    Element Errors - Holds and generates output for element and
    composite/sub-element errors

    Each element with an error creates a new err_ele instance.
    """

    ele_ref_num: str | None

    def __init__(self, parent: Any, map_node: Any) -> None:
        """
        :param parent: node this element hangs under
        :param map_node: map node supplying ``data_ele``, ``name``, ``seq``
            and a ``parent`` with ``is_composite()``
        """
        # The base initialiser provides parent, children and errors, which
        # add_error() and the error counters depend on.
        super().__init__(parent)
        self.id = "ELE"
        self.ele_ref_num = map_node.data_ele
        self.name = map_node.name
        if map_node.parent.is_composite():
            # Inside a composite: the composite's seq is the element
            # position and this node's seq is the sub-element position.
            self.ele_pos = map_node.parent.seq
            self.subele_pos = map_node.seq
        else:
            self.ele_pos = map_node.seq
            self.subele_pos = None
        self.repeat_pos = None

    def accept(self, visitor: Any) -> None:
        """
        :param visitor: object providing ``visit_ele``
        """
        visitor.visit_ele(self)

    def add_error(self, err_cde: str, err_str: str, bad_value: str | None) -> None:
        """
        :param err_cde: Error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        :param bad_value: offending value, stored with the error
        :type bad_value: string
        """
        self.errors.append((err_cde, err_str, bad_value))

    def err_count(self) -> int:
        """
        :return: number of errors recorded on this element
        :rtype: int
        """
        return len(self.errors)

    def get_error_count(self) -> int:
        """
        :return: number of errors recorded on this element
        :rtype: int
        """
        return len(self.errors)
[docs]
class ErrorErrhNull(Exception):
    """Exception type reserved for errors relating to errh_null."""
[docs]
class errh_null:
    """
    A null error object - used for testing.

    Only the most recently reported error code and description are kept.
    The tree-building calls do nothing.
    """

    def __init__(self) -> None:
        """Start with no error recorded."""
        self.id = "ROOT"
        self.cur_node = self
        self.cur_line = 0
        self.err_cde = None
        self.err_str = None

    def _remember(self, err_cde: str, err_str: str) -> None:
        """Overwrite the stored error with the given code and description."""
        self.err_cde = err_cde
        self.err_str = err_str

    def reset(self) -> None:
        """
        Clear any errors
        """
        self.err_cde = self.err_str = None

    def get_cur_line(self) -> int:
        """
        :return: Current file line number
        :rtype: int
        """
        return self.cur_line

    def get_id(self) -> str:
        """
        :return: Error node type
        :rtype: string
        """
        return self.id

    def add_isa_loop(self, seg: Any, src: Any) -> None:
        """Do nothing."""
        return None

    def add_gs_loop(self, seg: Any, src: Any) -> None:
        """Do nothing."""
        return None

    def add_st_loop(self, seg: Any, src: Any) -> None:
        """Do nothing."""
        return None

    def add_seg(
        self,
        map_node: Any,
        seg: Any,
        seg_count: int,
        cur_line: int,
        ls_id: str | None,
    ) -> None:
        """Do nothing."""
        return None

    def add_ele(self, map_node: Any) -> None:
        """Do nothing."""
        return None

    def isa_error(self, err_cde: str, err_str: str) -> None:
        """
        :param err_cde: ISA level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        """
        self._remember(err_cde, err_str)

    def gs_error(self, err_cde: str, err_str: str) -> None:
        """
        :param err_cde: GS level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        """
        self._remember(err_cde, err_str)

    def st_error(self, err_cde: str, err_str: str) -> None:
        """
        :param err_cde: ST level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        """
        self._remember(err_cde, err_str)

    def seg_error(
        self,
        err_cde: str,
        err_str: str,
        err_value: str | None = None,
        src_line: int | None = None,
    ) -> None:
        """
        :param err_cde: Segment level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        :param err_value: ignored
        :param src_line: ignored
        """
        self._remember(err_cde, err_str)

    def ele_error(
        self,
        err_cde: str,
        err_str: str,
        bad_value: str | None,
        refdes: str | None = None,
    ) -> None:
        """
        :param err_cde: Element level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        :param bad_value: ignored
        :param refdes: ignored
        """
        self._remember(err_cde, err_str)

    def close_isa_loop(self, node: Any, seg: Any, src: Any) -> None:
        """Do nothing."""
        return None

    def close_gs_loop(self, node: Any, seg: Any, src: Any) -> None:
        """Do nothing."""
        return None

    def close_st_loop(self, node: Any, seg: Any, src: Any) -> None:
        """Do nothing."""
        return None

    def find_node(self, type: str) -> None:
        """
        Find the last node of a type; does nothing here.
        """
        return None

    def get_parent(self) -> None:
        """
        :return: always None
        """
        return None

    def get_next_sibling(self) -> None:
        """
        :return: always None
        """
        return None

    def get_error_count(self) -> int:
        """
        :return: 1 when an error code is stored, otherwise 0
        :rtype: int
        """
        return 0 if self.err_cde is None else 1

    def handle_errors(self, err_list: list[Any]) -> None:
        """Ignore the given list of errors."""
        return None

    def is_closed(self) -> bool:
        """
        :rtype: boolean
        """
        return True

    def __repr__(self) -> str:
        """
        :return: ``"-1: "`` followed by the node id
        """
        return "%i: %s" % (-1, self.id)
[docs]
class errh_list:
    """
    Capture validation errors in a list

    Errors are collected in one flat list per level (ISA, GS, ST, segment,
    element).  The tree-building calls do nothing.
    Used to refactor away from error_handler
    """

    err_isa: list[tuple[str, str]]
    err_gs: list[tuple[str, str]]
    err_st: list[tuple[str, str]]
    err_seg: list[tuple[str, str, str | None]]
    err_ele: list[tuple[str, str, str | None, str | None]]

    def __init__(self) -> None:
        """Start with every error list empty."""
        self.id = "ROOT"
        self.cur_node = self
        self.cur_line = 0
        self.err_isa, self.err_gs, self.err_st = [], [], []
        self.err_seg, self.err_ele = [], []

    def reset(self) -> None:
        """
        Clear any errors
        """
        self.err_isa, self.err_gs, self.err_st = [], [], []
        self.err_seg, self.err_ele = [], []

    def get_cur_line(self) -> int:
        """
        :return: Current file line number
        :rtype: int
        """
        return self.cur_line

    def get_id(self) -> str:
        """
        :return: Error node type
        :rtype: string
        """
        return self.id

    def add_isa_loop(self, seg: Any, src: Any) -> None:
        """Do nothing."""
        return None

    def add_gs_loop(self, seg: Any, src: Any) -> None:
        """Do nothing."""
        return None

    def add_st_loop(self, seg: Any, src: Any) -> None:
        """Do nothing."""
        return None

    def add_seg(
        self,
        map_node: Any,
        seg: Any,
        seg_count: int,
        cur_line: int,
        ls_id: str | None,
    ) -> None:
        """Do nothing."""
        return None

    def add_ele(self, map_node: Any) -> None:
        """Do nothing."""
        return None

    def isa_error(self, err_cde: str, err_str: str) -> None:
        """
        :param err_cde: ISA level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        """
        entry = (err_cde, err_str)
        self.err_isa.append(entry)

    def gs_error(self, err_cde: str, err_str: str) -> None:
        """
        :param err_cde: GS level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        """
        entry = (err_cde, err_str)
        self.err_gs.append(entry)

    def st_error(self, err_cde: str, err_str: str) -> None:
        """
        :param err_cde: ST level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        """
        entry = (err_cde, err_str)
        self.err_st.append(entry)

    def seg_error(
        self,
        err_cde: str,
        err_str: str,
        err_value: str | None = None,
        src_line: int | None = None,
    ) -> None:
        """
        :param err_cde: Segment level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        :param err_value: offending value, stored with the error
        :param src_line: ignored
        """
        entry = (err_cde, err_str, err_value)
        self.err_seg.append(entry)

    def ele_error(
        self,
        err_cde: str,
        err_str: str,
        bad_value: str | None,
        refdes: str | None = None,
    ) -> None:
        """
        :param err_cde: Element level error code
        :type err_cde: string
        :param err_str: Description of the error
        :type err_str: string
        :param bad_value: offending value, stored with the error
        :param refdes: stored with the error as given
        """
        entry = (err_cde, err_str, bad_value, refdes)
        self.err_ele.append(entry)

    def close_isa_loop(self, node: Any, seg: Any, src: Any) -> None:
        """Do nothing."""
        return None

    def close_gs_loop(self, node: Any, seg: Any, src: Any) -> None:
        """Do nothing."""
        return None

    def close_st_loop(self, node: Any, seg: Any, src: Any) -> None:
        """Do nothing."""
        return None

    def find_node(self, type: str) -> None:
        """Do nothing."""
        return None

    def get_parent(self) -> None:
        """
        :return: always None
        """
        return None

    def get_next_sibling(self) -> None:
        """
        :return: always None
        """
        return None

    def get_error_count(self) -> int:
        """
        :return: total number of errors across all five lists
        :rtype: int
        """
        buckets = (self.err_isa, self.err_gs, self.err_st, self.err_seg, self.err_ele)
        return sum(len(bucket) for bucket in buckets)

    def handle_errors(self, err_list: list[tuple[str, str, str, Any, int | None]]) -> None:
        """
        Handles errors generated by X12Reader

        Entries whose type is not "isa", "gs", "st" or "seg" are ignored.

        :param err_list: List of errors
        :type err_list: [(type, error code, error string, value, line)]
        """
        # Handlers that take only the code and the description.
        two_arg_handlers = {
            "isa": self.isa_error,
            "gs": self.gs_error,
            "st": self.st_error,
        }
        for etype, err_cde, err_str, err_value, src_line in err_list:
            if etype == "seg":
                self.seg_error(err_cde, err_str, err_value, src_line)
            elif etype in two_arg_handlers:
                two_arg_handlers[etype](err_cde, err_str)

    def is_closed(self) -> bool:
        """
        :rtype: boolean
        """
        return True

    def __repr__(self) -> str:
        """
        :return: ``"-1: "`` followed by the node id
        """
        return "%i: %s" % (-1, self.id)