"""A file interface to nodes for PyTables databases.
The FileNode module provides a file interface for using inside of
PyTables database files. Use the new_node() function to create a brand
new file node which can be read and written as any ordinary Python
file. Use the open_node() function to open an existing (i.e. created
with new_node()) node for read-only or read-write access. Read access
is always available. Write access (enabled on new files and files
opened with mode 'a+') only allows appending data to a file node.
Currently only binary I/O is supported.
See :ref:`filenode_usersguide` for instructions on use.
.. versionchanged:: 3.0
In version 3.0 the module has been completely rewritten to be fully
compliant with the interfaces defined in the :mod:`io` module.
"""
import io
import os
import re
import warnings
from pathlib import Path
import numpy as np
import tables as tb
# Marker stored in the NODE_TYPE system attribute of every file node;
# _check_attributes() rejects nodes whose attribute differs from this value.
NodeType = 'file'
"""Value for NODE_TYPE node system attribute."""
# Accepted NODE_TYPE_VERSION values.  The last entry is the version assigned
# to newly created nodes, and each value is also used as an index into the
# per-version lookup tables (_size_to_shape, _byte_shape).
NodeTypeVersions = [1, 2]
"""Supported values for NODE_TYPE_VERSION node system attribute."""
[docs]
class RawPyTablesIO(io.RawIOBase):
    """Base class for raw binary I/O on HDF5 files using PyTables.

    The stream is backed by a ``tables.EArray`` of ``UInt8Atom`` whose row
    count is used as the size of the stream.  Existing bytes are never
    rewritten: every write appends to the end of the array.

    Parameters
    ----------
    node
        The ``tables.EArray`` used as storage.  It must carry valid
        ``NODE_TYPE`` and ``NODE_TYPE_VERSION`` attributes.
    mode
        A file mode string.  When omitted, the mode of the host PyTables
        file is used without further validation.
    """

    # Converters from a byte count to the array shape used for appending,
    # indexed by node type version (slot 0 is unused: versions start at 1).
    _size_to_shape = [
        None,
        lambda size: (size, 1),
        lambda size: (size,),
    ]

    def __init__(self, node, mode=None):
        super().__init__()

        self._check_node(node)
        self._check_attributes(node)

        if mode is None:
            mode = node._v_file.mode
        else:
            self._check_mode(mode)
            self._cross_check_mode(mode, node._v_file.mode)

        self._node = node
        self._mode = mode
        self._pos = 0
        self._version = int(node.attrs.NODE_TYPE_VERSION)
        self._vshape = self._size_to_shape[self._version]
        self._vtype = node.atom.dtype.base.type

    # read only attribute
    @property
    def mode(self):
        """File mode."""
        return self._mode

    def tell(self):
        """Return current stream position."""
        self._checkClosed()
        return self._pos

    def seek(self, pos, whence=0):
        """Change stream position.

        Change the stream position to byte offset ``pos``, interpreted
        relative to the position indicated by ``whence``:

        * 0 -- start of stream (the default); ``pos`` must not be negative
        * 1 -- current stream position; ``pos`` may be negative
        * 2 -- end of stream; ``pos`` is usually negative

        For ``whence`` 1 and 2 a resulting negative position is clamped
        to zero.  Return the new absolute position.

        Raises
        ------
        TypeError
            If ``pos`` does not provide ``__index__``.
        ValueError
            If ``pos`` is negative with ``whence`` 0, if ``whence`` is not
            0, 1 or 2, or if the stream or its host file is closed.
        """
        self._checkClosed()
        try:
            pos = pos.__index__()
        except AttributeError as err:
            raise TypeError("an integer is required") from err

        if whence == 0:
            if pos < 0:
                raise ValueError(f"negative seek position {pos!r}")
            self._pos = pos
        elif whence == 1:
            self._pos = max(0, self._pos + pos)
        elif whence == 2:
            self._pos = max(0, self._node.nrows + pos)
        else:
            raise ValueError("invalid whence value")
        return self._pos

    def seekable(self):
        """Return whether object supports random access (always True)."""
        return True

    def fileno(self):
        """Return the file descriptor reported by the host PyTables file."""
        self._checkClosed()
        return self._node._v_file.fileno()

    def close(self):
        """Flush and close the IO object.

        This method has no effect if the file is already closed.  A
        warning is issued when the host PyTables file has been closed
        before the file node.
        """
        if not self.closed:
            if getattr(self._node, '_v_file', None) is None:
                warnings.warn("host PyTables file is already closed!")

            try:
                super().close()
            finally:
                # Release node object to allow closing the file.
                self._node = None

    def flush(self):
        """Flush the underlying storage node."""
        self._checkClosed()
        self._node.flush()

    def truncate(self, pos=None):
        """Grow the file node to ``pos`` bytes by appending zeros.

        ``pos`` defaults to the current position as reported by tell().
        The stream position is moved to ``pos``, which is also returned.

        Currently, this method only makes sense to grow the file node,
        since data can not be rewritten nor deleted.

        Raises
        ------
        ValueError
            If ``pos`` is negative.
        OSError
            If ``pos`` is smaller than the current size.
        """
        self._checkClosed()
        self._checkWritable()

        if pos is None:
            pos = self._pos
        elif pos < 0:
            raise ValueError(f"negative truncate position {pos!r}")

        if pos < self._node.nrows:
            raise OSError("truncating is only allowed for growing a file")
        self._append_zeros(pos - self._node.nrows)
        return self.seek(pos)

    def readable(self):
        """Return whether the mode string allows reading ('r' or '+')."""
        mode = self._mode
        return 'r' in mode or '+' in mode

    def writable(self):
        """Return whether the mode string allows writing ('w', 'a' or '+')."""
        mode = self._mode
        return 'w' in mode or 'a' in mode or '+' in mode

    def readinto(self, b):
        """Read up to the byte size of ``b`` into ``b``.

        ``b`` may be any writable, C-contiguous object supporting the
        buffer protocol; it is filled byte-wise regardless of its item
        format.  Return the number of bytes read (0 at end of stream).
        """
        self._checkClosed()
        self._checkReadable()

        if self._pos >= self._node.nrows:
            return 0

        # The views are released explicitly (context managers) so that the
        # caller is free to resize ``b`` as soon as this method returns.
        with memoryview(b) as raw:
            if raw.nbytes == 0:
                return 0
            with raw.cast('B') as target:
                start = self._pos
                stop = min(start + len(target), self._node.nrows)
                n = stop - start
                target[:n] = self._node.read(start, stop).tobytes()

        self._pos += n
        return n

    def readline(self, limit=-1):
        """Read and return a line from the stream.

        The line terminator is always ``\\n``.  If ``limit`` is positive,
        at most ``limit`` bytes are returned; ``None``, zero and negative
        values mean "no limit".
        """
        self._checkClosed()
        self._checkReadable()

        if limit is None:
            limit = -1

        # Read chunk-wise when the node reports a chunk shape; otherwise
        # -1 asks read() for everything up to the end of the stream.
        chunksize = self._node.chunkshape[0] if self._node.chunkshape else -1
        lsep = b'\n'

        remaining = limit
        partial = []
        while True:
            if limit <= 0:
                want = chunksize
            elif chunksize > 0:
                want = min(remaining, chunksize)
            else:
                # No chunk size available: the limit alone bounds the read
                # (a plain min() with -1 would discard the limit).
                want = remaining

            chunk = self.read(want)
            if not chunk:
                # End of stream.
                break

            eol = chunk.find(lsep)
            if eol >= 0:
                # Keep everything up to and including the terminator and
                # give the surplus bytes back to the stream.
                surplus = len(chunk) - (eol + len(lsep))
                if surplus > 0:
                    chunk = chunk[:-surplus]
                    self.seek(-surplus, 1)
                partial.append(chunk)
                break

            partial.append(chunk)
            if limit > 0:
                remaining -= len(chunk)
                if remaining <= 0:
                    break

        return b''.join(partial)

    def write(self, b):
        """Append the given buffer to the file node.

        Data is always appended to the end of the storage array.  If the
        stream position lies beyond the end, the gap is filled with zeros
        first.  ``b`` may be any object supporting the buffer protocol;
        its size is measured in bytes.  Return the number of bytes
        written; the stream position is advanced by the same amount.

        Raises
        ------
        TypeError
            If ``b`` is a ``str``.
        """
        self._checkClosed()
        self._checkWritable()

        if isinstance(b, str):
            raise TypeError("can't write str to binary stream")

        # Count bytes, not items: len() would under-report buffers whose
        # items are wider than one byte.
        with memoryview(b) as view:
            n = view.nbytes
        if n == 0:
            return 0

        # Is the pointer beyond the real end of data?
        end2off = self._pos - self._node.nrows
        if end2off > 0:
            # Zero-fill the gap between the end of data and the pointer.
            self._append_zeros(end2off)

        # Append data.
        self._node.append(
            np.ndarray(buffer=b, dtype=self._vtype, shape=self._vshape(n)))

        self._pos += n
        return n

    def _checkClosed(self):
        """Check that both the file node and its host file are open.

        ValueError is raised if the file node has been closed or if the
        host PyTables file is no longer available.
        """
        super()._checkClosed()
        if getattr(self._node, '_v_file', None) is None:
            raise ValueError("host PyTables file is already closed!")

    def _check_node(self, node):
        """Raise TypeError unless ``node`` is an EArray of UInt8Atom."""
        if not isinstance(node, tb.EArray):
            raise TypeError('the "node" parameter should be a tables.EArray')
        if not isinstance(node.atom, tb.UInt8Atom):
            raise TypeError('only nodes with atom "UInt8Atom" are allowed')

    def _check_mode(self, mode):
        """Validate a file mode string.

        Raises TypeError if ``mode`` is not a string and ValueError if it
        contains unknown or repeated characters or an inconsistent
        combination of flags.
        """
        if not isinstance(mode, str):
            # f-string instead of %-formatting: a tuple argument would be
            # unpacked by the % operator and garble the message.
            raise TypeError(f"invalid mode: {mode!r}")

        modes = set(mode)
        if modes - set("arwb+tU") or len(mode) > len(modes):
            raise ValueError(f"invalid mode: {mode!r}")

        reading = "r" in modes
        writing = "w" in modes
        appending = "a" in modes
        text = "t" in modes
        binary = "b" in modes

        if "U" in modes:
            if writing or appending:
                raise ValueError("can't use U and writing mode at once")
            reading = True

        if text and binary:
            raise ValueError("can't have text and binary mode at once")

        if reading + writing + appending > 1:
            raise ValueError("can't have read/write/append mode at once")

        if not (reading or writing or appending):
            raise ValueError("must have exactly one of read/write/append mode")

    def _cross_check_mode(self, mode, h5filemode):
        """Raise ValueError if ``mode`` asks for write access that the mode
        of the host file (``h5filemode``) does not provide."""
        writable = bool('w' in mode or 'a' in mode or '+' in mode)
        h5writable = bool('w' in h5filemode or 'a' in h5filemode or
                          '+' in h5filemode)

        if writable and not h5writable:
            raise ValueError("RawPyTablesIO can't be open in write mode if "
                             "the underlying hdf5 file is not writable")

    def _check_attributes(self, node):
        """Check the file node-specific attributes.

        Checks for the presence and validity of the system attributes
        'NODE_TYPE' and 'NODE_TYPE_VERSION' in the specified PyTables
        node (leaf).

        ValueError is raised if an attribute is missing or incorrect.
        """
        attrs = node.attrs
        ltype = getattr(attrs, 'NODE_TYPE', None)
        ltypever = getattr(attrs, 'NODE_TYPE_VERSION', None)

        if ltype != NodeType:
            raise ValueError(f"invalid type of node object: {ltype}")
        if ltypever not in NodeTypeVersions:
            raise ValueError(
                f"unsupported type version of node object: {ltypever}")

    def _append_zeros(self, size):
        """Append ``size`` zero bytes to the array.

        The stream position is not moved.
        """
        # Appending an empty array would raise an error.
        if size == 0:
            return

        # XXX This may be redone to avoid a potentially large in-memory array.
        self._node.append(
            np.zeros(dtype=self._vtype, shape=self._vshape(size)))
class FileNodeMixin:
    """Mixin adding an ``attrs`` property to file node objects.

    The property exposes the attribute set of the storage node kept in
    ``self._node``.  Individual attributes may be added there, but names
    in all caps or starting with '_' should be avoided since they may
    clash with internal attributes.  The attribute set as a whole can
    neither be replaced nor deleted.
    """

    def _get_attrs(self):
        """Return the attribute set of the storage node."""
        node = self._node
        return node.attrs

    def _set_attrs(self, value):
        """Refuse to replace the attribute set; always raises ValueError."""
        raise ValueError("changing the whole attribute set is not allowed")

    def _del_attrs(self):
        """Refuse to delete the attribute set; always raises ValueError."""
        raise ValueError("deleting the whole attribute set is not allowed")

    # Read-only in practice: the setter and the deleter only raise.
    attrs = property(
        fget=_get_attrs,
        fset=_set_attrs,
        fdel=_del_attrs,
        doc="A property pointing to the attribute set of the file node.",
    )
[docs]
class ROFileNode(FileNodeMixin, RawPyTablesIO):
    """Read-only file node.

    Wraps an existing PyTables node, previously created with the
    new_node() function, in a standard Python file interface opened in
    mode ``'r'``.

    The storage node is reachable through the read-only ``node``
    attribute.  Please do not tamper with that object if it's avoidable,
    since you may break the operation of the file node object.

    The constructor is not intended to be used directly; use the
    open_node() function in read-only mode ('r') instead.

    :Version 1:
        implements the file storage as a UInt8 uni-dimensional EArray.
    :Version 2:
        uses an UInt8 N vector EArray.

    .. versionchanged:: 3.0
       The offset attribute is no more available, please use seek/tell
       methods instead.

    .. versionchanged:: 3.0
       The line_separator property is no more available.
       The only line separator used for binary I/O is ``\\n``.
    """

    def __init__(self, node):
        # FileNodeMixin defines no __init__, so this resolves to
        # RawPyTablesIO.__init__.
        super().__init__(node, 'r')
        self._checkReadable()

    @property
    def node(self):
        """The PyTables node used as storage."""
        return self._node
[docs]
class RAFileNode(FileNodeMixin, RawPyTablesIO):
    """Read-and-append file node.

    The constructor works in two ways: given a ``node``, it opens that
    existing PyTables node; given ``node=None`` and an ``h5file``, it
    creates a new node in that PyTables file.  In the second case the
    named arguments 'where' and 'name' must be passed to specify where
    the file node is to be created.  Other named arguments such as
    'title' and 'filters' may also be passed, as well as the special
    named argument 'expectedsize', an estimate of the file size in bytes.

    Write access means reading as well as appending data is allowed.

    The storage node is reachable through the read-only ``node``
    attribute.  Please do not tamper with that object if it's avoidable,
    since you may break the operation of the file node object.

    The constructor is not intended to be used directly.
    Use the new_node() or open_node() functions instead.

    :Version 1:
        implements the file storage as a UInt8 uni-dimensional EArray.
    :Version 2:
        uses an UInt8 N vector EArray.

    .. versionchanged:: 3.0
       The offset attribute is no more available, please use seek/tell
       methods instead.

    .. versionchanged:: 3.0
       The line_separator property is no more available.
       The only line separator used for binary I/O is ``\\n``.
    """

    # Shape of the (initially empty) storage array, indexed by node type
    # version; slot 0 is unused.
    _byte_shape = [
        None,
        (0, 1),
        (0,),
    ]

    __allowed_init_kwargs = [
        'where', 'name', 'title', 'filters', 'expectedsize']

    def __init__(self, node, h5file, **kwargs):
        if node is not None:
            # Existing node: validate it and pick up its version.
            self._check_attributes(node)
            self._version = node.attrs.NODE_TYPE_VERSION
        elif h5file is not None:
            # Reject unknown keywords so that nothing unexpected falls
            # through to the array constructor.
            unexpected = [key for key in kwargs
                          if key not in self.__allowed_init_kwargs]
            if unexpected:
                raise TypeError(
                    "%s keyword argument is not allowed" % repr(unexpected[0]))

            # 'expectedsize' maps directly onto 'expectedrows' since one
            # byte is stored per row.
            options = dict(kwargs)
            if 'expectedsize' in options:
                options['expectedrows'] = options.pop('expectedsize')

            # New nodes always use the most recent node type version.
            self._version = NodeTypeVersions[-1]
            node = h5file.create_earray(
                atom=tb.UInt8Atom(),
                shape=self._byte_shape[self._version],
                **options)

            # Tag the array as a file node; if that fails, do not leave
            # the untagged array behind.
            try:
                self._set_attributes(node)
            except RuntimeError:
                h5file.remove_node(options['where'], options['name'])
                raise

        RawPyTablesIO.__init__(self, node, 'a+')
        self._checkReadable()
        self._checkWritable()

    @property
    def node(self):
        """The PyTables node used as storage."""
        return self._node

    def _set_attributes(self, node):
        """Add the file node-specific attributes to ``node``.

        Sets the system attributes 'NODE_TYPE' and 'NODE_TYPE_VERSION'
        in the specified PyTables node (leaf).
        """
        node_attrs = node.attrs
        node_attrs.NODE_TYPE = NodeType
        node_attrs.NODE_TYPE_VERSION = NodeTypeVersions[-1]
[docs]
def new_node(h5file, **kwargs):
    """Create a new file node in the given PyTables file object.

    The named arguments ``where`` and ``name`` must be passed to specify
    where the file node is to be created.  Other named arguments such as
    ``title`` and ``filters`` may also be passed, as well as the special
    named argument ``expectedsize``, an estimate of the file size in
    bytes.

    Returns the new file node object, which is an RAFileNode.
    """
    file_node = RAFileNode(None, h5file, **kwargs)
    return file_node
[docs]
def open_node(node, mode='r'):
    """Open an existing file node.

    Returns a file node object wrapping the given PyTables node.  With
    mode 'r' (the default) the result is an ROFileNode, which can only be
    read.  With mode 'a+' the result is an RAFileNode, which can be read
    and appended to.  Any other mode raises OSError.

    NOTE(review): earlier documentation stated that in 'a+' mode the
    pointer is positioned at the end of the file; the constructors
    visible in this module initialise the position to 0 -- confirm the
    intended behaviour.
    """
    if mode == 'r':
        return ROFileNode(node)
    if mode == 'a+':
        return RAFileNode(node, None)
    raise OSError(f"invalid mode: {mode}")
[docs]
def save_to_filenode(h5file, filename, where, name=None, overwrite=False,
                     title="", filters=None):
    """Save a file's contents to a filenode inside a PyTables file.

    .. versionadded:: 3.2

    Parameters
    ----------
    h5file
      The PyTables file to be written to; can be either a string
      giving the file's location or a :class:`File` object.  If a file
      with name *h5file* already exists, it will be opened in
      mode ``a``.
    filename
      Path of the file which shall be stored within the PyTables file.
    where, name
      Location of the filenode where the data shall be stored.  If
      *name* is not given, and *where* is either a :class:`Group`
      object or a string ending on ``/``, the leaf name will be set to
      the file name of *filename*.  If *name* is not given and *where*
      is any other string, its last path component is used as the name
      and the remainder as the location.  The *name* will be modified to
      adhere to Python's natural naming convention; the original
      filename will be preserved in the filenode's *_filename*
      attribute.
    overwrite
      Whether or not a possibly existing filenode of the specified
      name shall be overwritten.
    title
       A description for this node (it sets the ``TITLE`` HDF5
       attribute on disk).
    filters
       An instance of the :class:`Filters` class that provides
       information about the desired I/O filters to be applied
       during the life of this object.

    Raises
    ------
    OSError
        If *filename* cannot be read, if *h5file* is a file object opened
        read-only, or if the node already exists and *overwrite* is
        false.
    TypeError
        If *name* is not given and cannot be derived from *where*.
    """
    path = Path(filename).resolve()

    # sanity checks
    if not os.access(path, os.R_OK):
        raise OSError(f"The file '{path}' could not be read")
    if isinstance(h5file, tb.file.File) and h5file.mode == "r":
        raise OSError(f"The file '{h5file.filename}' is opened read-only")

    # guess filenode's name if necessary
    if name is None:
        if isinstance(where, tb.group.Group):
            name = os.path.split(filename)[1]
        elif isinstance(where, str):
            if where.endswith("/"):
                name = os.path.split(filename)[1]
            else:
                # Split "<parent>/<leaf>"; rpartition keeps the parent
                # free of a doubled leading slash ("/a/b" -> "/a").
                parent, _, name = where.rpartition("/")
                where = parent if parent.startswith("/") else "/" + parent
        if name is None:
            raise TypeError(
                "'name' must be given when 'where' is neither a Group "
                "nor a string")

    # sanitize name if necessary
    if not tb.path._python_id_re.match(name):
        name = re.sub('(?![a-zA-Z0-9_]).', "_",
                      re.sub('^(?![a-zA-Z_]).', "_", name))

    owns_file = not isinstance(h5file, tb.file.File)
    f = tb.File(h5file, "a") if owns_file else h5file
    try:
        # check for already existing filenode
        try:
            f.get_node(where=where, name=name)
        except tb.NoSuchNodeError:
            exists = False
        else:
            exists = True
        if exists and not overwrite:
            raise OSError(
                f"Specified node already exists in file '{f.filename}'"
            )

        # read data from disk (before touching any existing node)
        data = path.read_bytes()

        # remove existing filenode if present
        if exists:
            f.remove_node(where=where, name=name)

        # write file's contents to filenode
        fnode = new_node(f, where=where, name=name, title=title,
                         filters=filters)
        try:
            fnode.write(data)
            fnode.attrs._filename = path.name
        finally:
            fnode.close()
    finally:
        # Only close the HDF5 file when it was opened here; a file object
        # passed in by the caller stays open.
        if owns_file:
            f.close()
[docs]
def read_from_filenode(h5file, filename, where, name=None, overwrite=False,
                       create_target=False):
    r"""Read a filenode from a PyTables file and write its contents to a file.

    .. versionadded:: 3.2

    Parameters
    ----------
    h5file
      The PyTables file to be read from; can be either a string
      giving the file's location or a :class:`File` object.
    filename
      Path of the file where the contents of the filenode shall be
      written to.  If *filename* points to a directory or ends with
      ``/`` (``\`` on Windows), the filename will be set to the
      *_filename* (if present; otherwise the *name*) attribute of the
      read filenode.
    where, name
      Location of the filenode where the data shall be read from.  If
      no node *name* can be found at *where*, the first node at
      *where* whose *_filename* attribute matches *name* will be read.
    overwrite
      Whether or not a possibly existing file of the specified
      *filename* shall be overwritten.
    create_target
      Whether or not the folder hierarchy needed to accommodate the
      given target ``filename`` will be created.

    Raises
    ------
    tables.NoSuchNodeError
        If no matching filenode can be found.
    OSError
        If the target file already exists and *overwrite* is false, or
        if the target directory is not writable.
    """
    path = Path(filename).resolve()

    owns_file = not isinstance(h5file, tb.file.File)
    f = tb.File(h5file, "r") if owns_file else h5file
    fnode = None
    try:
        try:
            fnode = open_node(f.get_node(where=where, name=name))
        except tb.NoSuchNodeError:
            # Fall back to the first EArray whose stored original file
            # name matches; arrays without that attribute are skipped.
            for candidate in f.walk_nodes(where=where, classname="EArray"):
                try:
                    stored_name = candidate.attrs._filename
                except AttributeError:
                    continue
                if stored_name == name:
                    fnode = open_node(candidate)
                    break
        if fnode is None:
            raise tb.NoSuchNodeError("A filenode '%s' cannot be found at "
                                     "'%s'" % (name, where))

        # guess output filename if necessary
        # pathlib.Path strips a trailing slash, so the raw argument is
        # inspected; os.fspath() lets Path arguments through as well.
        if path.is_dir() or os.fspath(filename).endswith(os.path.sep):
            # NOTE(review): _filename comes from the HDF5 file itself;
            # confirm whether it should be reduced to a bare file name
            # before being joined to the target directory.
            try:
                path = path / fnode.node.attrs._filename
            except Exception:
                # Best effort: fall back to the node name.
                path = path / fnode.node.name

        if os.access(path, os.R_OK) and not overwrite:
            raise OSError(f"The file '{path}' already exists")

        # create folder hierarchy if necessary
        if create_target:
            path.parent.mkdir(parents=True, exist_ok=True)

        if not os.access(path.parent, os.W_OK):
            raise OSError("The file '%s' cannot be written to" % filename)

        # read data from filenode and store it to file
        data = fnode.read()
        path.write_bytes(data)
    finally:
        try:
            if fnode is not None:
                fnode.close()
        finally:
            # Only close the HDF5 file when it was opened here; a file
            # object passed in by the caller stays open.
            if owns_file:
                f.close()