Source code for pyx12.map_if._segment

######################################################################
# Copyright (c)
# All rights reserved.
#
# This software is licensed as described in the file LICENSE.txt, which
# you should have received as part of this distribution.
#
######################################################################
"""
Segment interface - per-segment validator and matcher.
"""

from __future__ import annotations

import sys
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any
from xml.etree.ElementTree import Element

import pyx12.segment

from ..error_codes import (
    SEG_2_SYNTAX_RELATIONAL,
    SEG_3_TOO_MANY_ELEMENTS,
    SEG_3_TOO_MANY_SUBELEMENTS,
    SEG_8_HAS_DATA_ELEMENT_ERRORS,
    SEG_10_SYNTAX_EXCLUSIVE,
)
from ..error_item import EleError
from ..errors import EngineError
from ..path import X12Path
from ..syntax import is_syntax_valid
from ._base import MAXINT, _required_attr, x12_node
from ._composite import composite_if
from ._element import element_if

if TYPE_CHECKING:
    from ._root import map_if


def apply_segment_errors(node: segment_if, seg_data: pyx12.segment.Segment, errh: Any) -> bool:
    """Drive a segment validation: run is_valid_errors and forward errors
    with cursor maintenance.

    Routing rule: any element- or composite-level error (``err_cde`` with
    prefix ``ELE_`` or ``COMP_``, identified by a non-None ``map_node``)
    is preserved in ``err_ele.errors`` with its specific pyx12 code, AND
    triggers a single ``SEG_8_HAS_DATA_ELEMENT_ERRORS`` in
    ``err_seg.errors``. SEG-level validator errors (``map_node`` is
    ``None``) collapse to that same single ``SEG_8`` (their specific
    ``SEG_3_*`` / ``SEG_2_*`` / ``SEG_10_*`` codes are dropped per
    PR #161 spec correctness — emitting them as IK3-04 would map to
    wrong X12 semantics).

    Exactly one ``SEG_8`` per segment, no matter how many element /
    composite / seg-validator errors fired. The first SEG-level
    validator error's err_str / err_val is preserved on the SEG_8
    emission for diagnostic detail; if only element/composite errors
    fired, a generic err_str is used.

    Errors whose pyx12 code is in ``errh.suppress_error_codes`` are
    filtered out before reaching the err_handler tree. ``ok`` still
    reflects whether the validator found any errors (suppression does
    not flip a bad segment to good)."""
    ok, errors = node.is_valid_errors(seg_data)
    suppressed: set[str] = getattr(errh, "suppress_error_codes", set()) or set()
    if suppressed:
        errors = [e for e in errors if e.err_cde not in suppressed]
    prev_cursor = None
    needs_seg_8 = False
    seg_8_err_str = "Segment has data element errors"
    seg_8_err_val: str | None = None
    for e in errors:
        if e.map_node is None:
            # SEG-level validator error: preserve the first one's err_str
            # and err_val on the SEG_8 emission for diagnostic detail.
            if not needs_seg_8:
                seg_8_err_str = e.err_str
                seg_8_err_val = e.err_val
            needs_seg_8 = True
            continue
        # Element- or composite-level error: keep it in err_ele.errors
        # with its specific pyx12 code.
        if e.map_node is not prev_cursor:
            errh.add_ele(e.map_node)
            prev_cursor = e.map_node
        errh.ele_error(e.err_cde, e.err_str, e.err_val, e.refdes)
        needs_seg_8 = True
    if needs_seg_8:
        errh.seg_error(SEG_8_HAS_DATA_ELEMENT_ERRORS, seg_8_err_str, seg_8_err_val)
    return ok


[docs] class segment_if(x12_node): """ Segment Interface """ root: map_if children: list[element_if | composite_if] # type: ignore[assignment] base_name: str _cur_count: int syntax: list[list[Any]] type: str | None pos: int max_use: str | None repeat: str | None end_tag: str | None def __init__(self, root: map_if, parent: x12_node, elem: Element) -> None: """ :param parent: parent node """ x12_node.__init__(self) self.root = root self.parent = parent self.children = [] self.base_name = "segment" self._cur_count = 0 self.syntax = [] self.id = elem.get("xid") self.path = self.id or "" self.type = elem.get("type") self.name = elem.get("name") if elem.get("name") else elem.findtext("name") self.usage = elem.get("usage") if elem.get("usage") else elem.findtext("usage") self.pos = int(_required_attr(elem, "pos")) self.max_use = elem.get("max_use") if elem.get("max_use") else elem.findtext("max_use") self.repeat = elem.get("repeat") if elem.get("repeat") else elem.findtext("repeat") self.end_tag = elem.get("end_tag") if elem.get("end_tag") else elem.findtext("end_tag") for s in elem.findall("syntax"): syn_list = self._split_syntax(s.text) if syn_list is not None: self.syntax.append(syn_list) children_map: dict[int, Element] = {} for e in elem.findall("element"): seq = int(_required_attr(e, "seq")) children_map[seq] = e for e in elem.findall("composite"): seq = int(_required_attr(e, "seq")) children_map[seq] = e for seq in sorted(children_map.keys()): if children_map[seq].tag == "element": self.children.append(element_if(self.root, self, children_map[seq])) elif children_map[seq].tag == "composite": self.children.append(composite_if(self.root, self, children_map[seq]))
[docs] def debug_print(self) -> None: sys.stdout.write(self.__repr__()) for node in self.children: node.debug_print()
def __repr__(self) -> str: """ :rtype: string """ out = '%s "%s"' % (self.id, self.name) if self.usage: out += " usage: %s" % (self.usage) if self.pos: out += " pos: %i" % (self.pos) if self.max_use: out += " max_use: %s" % (self.max_use) out += "\n" return out
[docs] def get_child_node_by_idx(self, idx: int) -> element_if | composite_if | None: """ :param idx: zero based """ if idx >= len(self.children): return None else: m = [c for c in self.children if c.seq == idx + 1] if len(m) == 1: return m[0] else: raise EngineError("idx %i not found in %s" % (idx, self.id))
[docs] def get_child_node_by_ordinal(self, ord: int) -> element_if | composite_if | None: """ Get a child element or composite by the X12 ordinal :param ord: one based element/composite index. Corresponds to the map <seq> element :type ord: int """ return self.get_child_node_by_idx(ord - 1)
[docs] def getnodebypath2(self, path_str: str) -> x12_node | None: """ Try x12 path :param path_str: remaining path to match :type path_str: string :return: matching node, or None is no match """ x12path = X12Path(path_str) if x12path.empty(): return None if x12path.ele_idx is None: return self # matched segment only ele = self.get_child_node_by_ordinal(x12path.ele_idx) if x12path.subele_idx is None: return ele if isinstance(ele, composite_if): return ele.get_child_node_by_ordinal(x12path.subele_idx) return None
[docs] def get_max_repeat(self) -> int: if self.max_use is None or self.max_use == ">1": return MAXINT return int(self.max_use)
[docs] def get_parent(self) -> x12_node | None: """ :return: ref to parent class instance :rtype: pyx12.x12_node """ return self.parent
[docs] def is_first_seg_in_loop(self) -> bool: """ :rtype: boolean """ from ._loop import loop_if # local import: loop_if imports segment_if at module load parent = self.get_parent() if isinstance(parent, loop_if) and self is parent.get_first_seg(): return True else: return False
[docs] def is_match(self, seg: pyx12.segment.Segment) -> bool: """ Is data segment given a match to this segment node? :param seg: data segment instance :return: boolean :rtype: boolean """ if seg.get_seg_id() != self.id: return False key = self._resolve_unique_key_field(seg.get_seg_id(), with_qual=False) if key is None: return True child, ele_idx, subele_idx = key path = f"{ele_idx:02d}-{subele_idx}" if subele_idx else f"{ele_idx:02d}" return seg.get_value(path) in child._valid_codes_set
[docs] def is_match_qual( self, seg_data: pyx12.segment.Segment, seg_id: str | None, qual_code: str | None, ) -> tuple[bool, str | None, int | None, int | None]: """ Is segment id and qualifier a match to this segment node and to this particular segment data? :param seg_data: data segment instance :type seg_data: L{segment<segment.Segment>} :param seg_id: data segment ID :param qual_code: an ID qualifier code :return: (True if a match, qual_code, element_index, subelement_index) :rtype: tuple(boolean, string, int, int) """ if seg_id != self.id: return (False, None, None, None) if qual_code is None: return (True, None, None, None) key = self._resolve_unique_key_field(seg_id, with_qual=True) if key is None: return (True, None, None, None) child, ele_idx, subele_idx = key path = f"{ele_idx:02d}-{subele_idx}" if subele_idx else f"{ele_idx:02d}" if qual_code in child._valid_codes_set and seg_data.get_value(path) == qual_code: return (True, qual_code, ele_idx, subele_idx) return (False, None, None, None)
def _resolve_unique_key_field( self, seg_id: str | None, *, with_qual: bool ) -> tuple[element_if, int, int | None] | None: """ Locate the child node carrying this segment's qualifier (if any). Returns ``(validating_child, ele_idx, subele_idx_or_None)`` describing where to read the qualifier value from a data segment, or ``None`` if the segment has no recognizable qualifier field. ``with_qual=False`` (used by ``is_match``) accepts the AN-typed CTX composite as a valid qualifier carrier; ``with_qual=True`` (used by ``is_match_qual``) only honors ID-typed qualifier fields. """ c0 = self.children[0] if len(self.children) > 0 else None c1 = self.children[1] if len(self.children) > 1 else None c2 = self.children[2] if len(self.children) > 2 else None # Element at position 01 — the common case if ( isinstance(c0, element_if) and c0.get_data_type() == "ID" and c0.usage == "R" and len(c0.valid_codes) > 0 ): return (c0, 1, None) # ENT-segment carries its qualifier at element 02 (820 special case) if ( seg_id == "ENT" and isinstance(c1, element_if) and c1.get_data_type() == "ID" and len(c1.valid_codes) > 0 ): return (c1, 2, None) # CTX-segment can have an AN-typed composite at 01-1 (999 special case); # is_match_qual ignores this branch. if ( not with_qual and seg_id == "CTX" and isinstance(c0, composite_if) and c0.children[0].get_data_type() == "AN" and len(c0.children[0].valid_codes) > 0 ): return (c0.children[0], 1, 1) # General ID-typed composite at 01-1 if ( isinstance(c0, composite_if) and c0.children[0].get_data_type() == "ID" and len(c0.children[0].valid_codes) > 0 ): return (c0.children[0], 1, 1) # HL-segment carries its qualifier at element 03 if seg_id == "HL" and isinstance(c2, element_if) and len(c2.valid_codes) > 0: return (c2, 3, None) return None
[docs] def guess_unique_key_id_element(self) -> element_if | None: """ Some segments, like REF, DTP, and DTP are duplicated. They are matched using the value of an ID element. Which element to use varies. This function tries to find a good candidate. """ c0 = self.children[0] if len(self.children) > 0 else None c1 = self.children[1] if len(self.children) > 1 else None c2 = self.children[2] if len(self.children) > 2 else None if isinstance(c0, element_if) and c0.get_data_type() == "ID" and len(c0.valid_codes) > 0: return c0 # Special Case for 820 elif ( self.id == "ENT" and isinstance(c1, element_if) and c1.get_data_type() == "ID" and len(c1.valid_codes) > 0 ): return c1 elif ( isinstance(c0, composite_if) and c0.children[0].get_data_type() == "ID" and len(c0.children[0].valid_codes) > 0 ): return c0.children[0] elif self.id == "HL" and isinstance(c2, element_if) and len(c2.valid_codes) > 0: return c2 return None
[docs] def get_unique_key_id_element(self, id_val: str) -> element_if | None: """ Some segments, like REF, DTP, and DTP are duplicated. They are matched using the value of an ID element. Which element to use varies. This function tries to find a good candidate, using a key value """ c0 = self.children[0] if len(self.children) > 0 else None c1 = self.children[1] if len(self.children) > 1 else None c2 = self.children[2] if len(self.children) > 2 else None if ( isinstance(c0, element_if) and c0.get_data_type() == "ID" and len(c0.valid_codes) > 0 and id_val in c0._valid_codes_set ): return c0 # Special Case for 820 elif ( self.id == "ENT" and isinstance(c1, element_if) and c1.get_data_type() == "ID" and len(c1.valid_codes) > 0 and id_val in c1._valid_codes_set ): return c1 elif ( isinstance(c0, composite_if) and c0.children[0].get_data_type() == "ID" and len(c0.children[0].valid_codes) > 0 and id_val in c0.children[0]._valid_codes_set ): return c0.children[0] elif ( self.id == "HL" and isinstance(c2, element_if) and len(c2.valid_codes) > 0 and id_val in c2._valid_codes_set ): return c2 return None
[docs] def is_segment(self) -> bool: """ :rtype: boolean """ return True
[docs] def is_valid_errors(self, seg_data: pyx12.segment.Segment) -> tuple[bool, list[EleError]]: """ Pure validator parallel to is_valid: returns (ok, errors) without touching an error handler. Seg-level errors (too many elements, too many sub-elements, syntax) leave map_node unset (=None); per-element errors carry map_node from the element validator so a cursor-tracking wrapper can replay add_ele/ele_error in the original order. """ valid = True errors: list[EleError] = [] child_count = self.get_child_count() if len(seg_data) > child_count: err_str = 'Too many elements in segment "%s" (%s). Has %i, should have %i' % ( self.name, seg_data.get_seg_id(), len(seg_data), child_count, ) ref_des = "%02i" % (child_count + 1) err_value = seg_data.get_value(ref_des) errors.append( EleError( err_cde=SEG_3_TOO_MANY_ELEMENTS, err_str=err_str, err_val=err_value, refdes=ref_des, ) ) valid = False dtype: list[str | None] = [] type_list: list[str | None] = [] for i in range(min(len(seg_data), child_count)): child_node = self.get_child_node_by_idx(i) if isinstance(child_node, composite_if): ref_des = "%02i" % (i + 1) comp_data = seg_data.get(ref_des) # When the map says the position holds a composite, seg_data.get # returns either a Composite or None — Element only appears for # element-typed positions. assert comp_data is None or isinstance(comp_data, pyx12.segment.Composite) subele_count = child_node.get_child_count() if seg_data.ele_len(ref_des) > subele_count and child_node.usage != "N": subele_node = child_node.children[subele_count + 1] err_str = 'Too many sub-elements in composite "%s" (%s)' % ( subele_node.name, subele_node.refdes, ) err_value = seg_data.get_value(ref_des) errors.append( EleError( err_cde=SEG_3_TOO_MANY_SUBELEMENTS, err_str=err_str, err_val=err_value, refdes=ref_des, ) ) ok, comp_errors = child_node.is_valid_errors(comp_data) valid &= ok errors += comp_errors elif isinstance(child_node, element_if): if ( i == 1 and seg_data.get_seg_id() == "DTP" and seg_data.get_value("02") in ("RD8", "D8", "D6", "DT", "TM") ): dtype = [seg_data.get_value("02")] if child_node.data_ele == "1250": type_list.extend(child_node.valid_codes) ele_data = seg_data.get("%02i" % (i + 1)) if i == 2 and seg_data.get_seg_id() == "DTP": ok, ele_errors = child_node.is_valid_errors(ele_data, dtype) elif child_node.data_ele == "1251" and len(type_list) > 0: ok, ele_errors = child_node.is_valid_errors(ele_data, type_list) else: ok, ele_errors = child_node.is_valid_errors(ele_data) valid &= ok errors += ele_errors for i in range(min(len(seg_data), child_count), child_count): child_node = self.get_child_node_by_idx(i) if isinstance(child_node, composite_if): ok, child_errors = child_node.is_valid_errors(None) valid &= ok errors += child_errors elif isinstance(child_node, element_if): ok, child_errors = child_node.is_valid_errors(None) valid &= ok errors += child_errors for syn in self.syntax: bResult, syn_err = is_syntax_valid(seg_data, syn) if not bResult: # When is_syntax_valid returns False, syn_err is the message string. assert syn_err is not None code = SEG_10_SYNTAX_EXCLUSIVE if syn[0] == "E" else SEG_2_SYNTAX_RELATIONAL errors.append(EleError(err_cde=code, err_str=syn_err, refdes=syn[1])) valid = False return valid, errors
def _split_syntax(self, syntax: str | None) -> list[Any] | None: """ Split a Syntax string into a list """ if syntax is None or syntax[0] not in ["P", "R", "C", "L", "E"]: return None syn: list[Any] = [syntax[0]] for i in range(len(syntax[1:]) // 2): syn.append(int(syntax[i * 2 + 1 : i * 2 + 3])) return syn
[docs] def get_cur_count(self) -> int: """ :return: current count :rtype: int """ raise DeprecationWarning("Moved to nodeCounter")
[docs] def incr_cur_count(self) -> None: raise DeprecationWarning("Moved to nodeCounter")
[docs] def reset_cur_count(self) -> None: """ Set cur_count of node to zero """ raise DeprecationWarning("Moved to nodeCounter")
[docs] def set_cur_count(self, ct: int) -> None: raise DeprecationWarning("Moved to nodeCounter")
[docs] def get_counts_list(self, ct_list: list[tuple[str, int]]) -> bool: """ Build a list of (path, ct) of the current node and parents Gets the node counts to apply to another map :param ct_list: List to append to :type ct_list: list[(string, int)] """ raise DeprecationWarning("Moved to nodeCounter")
[docs] def loop_segment_iterator(self) -> Iterator[x12_node]: yield self