Source code for pyx12.rawx12file
######################################################################
# John Holland <john@zoner.org>
# All rights reserved.
#
# This software is licensed as described in the file LICENSE.txt, which
# you should have received as part of this distribution.
#
######################################################################
"""
Low level interface to an X12 data input stream.
Iterates over segment line strings.
Used by X12Reader.
"""
from __future__ import annotations
from collections.abc import Iterator
from typing import TextIO
# Intrapackage imports
import pyx12.errors
import pyx12.segment
[docs]
DEFAULT_BUFSIZE = 8 * 1024
[docs]
class RawX12File:
"""
Interface to an X12 data file
"""
[docs]
repetition_term: str | None
def __init__(self, fin: TextIO) -> None:
"""
Initialize the file X12 file reader
:param fin: an open, readable file object
:type fin: open file object
"""
self.fd = fin
self.buffer = ""
line = self.fd.read(ISA_LEN)
if line[:3] != "ISA":
err_str = "First line does not begin with 'ISA': %s" % line[:3]
raise pyx12.errors.X12Error(err_str)
if len(line) != ISA_LEN:
err_str = "ISA line is only %i characters" % len(line)
raise pyx12.errors.X12Error(err_str)
self.icvn = line[84:89]
if self.icvn not in ("00401", "00501"):
err_str = "ISA Interchange Control Version Number is unknown: %s for %s" % (
self.icvn,
line,
)
raise pyx12.errors.X12Error(err_str)
self.seg_term = line[-1]
self.ele_term = line[3]
self.subele_term = line[-2]
self.repetition_term = line[82] if self.icvn == "00501" else None
self.buffer = line
self.buffer += self.fd.read(DEFAULT_BUFSIZE)
def __iter__(self) -> Iterator[str]:
"""
Iterate over input lines
Often, X12 files have a CR-LF after the segment delimiter.
Split the input stream on the delimiter and remove any leading CR-LF
"""
while True:
if self.buffer.find(self.seg_term) == -1:
# Need more data
self.buffer += self.fd.read(DEFAULT_BUFSIZE)
if self.buffer.find(self.seg_term) == -1:
# Still have no segment terminator
break
# Get first segment in buffer
(line, self.buffer) = self.buffer.split(self.seg_term, 1)
line = line.lstrip("\n\r")
if line == "":
break
yield line
[docs]
def get_term(self) -> tuple[str, str, str, str, str | None]:
"""
Get the original terminators
:rtype: tuple(string, string, string, string)
"""
return (self.seg_term, self.ele_term, self.subele_term, "\n", self.repetition_term)