Source code for pyx12.errh_xml

######################################################################
# Copyright (c)
#   John Holland <john@zoner.org>
# All rights reserved.
#
# This software is licensed as described in the file LICENSE.txt, which
# you should have received as part of this distribution.
#
######################################################################

"""
Capture X12 Errors
"""

from __future__ import annotations

import logging
import os
import tempfile
from types import TracebackType
from typing import IO, Any, Literal

# Intrapackage imports
from .errors import EngineError
from .xmlwriter import XMLWriter

# (kind, code, message, value, src_line)
_ErrTuple = tuple[str, str, str, Any, int | None]


[docs] class err_handler: """ The interface to the error handling structures. """
[docs] logger: logging.Logger
[docs] filename: str
[docs] fd: IO[Any]
[docs] cur_line: int | None
[docs] errors: list[_ErrTuple]
[docs] writer: XMLWriter
def __init__(self, xml_out: str | None = None, basedir: str | None = None) -> None: """ :param xml_out: Output filename, if None, will dump to tempfile :param basedir: working directory, where file will be created """ self.logger = logging.getLogger("pyx12.errh_xml") if xml_out: self.filename = xml_out self.fd = open(xml_out, "w", encoding="utf-8") else: try: (fdesc, self.filename) = tempfile.mkstemp(".xml", "pyx12_") self.fd = os.fdopen(fdesc, "w+b") except OSError: (fdesc, self.filename) = tempfile.mkstemp( suffix=".xml", prefix="pyx12_", dir=basedir ) self.fd = os.fdopen(fdesc, "w+b") self.cur_line = None self.errors = [] if not self.fd: raise EngineError("Could not open temp error xml file") self.writer = XMLWriter(self.fd) # type: ignore[arg-type] self.writer.push("x12err")
[docs] def close(self) -> None: """ Flush remaining XML closing tags and close the underlying file. Idempotent. """ while len(self.writer) > 0: self.writer.pop() if self.fd is not None and not self.fd.closed: self.fd.close()
def __enter__(self) -> err_handler: return self def __exit__( self, exc_type: type[BaseException] | None, exc: BaseException | None, tb: TracebackType | None, ) -> Literal[False]: self.close() return False
[docs] def getFilename(self) -> str: return self.filename
[docs] def handleErrors(self, err_list: list[_ErrTuple]) -> None: """ :param err_list: list of errors to apply """ self.errors.extend(err_list)
# for (err_type, err_cde, err_str, err_val, src_line) in err_list: # if err_type == 'isa': # self.isa_error(err_cde, err_str) # elif err_type == 'gs': # self.gs_error(err_cde, err_str) # elif err_type == 'st': # self.st_error(err_cde, err_str) # elif err_type == 'seg': # self.seg_error(err_cde, err_str, err_val, src_line)
[docs] def getCurLine(self) -> int | None: """ :return: Current file line number :rtype: int """ return self.cur_line
[docs] def Write(self, cur_line: int) -> None: """ Generate XML for the segment data and matching map node """ if len(self.errors) > 0: self.writer.push("seg", attrs={"line": "%i" % (cur_line)}) for err_type, err_cde, err_str, err_val, src_line in self.errors: self.writer.push("err", attrs={"code": err_cde}) # self.writer.elem(u"type", err_type) # self.writer.elem(u"code", err_cde) self.writer.elem("desc", err_str) if err_val: self.writer.elem("errval", err_val) # self.writer.push(u"seg", {u'line': '%i'%(cur_line)}) # self.writer.elem(u'ele', seg_data.get_value('%02i' % # (i+1)), # attrs={u'id': child_node.id}) self.writer.pop() # end err self.writer.pop() # end segment self.errors = []
[docs] class ErrorErrhNull(Exception): """Class for errh_null errors."""
[docs] class errh_list: """ A null error object - used for testing. Stores the current error in simple variables. """
[docs] logger: logging.Logger
[docs] errors: list[_ErrTuple]
[docs] cur_line: int
def __init__(self) -> None: self.logger = logging.getLogger("pyx12.errh_xml") # self.id = 'ROOT' self.errors = [] # self.cur_node = self self.cur_line = 0 # self.err_cde = None # self.err_str = None
[docs] def get_errors(self) -> list[_ErrTuple]: return self.errors
[docs] def reset(self) -> None: self.errors = []
[docs] def get_cur_line(self) -> int: """ :return: Current file line number :rtype: int """ return self.cur_line
[docs] def set_cur_line(self, cur_line: int) -> None: """ """ self.cur_line = cur_line
# def get_id(self): # """ # :return: Error node type # :rtype: string # """ # return self.id
[docs] def add_isa_loop(self, seg: Any, src: Any) -> None: """ """ # raise ErrorErrhNull, 'add_isa loop' pass
[docs] def add_gs_loop(self, seg: Any, src: Any) -> None: """ """ pass
[docs] def add_st_loop(self, seg: Any, src: Any) -> None: """ """ pass
[docs] def add_seg( self, map_node: Any, seg: Any, seg_count: int, cur_line: int, ls_id: str | None ) -> None: """ """ pass
[docs] def add_ele(self, map_node: Any) -> None: """ """ pass
[docs] def isa_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: ISA level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.errors.append(("isa", err_cde, err_str, None, None)) sout = "" sout += "Line:%i " % (self.cur_line) sout += "ISA:%s - %s" % (err_cde, err_str) self.logger.error(sout)
[docs] def gs_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: GS level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.errors.append(("gs", err_cde, err_str, None, None)) sout = "" sout += "Line:%i " % (self.cur_line) sout += "GS:%s - %s" % (err_cde, err_str) self.logger.error(sout)
[docs] def st_error(self, err_cde: str, err_str: str) -> None: """ :param err_cde: Segment level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.errors.append(("st", err_cde, err_str, None, None)) sout = "" sout += "Line:%i " % (self.cur_line) sout += "ST:%s - %s" % (err_cde, err_str) self.logger.error(sout)
[docs] def seg_error( self, err_cde: str, err_str: str, err_value: str | None = None, src_line: int | None = None, ) -> None: """ :param err_cde: Segment level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.errors.append(("seg", err_cde, err_str, err_value, src_line)) sout = "" sout += "Line:%i " % (self.cur_line) sout += "SEG:%s - %s" % (err_cde, err_str) if err_value: sout += " (%s)" % err_value self.logger.error(sout)
[docs] def ele_error(self, err_cde: str, err_str: str, bad_value: str | None) -> None: """ :param err_cde: Element level error code :type err_cde: string :param err_str: Description of the error :type err_str: string """ self.errors.append(("ele", err_cde, err_str, bad_value, None)) sout = "" sout += "Line:%i " % (self.cur_line) sout += "ELE:%s - %s" % (err_cde, err_str) if bad_value: sout += " (%s)" % (bad_value) self.logger.error(sout)
[docs] def close_isa_loop(self, node: Any, seg: Any, src: Any) -> None: """ """ pass
[docs] def close_gs_loop(self, node: Any, seg: Any, src: Any) -> None: """ """ pass
[docs] def close_st_loop(self, node: Any, seg: Any, src: Any) -> None: """ """ pass
[docs] def find_node(self, atype: str) -> None: """ Find the last node of a type """ pass
[docs] def get_parent(self) -> None: return None
# def get_first_child(self): # """ # """ # if len(self.children) > 0: # return self.children[0] # else: # return None
[docs] def get_next_sibling(self) -> None: """ """ return None
[docs] def get_error_count(self) -> int: """ """ return len(self.errors)
[docs] def is_closed(self) -> bool: """ :rtype: boolean """ return True