#####################################################################
# Copyright
# John Holland <john@zoner.org>
# All rights reserved.
#
# This software is licensed as described in the file LICENSE.txt, which
# you should have received as part of this distribution.
#
######################################################################
"""
Parse an ANSI X12 data file.
Maintain context state
Start saving context and segments
Interface to read and alter segments
TODO: Attach errors to returned dicts
"""
from __future__ import annotations
from collections.abc import Iterator
from types import TracebackType
from typing import Any, Literal, TextIO
# Intrapackage imports
import pyx12
import pyx12.segment
from . import error_handler, errors, map_if, map_index, path, x12file
from .map_if._segment import apply_segment_errors
from .map_walker import apply_walk_errors, pop_to_parent_loop, walk_tree
class X12DataNode:
    """
    Capture the segment data and X12 definition for a loop subtree
    Alter relational data
    Iterate over contents

    Base class of the loop and segment data nodes. It owns the tree links
    (``parent`` / ``children``) and the relative X12 path helpers. The
    methods that read or alter segment data raise NotImplementedError here
    and must be overridden in a sub-class.
    """

    seg_data: pyx12.segment.Segment | None
    parent: X12DataNode | None
    children: list[X12DataNode]
    cur_line_number: int | None

    def __init__(
        self, x12_node: Any, seg_data: pyx12.segment.Segment | None, ntype: str = "seg"
    ) -> None:
        """
        :param x12_node: The X12 map node this data node is an instance of
        :param seg_data: Segment data, or None
        :param ntype: Node type tag stored in ``self.type``
        :type ntype: string
        """
        self.x12_map_node = x12_node
        self.type = ntype
        self.seg_data = seg_data
        self.parent = None
        self.children = []
        self.errors = []
        self.seg_count = None
        self.cur_line_number = None

    # { Public Methods
    def delete(self) -> None:
        """
        Delete this node. Mark type as deleted.

        The node is only marked (``type`` becomes None) and its references
        are dropped; it stays in its parent's ``children`` list until the
        parent runs ``_cleanup``.
        """
        self.x12_map_node = None
        self.type = None
        self.seg_data = None
        self.parent = None
        self.children = []
        self.errors = []

    def iterate_segments(self) -> Iterator[dict[str, Any]]:
        """
        Iterate over this node and children, return any segments found
        """
        raise NotImplementedError("Override in sub-class")

    def iterate_loop_segments(self) -> Iterator[dict[str, Any]]:
        """
        Iterate over this node and children, return loop start and loop end
        and any segments found
        """
        raise NotImplementedError("Override in sub-class")

    def get_value(self, x12_path: str) -> str | None:
        """
        :return: the element value at the relative X12 path
        :rtype: string
        """
        raise NotImplementedError("Override in sub-class")

    def set_value(self, x12_path: str, val: str) -> None:
        """
        Set the value of simple element at the first found segment at the given path

        :param x12_path: An X12 path
        :type x12_path: string
        :param val: The new element value
        :type val: string
        """
        raise NotImplementedError("Override in sub-class")

    def exists(self, x12_path_str: str) -> bool:
        """
        Does at least one child at the x12 path exist?

        :param x12_path_str: Relative X12 path - 2400/2430
        :type x12_path_str: string
        :return: True if found
        :rtype: boolean
        """
        curr, new_path = self._get_start_node(x12_path_str)
        xpath = path.X12Path(new_path)
        # any() stops at the first match, like the explicit early return did
        return any(True for _ in curr._select(xpath))

    def select(self, x12_path_str: str) -> Iterator[X12DataNode]:
        """
        Get a slice of sub-nodes at the relative X12 path.

        Note: All interaction/modification with a X12DataNode tree (having a loop
        root) is done in place.

        :param x12_path_str: Relative X12 path - 2400/2430
        :type x12_path_str: string
        :return: Iterator on the matching sub-nodes, relative to the instance.
        :rtype: L{node<x12context.X12DataNode>}
        :raises errors.EngineError: If a selected node does not match the path
            or has no parent
        """
        curr, new_path = self._get_start_node(x12_path_str)
        xpath = path.X12Path(new_path)
        for n in curr._select(xpath):
            # Sanity-check every node before handing it to the caller
            if xpath.seg_id is not None:
                if n.id != xpath.seg_id:
                    raise errors.EngineError(
                        'Selected node id "%s" does not match xpath seg_id "%s"'
                        % (n.id, xpath.seg_id)
                    )
            else:
                if len(xpath.loop_list) == 0:
                    raise errors.EngineError("xpath has no seg_id and an empty loop list")
                if n.id != xpath.loop_list[-1]:
                    raise errors.EngineError(
                        'Selected node id "%s" does not match xpath final loop "%s"'
                        % (n.id, xpath.loop_list[-1])
                    )
            if n.parent is None:
                raise errors.EngineError('Node "%s" has no parent' % (n.id))
            yield n

    def first(self, x12_path_str: str) -> X12DataNode | None:
        """
        Get the first sub-node matching the relative X12 path.

        Note: All interaction/modification with a X12DataNode tree (having a loop
        root) is done in place.

        :param x12_path_str: Relative X12 path - ie 2400/2430
        :type x12_path_str: string
        :return: The matching sub-node, relative to the instance, or None
        :rtype: L{node<x12context.X12DataNode>}
        """
        # select() yields nothing when no node exists, so a separate
        # exists() pass over the same children is not needed.
        return next(self.select(x12_path_str), None)

    def count(self, x12_path_str: str) -> int:
        """
        Get a count of sub-nodes at the relative X12 path.

        :param x12_path_str: Relative X12 path - 2400/2430
        :type x12_path_str: string
        :return: Count of matching sub-nodes
        :rtype: int
        """
        curr, new_path = self._get_start_node(x12_path_str)
        xpath = path.X12Path(new_path)
        return sum(1 for _ in curr._select(xpath))

    # { Private Methods
    def _cleanup(self) -> None:
        """
        Remove deleted nodes
        """
        self.children = [x for x in self.children if x.type is not None]

    def _get_insert_idx(self, x12_node: Any) -> int:
        """
        Find the index of self.children before which the x12_node belongs
        Nodes will be inserted after the last node with matching ordinals

        :param x12_node: Map node about to be inserted; its ``pos`` is compared
            with the ``pos`` of each child's map node
        :return: Index suitable for ``self.children.insert``
        :rtype: int
        """
        self._cleanup()
        map_idx = x12_node.pos
        last_idx: int | None = None
        for i, child in enumerate(self.children):
            if child.x12_map_node.pos <= map_idx:
                last_idx = i
        if last_idx is not None:
            return last_idx + 1
        # NOTE(review): when every existing child sorts after x12_node this
        # still appends at the end rather than inserting at 0 - kept as is,
        # confirm whether that ordering is intended.
        return len(self.children)

    def get_first_matching_segment(self, x12_path_str: str) -> pyx12.segment.Segment | None:
        """
        Get first found Segment at the given relative path. If the path is not a
        valid relative path or if the given segment index does not exist, the function
        returns None.

        :param x12_path_str: Relative X12 Path
        :type x12_path_str: string
        :return: First matching data segment
        :rtype: L{node<segment.Segment>}
        :raises X12PathError: On blank or invalid path
        """
        raise NotImplementedError("Override in sub-class")

    def _get_start_node(self, x12_path_str: str) -> tuple[X12DataNode, str]:
        """
        Move up the tree. Get the new starting node and the altered path

        Each leading ``../`` moves one level up through ``parent`` and is
        stripped from the returned path.

        :raises errors.X12PathError: If a ``../`` step is requested on a node
            without a parent
        """
        curr: X12DataNode = self
        while x12_path_str.startswith("../"):
            if curr.parent is None:
                raise errors.X12PathError(
                    "Current node %s does not have a parent: %s" % (self.id, x12_path_str)
                )
            curr = curr.parent
            x12_path_str = x12_path_str[3:]
        return (curr, x12_path_str)

    def _select(self, x12path: path.X12Path) -> Iterator[X12DataNode]:
        """
        Get the child node at the path

        :param x12path: x12 map path
        :type x12path: L{path<path.X12Path>}
        """
        live_children = [x for x in self.children if x.type is not None]
        if len(x12path.loop_list) == 0:
            # Only segment left
            cur_node_id = x12path.seg_id
            qual = x12path.id_val
            for child in live_children:
                if child.type == "seg":
                    is_match, _qual_code, _ele_idx, _subele_idx = (
                        child.x12_map_node.is_match_qual(child.seg_data, cur_node_id, qual)
                    )
                    if is_match:
                        yield child
                elif child.id == cur_node_id:
                    yield child
        else:
            cur_node_id = x12path.loop_list[0]
            cur_loop_list = x12path.loop_list[1:]
            for child in live_children:
                if child.id != cur_node_id:
                    continue
                if len(cur_loop_list) == 0 and x12path.seg_id is None:
                    # The path ends on this loop
                    yield child
                else:
                    # Descend with the leading loop removed from the path
                    child_path = path.X12Path(x12path.format())
                    child_path.loop_list = cur_loop_list
                    yield from child._select(child_path)

    def __copy__(self) -> X12DataNode:
        """
        Returns a copy of this node
        """
        raise NotImplementedError("Override in sub-class")

    def copy(self) -> X12DataNode:
        """
        :return: A copy of this node, as built by ``__copy__``
        """
        return self.__copy__()

    # { Property Accessors
    @property
    def id(self) -> str | None:
        """
        :return: x12 node id
        :rtype: string
        :raises errors.EngineError: If this node has been deleted
        """
        if self.x12_map_node is None:
            raise errors.EngineError("This node has been deleted")
        result: str | None = self.x12_map_node.id
        return result

    @property
    def cur_path(self) -> str:
        """
        :return: x12 node path
        :rtype: string
        :raises errors.EngineError: If this node has been deleted
        """
        if self.x12_map_node is None:
            raise errors.EngineError("This node has been deleted")
        result: str = self.x12_map_node.get_path()
        return result
class X12LoopDataNode(X12DataNode):
    """
    Capture the X12 definition for a loop subtree
    Alter relational data
    Iterate over contents
    """

    def __init__(
        self,
        x12_node: Any,
        end_loops: list[Any] | None = None,
        parent: X12DataNode | None = None,
    ) -> None:
        """
        Construct an X12LoopDataNode

        :param x12_node: The loop map node
        :param end_loops: Loop map nodes to report as closed before this loop
        :param parent: The enclosing data node, if any
        """
        # The base initialiser is not called: it assigns seg_count and
        # cur_line_number, which are read-only properties on this class.
        self.x12_map_node = x12_node
        self.type = "loop"
        self.seg_data = None
        self.parent = parent
        self.children = []
        self.errors = []
        # we might need to close a preceding loop
        self.end_loops = [] if end_loops is None else end_loops

    # { Public Methods
    def delete(self) -> None:
        """
        Delete this node. Mark type as deleted.
        """
        self.end_loops = []
        X12DataNode.delete(self)

    def get_value(self, x12_path_str: str) -> str | None:
        """
        Returns the element value at the given relative path. If the path is not a
        valid relative path or if the given segment index does not exist, the function
        returns None. If multiple values exist, this function returns the first.

        :param x12_path_str: Relative X12 Path
        :type x12_path_str: string
        :return: the element value at the relative X12 path
        :rtype: string
        :raises X12PathError: On blank or invalid path
        """
        curr, new_path = self._get_start_node(x12_path_str)
        seg_data = curr.get_first_matching_segment(new_path)
        if seg_data is None:
            return None
        # Reduce the path to its segment part before asking the segment
        xpath = path.X12Path(new_path)
        xpath.loop_list = []
        xpath.id_val = None
        return seg_data.get_value(xpath.format())

    def set_value(self, x12_path_str: str, val: str) -> None:
        """
        Set the value of simple element at the first found segment at the given path

        :param x12_path_str: Relative X12 Path
        :type x12_path_str: string
        :param val: The new element value
        :type val: string
        :raises X12PathError: If no segment is found at the path
        """
        curr, new_path = self._get_start_node(x12_path_str)
        seg_data = curr.get_first_matching_segment(new_path)
        if seg_data is None:
            raise errors.X12PathError("X12 Path is invalid or was not found: %s" % (x12_path_str))
        xpath = path.X12Path(new_path)
        xpath.loop_list = []
        xpath.id_val = None
        seg_data.set(xpath.format(), val)

    def iterate_segments(self) -> Iterator[dict[str, Any]]:
        """
        Iterate over this node and children
        """
        for child in [x for x in self.children if x.type is not None]:
            yield from child.iterate_segments()

    def iterate_loop_segments(self) -> Iterator[dict[str, Any]]:
        """
        Iterate over this node and children, return loop start and loop end
        """
        for loop in self.end_loops:
            yield {"node": loop, "type": "loop_end", "id": loop.id}
        yield {"type": "loop_start", "id": self.id, "node": self.x12_map_node}
        for child in [x for x in self.children if x.type is not None]:
            yield from child.iterate_loop_segments()
        yield {"type": "loop_end", "id": self.id, "node": self.x12_map_node}

    def add_segment(self, seg_data: pyx12.segment.Segment | str) -> X12SegmentDataNode:
        """
        Add the segment to this loop node

        :param seg_data: Segment data
        :type seg_data: L{node<segment.Segment>} or string
        :return: New segment data node
        :rtype: L{node<x12context.X12SegmentDataNode>}
        :raises pyx12.errors.X12PathError: If invalid segment

        TODO: Check counts?
        """
        seg_data = self._get_segment(seg_data)
        x12_seg_node = self.x12_map_node.get_child_seg_node(seg_data)
        if x12_seg_node is None:
            raise errors.X12PathError(
                "The segment %s is not a member of loop %s" % (seg_data.__repr__(), self.id)
            )
        new_data_node = X12SegmentDataNode(x12_seg_node, seg_data, self)
        child_idx = self._get_insert_idx(x12_seg_node)
        self.children.insert(child_idx, new_data_node)
        return new_data_node

    def add_loop(self, seg_data: pyx12.segment.Segment | str) -> X12LoopDataNode:
        """
        Add a new loop in the correct location

        :param seg_data: Segment data
        :type seg_data: L{node<segment.Segment>} or string
        :return: New loop_data_node
        :rtype: L{node<x12context.X12LoopDataNode>}
        :raises errors.X12PathError: If the segment does not start a child loop
        """
        seg_data = self._get_segment(seg_data)
        x12_loop_node = self.x12_map_node.get_child_loop_node(seg_data)
        if x12_loop_node is None:
            raise errors.X12PathError(
                "The segment %s is not a member of loop %s" % (seg_data.__repr__(), self.id)
            )
        new_data_loop = self._add_loop_node(x12_loop_node)
        # Now, add the segment
        x12_seg_node = new_data_loop.x12_map_node.get_child_seg_node(seg_data)
        new_data_node = X12SegmentDataNode(x12_seg_node, seg_data, new_data_loop)
        new_data_loop.add_node(new_data_node)
        return new_data_loop

    def add_node(self, data_node: X12DataNode) -> None:
        """
        Add a X12DataNode instance
        The x12_map_node of the given data_node must be a direct child of this
        object's x12_map_node

        :param data_node: The child loop node to add
        :type data_node : L{node<x12context.X12DataNode>}
        :raises errors.X12PathError: If data_node's map node is not a child of
            this node's map node
        """
        if data_node.x12_map_node.parent != self.x12_map_node:
            raise errors.X12PathError(
                'The loop_data_node "%s" is not a child of "%s"'
                % (data_node.x12_map_node.id, self.x12_map_node.id)
            )
        data_node.parent = self
        child_idx = self._get_insert_idx(data_node.x12_map_node)
        self.children.insert(child_idx, data_node)

    def delete_segment(self, seg_data: pyx12.segment.Segment | str) -> bool:
        """
        Delete the given segment from this loop node
        - Do not delete the first segment in a loop
        - Does not descend into child loops
        - Only delete the first found matching segment

        :param seg_data: Segment data
        :type seg_data: L{node<segment.Segment>} or string
        :return: True if found and deleted, else False
        :rtype: Boolean

        TODO: Check counts?
        """
        seg_data = self._get_segment(seg_data)
        if self.x12_map_node.get_child_seg_node(seg_data) is None:
            return False
        self._cleanup()
        # Iterate over data nodes, except first
        for idx in range(1, len(self.children)):
            child = self.children[idx]
            if child.type == "seg" and child.seg_data == seg_data:
                del self.children[idx]
                return True
        return False

    def delete_node(self, x12_path_str: str) -> bool:
        """
        Delete the first node at the given relative path. If the path is not a
        valid relative path, return False If multiple values exist, this
        function deletes the first.

        :return: True if found and deleted, else False
        :rtype: Boolean
        :raises X12PathError: On blank or invalid path

        TODO: Check counts?
        """
        curr, new_path = self._get_start_node(x12_path_str)
        xpath = path.X12Path(new_path)
        for n in curr._select(xpath):
            n.delete()
            return True
        return False

    def _add_loop_node(self, x12_loop_node: Any) -> X12LoopDataNode:
        """
        Add a loop data node to the current tree

        :param x12_loop_node: X12 Loop node
        :type x12_loop_node: L{node<map_if.loop_if>}
        :return: New X12 Loop Data Node
        :rtype: L{node<x12context.X12LoopDataNode>}
        """
        new_node = X12LoopDataNode(x12_loop_node, parent=self)
        child_idx = self._get_insert_idx(x12_loop_node)
        self.children.insert(child_idx, new_node)
        return new_node

    def get_first_matching_segment(self, x12_path_str: str) -> pyx12.segment.Segment | None:
        """
        Get first found Segment at the given relative path. If the path is not a
        valid relative path or if the given segment index does not exist, the function
        returns None.

        :param x12_path_str: Relative X12 Path
        :type x12_path_str: string
        :return: First matching data segment
        :rtype: L{node<segment.Segment>}
        :raises X12PathError: On blank or invalid path
        """
        if len(x12_path_str) == 0:
            raise errors.X12PathError("Blank X12 Path")
        curr, new_path = self._get_start_node(x12_path_str)
        xpath = path.X12Path(new_path)
        if xpath.seg_id is None:
            return None
        if len(xpath.loop_list) == 0:
            # Path names a segment directly under the start node
            seg_id = xpath.seg_id
            qual = xpath.id_val
            try:
                for seg in [seg for seg in curr.children if seg.type == "seg"]:
                    is_match, _qual_code, _ele_idx, _subele_idx = seg.x12_map_node.is_match_qual(
                        seg.seg_data, seg_id, qual
                    )
                    if is_match:
                        return seg.seg_data
                return None
            except errors.EngineError:
                raise errors.X12PathError(
                    "X12 Path is invalid or was not found: %s" % (x12_path_str)
                ) from None
        else:
            # Strip the leading loop and delegate to the first child loop
            # carrying that id
            next_id = xpath.loop_list[0]
            del xpath.loop_list[0]
            try:
                for loop in [loop for loop in curr.children if loop.type == "loop"]:
                    if loop.id == next_id:
                        result: pyx12.segment.Segment | None = loop.get_first_matching_segment(
                            xpath.format()
                        )
                        return result
                return None
            except errors.EngineError:
                raise errors.X12PathError(
                    "X12 Path is invalid or was not found: %s" % (x12_path_str)
                ) from None

    def _get_segment(self, seg_obj: pyx12.segment.Segment | str) -> pyx12.segment.Segment:
        """
        Get a pyx12.segment.Segment instance, building one from a string

        :raises errors.EngineError: If seg_obj is neither a Segment nor a str,
            or if the terminators needed to parse a string are unknown
        """
        if isinstance(seg_obj, pyx12.segment.Segment):
            return seg_obj
        if isinstance(seg_obj, str):
            seg_term, ele_term, subele_term = self._get_terminators()
            if seg_term is None or ele_term is None or subele_term is None:
                raise errors.EngineError(
                    "Cannot build Segment: terminators unknown (node has no X12SegmentDataNode children)"
                )
            return pyx12.segment.Segment(seg_obj, seg_term, ele_term, subele_term)
        # %r rather than %i: seg_obj is by definition not a number here, and
        # %i would raise TypeError while building the message.
        raise errors.EngineError(
            "Unknown type %s for seg_obj %r. Expecting a pyx12.segment.Segment or a str"
            % (seg_obj.__class__, seg_obj)
        )

    def _get_terminators(self) -> tuple[str | None, str | None, str | None]:
        """
        Find the segment, element and sub-element terminators in use

        Taken from the first segment child that has a segment terminator;
        otherwise the parent node is asked.

        :raises errors.EngineError: If no child supplies them and there is no parent
        """
        for child in self.children:
            if (
                isinstance(child, X12SegmentDataNode)
                and child.seg_data is not None
                and child.seg_data.seg_term is not None
            ):
                return (
                    child.seg_data.seg_term,
                    child.seg_data.ele_term,
                    child.seg_data.subele_term,
                )
        if self.parent is None:
            raise errors.EngineError("Cannot find terminators: no parent loop")
        result: tuple[str | None, str | None, str | None] = self.parent._get_terminators()  # type: ignore[attr-defined]
        return result

    def __copy__(self) -> X12LoopDataNode:
        """
        Returns a copy of this node

        Children are copied recursively and attached to the new loop node.
        Children already marked as deleted are not copied.
        """
        ret = X12LoopDataNode(self.x12_map_node)
        ret.end_loops = list(self.end_loops)
        ret.parent = self.parent
        for child in self.children:
            if child.type is None:
                # Deleted but not yet cleaned up; a deleted segment node
                # cannot be copied
                continue
            child_copy = child.copy()
            # The copy must hang off the new loop, not the original one
            child_copy.parent = ret
            ret.children.append(child_copy)
        return ret

    @property
    def seg_count(self) -> int | None:  # type: ignore[override]
        """
        :return: seg_count of the first segment child, or None if there is none
        """
        for child in self.children:
            if child.type == "seg":
                return child.seg_count
        return None

    @property
    def cur_line_number(self) -> int | None:  # type: ignore[override]
        """
        :return: cur_line_number of the first segment child, or None if there is none
        """
        for child in self.children:
            if child.type == "seg":
                return child.cur_line_number
        return None
class X12SegmentDataNode(X12DataNode):
    """
    Capture the segment data and X12 definition
    Alter relational data
    Iterate over contents
    """

    def __init__(
        self,
        x12_node: Any,
        seg_data: pyx12.segment.Segment,
        parent: X12DataNode | None = None,
        start_loops: list[Any] | None = None,
        end_loops: list[Any] | None = None,
    ) -> None:
        """
        :param x12_node: The segment map node
        :param seg_data: Segment data
        :type seg_data: L{node<segment.Segment>}
        :param parent: The enclosing data node, if any
        :param start_loops: Loop map nodes reported as opened before this segment
        :param end_loops: Loop map nodes reported as closed before this segment
        """
        # Initialise the shared node state (type, children, errors,
        # seg_count, cur_line_number) so every attribute read by the other
        # methods exists on a freshly built node.
        super().__init__(x12_node, seg_data, ntype="seg")
        self.parent = parent
        self.start_loops = [] if start_loops is None else start_loops
        self.end_loops = [] if end_loops is None else end_loops
        self.err_isa = []
        self.err_gs = []
        self.err_st = []
        self.err_seg = []
        self.err_ele = []

    # { Public Methods
    def handle_errh_errors(self, errh: Any) -> None:
        """
        Attach validation errors to segment node

        :param errh: Error handler exposing err_isa, err_gs, err_st, err_seg
            and err_ele collections

        TODO: move errors to parent loops if necessary
        """
        self.err_isa.extend(errh.err_isa)
        self.err_gs.extend(errh.err_gs)
        self.err_st.extend(errh.err_st)
        self.err_seg.extend(errh.err_seg)
        self.err_ele.extend(errh.err_ele)

    def delete(self) -> None:
        """
        Delete this node. Mark type as deleted.
        """
        self.start_loops = []
        self.end_loops = []
        X12DataNode.delete(self)

    def get_value(self, x12_path_str: str) -> str | None:
        """
        Get the value of the first found element at the given path

        :param x12_path_str: Relative X12 Path
        :type x12_path_str: string
        :return: the element value at the relative X12 path, or None if the
            segment does not match
        :rtype: string
        """
        seg_data = self.get_first_matching_segment(x12_path_str)
        if seg_data is None:
            return None
        return seg_data.get_value(x12_path_str)

    def set_value(self, x12_path_str: str, val: str) -> None:
        """
        Set the value of simple element at the first found segment at the given path

        :param x12_path_str: Relative X12 Path
        :type x12_path_str: string
        :param val: The new element value
        :type val: string
        :raises X12PathError: If the segment does not match the path
        """
        seg_data = self.get_first_matching_segment(x12_path_str)
        if seg_data is None:
            raise errors.X12PathError("X12 Path is invalid or was not found: %s" % (x12_path_str))
        seg_data.set(x12_path_str, val)

    def get_first_matching_segment(self, x12_path_str: str) -> pyx12.segment.Segment | None:
        """
        Get first found Segment at the given relative path. If the path is not a
        valid relative path or if the given segment index does not exist, the function
        returns None.

        :param x12_path_str: Relative X12 Path
        :type x12_path_str: string
        :return: First matching data segment
        :rtype: L{node<segment.Segment>}
        :raises X12PathError: On blank or invalid path, or if the path
            contains loops
        """
        curr, new_path_str = self._get_start_node(x12_path_str)
        xpath = path.X12Path(new_path_str)
        if len(xpath.loop_list) != 0:
            raise errors.X12PathError("This X12 Path should not contain loops: %s" % (x12_path_str))
        seg_id = xpath.seg_id
        qual = xpath.id_val
        ele_idx = xpath.ele_idx
        if ele_idx is not None and seg_id is None:
            # Element-only path: it refers to this segment itself
            return self.seg_data
        try:
            is_match, _qual_code, _matched_ele_idx, _matched_subele_idx = (
                curr.x12_map_node.is_match_qual(curr.seg_data, seg_id, qual)
            )
            if is_match:
                seg_result: pyx12.segment.Segment | None = curr.seg_data
                return seg_result
            return None
        except errors.EngineError:
            raise errors.X12PathError(
                "X12 Path is invalid or was not found: %s" % (x12_path_str)
            ) from None

    def iterate_segments(self) -> Iterator[dict[str, Any]]:
        """
        Iterate on this node, return the segment
        """
        yield {
            "type": "seg",
            "id": self.x12_map_node.id,
            "path": self.x12_map_node.x12path,
            "segment": self.seg_data,
            "seg_count": self.seg_count,
            "cur_line_number": self.cur_line_number,
        }

    def iterate_loop_segments(self) -> Iterator[dict[str, Any]]:
        """
        Iterate over this node and children, return loop start and loop end
        and any segments found
        """
        for loop in self.end_loops:
            yield {"node": loop, "type": "loop_end", "id": loop.id}
        for loop in self.start_loops:
            yield {"node": loop, "type": "loop_start", "id": loop.id}
        yield {
            "type": "seg",
            "id": self.id,
            "segment": self.seg_data,
            "start_loops": self.start_loops,
            "end_loops": self.end_loops,
            "seg_count": self.seg_count,
            "cur_line_number": self.cur_line_number,
        }

    def __copy__(self) -> X12SegmentDataNode:
        """
        Returns a copy of this node

        The segment data is copied; the start/end loop lists are shallow
        copies and the copy keeps the same parent.

        :raises errors.EngineError: If this node has no segment data
        """
        if self.seg_data is None:
            raise errors.EngineError("Cannot copy X12SegmentDataNode with no seg_data")
        seg_data = self.seg_data.copy()
        ret = X12SegmentDataNode(self.x12_map_node, seg_data, self.parent)
        ret.start_loops = list(self.start_loops)
        ret.end_loops = list(self.end_loops)
        return ret

    def select(self, x12_path_str: str) -> Iterator[X12DataNode]:
        """
        Segment nodes have no sub-nodes so return an empty iterator

        :param x12_path_str: Relative X12 path - 2400/2430
        :type x12_path_str: string
        :return: Empty iterator
        :rtype: L{node<x12context.X12DataNode>}
        """
        return iter([])

    def _select(self, x12path: path.X12Path) -> Iterator[X12DataNode]:
        """
        Empty iter for segment nodes

        :param x12path: x12 map path
        :type x12path: L{path<path.X12Path>}
        """
        return iter([])

    # { Property Accessors
    @property
    def err_ct(self) -> int:
        """
        :return: Count of errors for this segment
        :rtype: int
        """
        return (
            len(self.err_isa)
            + len(self.err_gs)
            + len(self.err_st)
            + len(self.err_seg)
            + len(self.err_ele)
        )
class X12ContextReader:
    """
    Read an X12 input stream
    Keep context when needed
    """

    map_path: str | None
    src: x12file.X12Reader
    map_file: str | None
    map_index_if: map_index.map_index

    def __init__(
        self,
        param: Any,
        errh: Any,
        src_file_obj: str | TextIO,
        xslt_files: Any = None,
        map_path: str | None = None,
    ) -> None:
        """
        :param param: pyx12.param instance
        :param errh: Error Handler object
        :param src_file_obj: Source document
        :type src_file_obj: string
        :param xslt_files: Not used by this initialiser
        :param map_path: Passed to the map loader and the map index
        """
        self.param = param
        self.map_path = map_path
        # Use the caller's error handler. The historical hardcode here
        # discarded the parameter, leaving callers no way to capture
        # validation errors. Callers who pass an
        # `error_handler.err_handler()` get a populated tree they can walk
        # with a visitor; callers who keep passing `errh_null()` see no
        # behavior change since all the lifecycle methods are no-ops there.
        self.errh = errh
        suppress = param.get("suppress_error_codes")
        if suppress:
            self.errh.suppress_error_codes = set(suppress)
        self.icvn = None
        self.fic = None
        self.vriic = None
        self.tspc = None
        # Get X12 DATA file
        self.src = x12file.X12Reader(src_file_obj)
        # Get Map of Control Segments
        self.map_file = (
            "x12.control.00501.xml" if self.src.icvn == "00501" else "x12.control.00401.xml"
        )
        self.control_map = map_if.load_map_file(self.map_file, param, self.map_path)
        self.map_index_if = map_index.map_index(self.map_path)
        self.x12_map_node = self.control_map.getnodebypath("/ISA_LOOP/ISA")
        self.walker = walk_tree()

    def close(self) -> None:
        """Close the underlying X12Reader. Idempotent."""
        self.src.close()

    def __enter__(self) -> X12ContextReader:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> Literal[False]:
        # Always close the reader; never suppress the exception
        self.close()
        return False

    # { Public Methods
    def iter_segments(self, loop_id: str | None = None) -> Iterator[X12DataNode]:
        """
        Simple segment or tree iterator

        Without ``loop_id`` every segment is yielded as its own segment data
        node. With ``loop_id`` the segments inside that loop are collected
        into a loop data tree which is yielded once complete; segments
        outside the loop are still yielded one by one.

        :param loop_id: Id of the loop to collect into trees, or None
        :type loop_id: string
        :return: X12 Data Node - simple segment or tree
        :rtype: L{node<x12context.X12DataNode>}
        :raises errors.EngineError: If a required map cannot be found or the
            data tree cannot be built
        """
        cur_tree: X12LoopDataNode | None = None
        cur_data_node: X12DataNode | None = None
        icvn: str | None = None
        fic: str | None = None
        vriic: str | None = None
        cur_map: Any = None
        for seg in self.src:
            # find node
            orig_node = self.x12_map_node
            pop_loops: list[Any] = []
            push_loops: list[Any] = []
            # Per-segment collector; its errors end up on the yielded node
            errh = error_handler.errh_list()
            if seg.get_seg_id() == "ISA":
                tpath = "/ISA_LOOP/ISA"
                self.x12_map_node = self.control_map.getnodebypath(tpath)
            elif seg.get_seg_id() == "GS":
                tpath = "/ISA_LOOP/GS_LOOP/GS"
                self.x12_map_node = self.control_map.getnodebypath(tpath)
            else:
                seg_node, pop_loops, push_loops, walk_errors = self.walker.walk_errors(
                    self.x12_map_node,
                    seg,
                    self.src.get_seg_count(),
                    self.src.get_cur_line(),
                    self.src.get_ls_id(),
                )
                apply_walk_errors(errh, walk_errors)
                # Also propagate the same walker errors into the caller's
                # error handler so the err_handler tree (when caller
                # passed `error_handler.err_handler()`) carries them.
                apply_walk_errors(self.errh, walk_errors)
                self.x12_map_node = seg_node
            if self.x12_map_node is None:
                # No map node for this segment: restore the position and
                # move on without yielding anything
                self.x12_map_node = orig_node
            else:
                seg_id = seg.get_seg_id()
                if seg_id == "ISA":
                    icvn = seg.get_value("ISA12")
                    self.errh.add_isa_loop(seg, self.src)
                    self.errh.handle_errors(self.src.pop_errors())
                elif seg_id == "IEA":
                    self.errh.handle_errors(self.src.pop_errors())
                    self.errh.close_isa_loop(self.x12_map_node, seg, self.src)
                elif seg_id == "GS":
                    fic = seg.get_value("GS01")
                    vriic = seg.get_value("GS08")
                    map_file_new = self.map_index_if.get_filename(icvn, vriic, fic)
                    # NOTE(review): nesting reconstructed from a flattened
                    # listing - the ISA counter reset is assumed to happen
                    # only when the map changes, the GS reset on every GS.
                    # Confirm against the original file.
                    if self.map_file != map_file_new:
                        self.map_file = map_file_new
                        if self.map_file is None:
                            raise pyx12.errors.EngineError(
                                "Map not found. icvn=%s, fic=%s, vriic=%s" % (icvn, fic, vriic)
                            )
                        cur_map = map_if.load_map_file(self.map_file, self.param, self.map_path)
                        if cur_map.id == "837":
                            self.src.check_837_lx = True
                        else:
                            self.src.check_837_lx = False
                        self._reset_counter_to_isa_counts()
                    self._reset_counter_to_gs_counts()
                    tpath = "/ISA_LOOP/GS_LOOP/GS"
                    self.x12_map_node = cur_map.getnodebypath(tpath)
                    self.errh.add_gs_loop(seg, self.src)
                    self.errh.handle_errors(self.src.pop_errors())
                elif seg_id == "GE":
                    self.errh.handle_errors(self.src.pop_errors())
                    self.errh.close_gs_loop(self.x12_map_node, seg, self.src)
                elif seg_id == "ST":
                    self.errh.add_st_loop(seg, self.src)
                    self.errh.handle_errors(self.src.pop_errors())
                elif seg_id == "SE":
                    self.errh.handle_errors(self.src.pop_errors())
                    self.errh.close_st_loop(self.x12_map_node, seg, self.src)
                elif seg_id == "BHT":
                    # For these two versions the map is looked up again
                    # using BHT02
                    if vriic in ("004010X094", "004010X094A1"):
                        tspc = seg.get_value("BHT02")
                        map_file_new = self.map_index_if.get_filename(icvn, vriic, fic, tspc)
                        if self.map_file != map_file_new:
                            self.map_file = map_file_new
                            if self.map_file is None:
                                err_str = "Map not found. icvn=%s, fic=%s, vriic=%s, tspc=%s" % (
                                    icvn,
                                    fic,
                                    vriic,
                                    tspc,
                                )
                                raise pyx12.errors.EngineError(err_str)
                            cur_map = map_if.load_map_file(self.map_file, self.param, self.map_path)
                            if cur_map.id == "837":
                                self.src.check_837_lx = True
                            else:
                                self.src.check_837_lx = False
                            self._apply_loop_count(self.x12_map_node, cur_map)
                            tpath = "/ISA_LOOP/GS_LOOP/ST_LOOP/HEADER/BHT"
                            self.x12_map_node = cur_map.getnodebypath(tpath)
                    self.errh.add_seg(
                        self.x12_map_node,
                        seg,
                        self.src.get_seg_count(),
                        self.src.get_cur_line(),
                        self.src.get_ls_id(),
                    )
                    self.errh.handle_errors(self.src.pop_errors())
                else:
                    self.errh.add_seg(
                        self.x12_map_node,
                        seg,
                        self.src.get_seg_count(),
                        self.src.get_cur_line(),
                        self.src.get_ls_id(),
                    )
                    self.errh.handle_errors(self.src.pop_errors())
                # Run element / segment-level validation against the
                # caller's error handler. Skipped for envelope segments
                # which are matched via the control map and not the
                # transaction map.
                if seg_id not in ("ISA", "IEA", "GS", "GE", "ST", "SE") and isinstance(
                    self.x12_map_node, map_if.segment_if
                ):
                    apply_segment_errors(self.x12_map_node, seg, self.errh)
                node_x12path = self.x12_map_node.x12path
                # If we are in the requested tree, wait until we have the whole thing
                if loop_id is not None and loop_id in node_x12path.loop_list:
                    # Are we at the start of the requested tree?
                    if (
                        node_x12path.loop_list[-1] == loop_id
                        and self.x12_map_node.is_first_seg_in_loop()
                    ):
                        if cur_tree is not None:
                            # Found root loop repeat. Yield existing, create new tree
                            yield cur_tree
                        # Make new tree on parent loop
                        cur_tree = X12LoopDataNode(
                            x12_node=self.x12_map_node.parent, end_loops=pop_loops
                        )
                        cur_data_node = self._add_segment(
                            cur_tree, self.x12_map_node, seg, pop_loops, push_loops
                        )
                        cur_data_node.seg_count = self.src.get_seg_count()
                        cur_data_node.cur_line_number = self.src.get_cur_line()
                    else:
                        if cur_data_node is None or self.x12_map_node is None:
                            raise errors.EngineError(
                                "Either cur_data_node or self.x12_map_node is None"
                            )
                        cur_data_node = self._add_segment(
                            cur_data_node, self.x12_map_node, seg, pop_loops, push_loops
                        )
                        cur_data_node.seg_count = self.src.get_seg_count()
                        cur_data_node.cur_line_number = self.src.get_cur_line()
                else:
                    if cur_tree is not None:
                        # We have completed a tree
                        yield cur_tree
                        cur_tree = None
                    if cur_data_node is not None:
                        if loop_id:
                            # Loops inside the requested tree were already
                            # reported with the tree itself
                            pop_loops = [
                                x12_node
                                for x12_node in pop_loops
                                if x12_node.get_path().find(loop_id) == -1
                            ]
                            if loop_id in [x12.id for x12 in push_loops]:
                                raise errors.EngineError(
                                    "Loop ID %s should not be in push loops" % (loop_id)
                                )
                            if loop_id in [x12.id for x12 in pop_loops]:
                                raise errors.EngineError(
                                    "Loop ID %s should not be in pop loops" % (loop_id)
                                )
                        cur_data_node = X12SegmentDataNode(
                            self.x12_map_node,
                            seg,
                            parent=None,
                            start_loops=push_loops,
                            end_loops=pop_loops,
                        )
                        cur_data_node.seg_count = self.src.get_seg_count()
                        cur_data_node.cur_line_number = self.src.get_cur_line()
                    else:
                        cur_data_node = X12SegmentDataNode(self.x12_map_node, seg)
                        cur_data_node.seg_count = self.src.get_seg_count()
                        cur_data_node.cur_line_number = self.src.get_cur_line()
                    # Get errors caught by x12Reader
                    errh.handle_errors(self.src.pop_errors())
                    # Handle errors captured in errh_list
                    cur_data_node.handle_errh_errors(errh)
                    yield cur_data_node
        if cur_tree is not None:
            # The source ended while a tree was still being collected (no
            # later segment outside the loop flushed it). Yield it instead
            # of silently dropping the data.
            yield cur_tree

    def register_error_callback(self, callback: Any, err_type: str) -> None:
        """
        Future: Callbacks for X12 validation errors
        """
        pass

    # { Property Accessors
    @property
    def seg_term(self) -> str | None:
        """
        :return: Current X12 segment terminator
        :rtype: string
        """
        return self.src.seg_term

    @property
    def ele_term(self) -> str | None:
        """
        :return: Current X12 element terminator
        :rtype: string
        """
        return self.src.ele_term

    @property
    def subele_term(self) -> str | None:
        """
        :return: Current X12 sub-element terminator
        :rtype: string
        """
        return self.src.subele_term

    @property
    def cur_seg_count(self) -> int:
        """
        :return: The reader's current segment count
        :rtype: int
        """
        return self.src.get_seg_count()

    @property
    def get_cur_line(self) -> int:
        """
        :return: The reader's current line
        :rtype: int
        """
        return self.src.get_cur_line()

    # { Private Methods
    def _add_segment(
        self,
        cur_data_node: X12DataNode,
        segment_x12_node: Any,
        seg_data: pyx12.segment.Segment,
        pop_loops: list[Any],
        push_loops: list[Any],
    ) -> X12SegmentDataNode:
        """
        From the last position in the X12 Data Node Tree, find the correct
        position for the new segment; moving up or down the tree as appropriate.

        G{callgraph}

        :param cur_data_node: Current X12 Data Node
        :type cur_data_node: L{node<x12context.X12DataNode>}
        :param segment_x12_node: Segment Map Node
        :type segment_x12_node: L{node<map_if.x12_node>}
        :param seg_data: Segment data for the new node
        :param pop_loops: Loop map nodes closed before this segment
        :param push_loops: Loop map nodes opened before this segment
        :return: New X12 Data Node
        :rtype: L{node<x12context.X12DataNode>}
        :raises errors.EngineError: If the node is not a segment or the tree
            position cannot be resolved
        """
        if not segment_x12_node.is_segment():
            raise errors.EngineError("Node must be a segment")
        # Get enclosing loop
        orig_data_node = cur_data_node
        parent_x12_node = pop_to_parent_loop(segment_x12_node)
        cur_loop_node: X12DataNode | None = cur_data_node
        if cur_loop_node is not None and cur_loop_node.type == "seg":
            cur_loop_node = cur_loop_node.parent
        # check path for new loops to be added
        new_path = parent_x12_node.x12path
        if cur_loop_node is None:
            raise errors.EngineError("cur_loop_node is None")
        last_path = cur_loop_node.x12_map_node.x12path
        if last_path != new_path:
            # Walk up through the closed loops, then down into the opened ones
            for x12_loop in pop_loops:
                if cur_loop_node is None or cur_loop_node.id != x12_loop.id:
                    raise errors.EngineError(
                        "Loop pop: %s != %s"
                        % (cur_loop_node.id if cur_loop_node else None, x12_loop.id)
                    )
                cur_loop_node = cur_loop_node.parent
            for x12_loop in push_loops:
                if cur_loop_node is None:
                    raise errors.EngineError("cur_loop_node is None. x12_loop: %s" % (x12_loop.id))
                # push new loop nodes, if needed
                cur_loop_node = cur_loop_node._add_loop_node(x12_loop)  # type: ignore[attr-defined]
        else:
            # handle loop repeat
            if (
                cur_loop_node is not None
                and cur_loop_node.parent is not None
                and segment_x12_node.is_first_seg_in_loop()
            ):
                cur_loop_node = cur_loop_node.parent._add_loop_node(  # type: ignore[attr-defined]
                    segment_x12_node.parent
                )
        try:
            # Build the node from the map node that was validated above
            new_node = X12SegmentDataNode(segment_x12_node, seg_data)
        except Exception:
            mypath = segment_x12_node.get_path()
            err_str = f"X12SegmentDataNode failed: x12_path={mypath}, seg_data={seg_data}"
            raise errors.EngineError(err_str) from None
        try:
            new_node.parent = cur_loop_node
            if cur_loop_node is None:
                raise errors.EngineError("cur_loop_node is None")
            cur_loop_node.children.append(new_node)
        except Exception:
            err_str = "X12SegmentDataNode child append failed:"
            err_str += " seg_x12_path=%s" % (segment_x12_node.get_path())
            err_str += ", orig_datanode=%s" % (orig_data_node.cur_path)
            err_str += ", cur_datanode=%s" % (cur_data_node.cur_path)
            err_str += ", seg_data=%s" % (seg_data)
            raise errors.EngineError(err_str) from None
        return new_node

    def _apply_loop_count(self, orig_node: Any, new_map: Any) -> None:
        """Stub - referenced from iter_segments BHT branch."""
        pass

    def _reset_counter_to_isa_counts(self) -> None:
        """
        Reset ISA instance counts
        """
        self.walker.counter.reset_to_node("/ISA_LOOP")
        self.walker.counter.increment("/ISA_LOOP")
        self.walker.counter.increment("/ISA_LOOP/ISA")

    def _reset_counter_to_gs_counts(self) -> None:
        """
        Reset GS instance counts
        """
        self.walker.counter.reset_to_node("/ISA_LOOP/GS_LOOP")
        self.walker.counter.increment("/ISA_LOOP/GS_LOOP")
        self.walker.counter.increment("/ISA_LOOP/GS_LOOP/GS")