677 lines
21 KiB
Python
677 lines
21 KiB
Python
# -*- coding: utf-8 -*-
|
|
# imageio is distributed under the terms of the (new) BSD License.
|
|
|
|
import re
|
|
import warnings
|
|
from numbers import Number
|
|
from pathlib import Path
|
|
from typing import Dict
|
|
|
|
import numpy as np
|
|
|
|
from imageio.core.legacy_plugin_wrapper import LegacyPlugin
|
|
from imageio.core.util import Array
|
|
from imageio.core.v3_plugin_api import PluginV3
|
|
|
|
from . import formats
|
|
from .config import known_extensions, known_plugins
|
|
from .core import RETURN_BYTES
|
|
from .core.imopen import imopen
|
|
|
|
MEMTEST_DEFAULT_MIM = "256MB"
|
|
MEMTEST_DEFAULT_MVOL = "1GB"
|
|
|
|
|
|
mem_re = re.compile(r"^(\d+\.?\d*)\s*([kKMGTPEZY]?i?)B?$")
|
|
sizes = {"": 1, None: 1}
|
|
for i, si in enumerate([""] + list("kMGTPEZY")):
|
|
sizes[si] = 1000**i
|
|
if si:
|
|
sizes[si.upper() + "i"] = 1024**i
|
|
|
|
|
|
def to_nbytes(arg, default=None):
|
|
if not arg:
|
|
arg = float("inf")
|
|
|
|
if arg is True:
|
|
arg = default
|
|
|
|
if isinstance(arg, Number):
|
|
return arg
|
|
|
|
match = mem_re.match(arg)
|
|
if match is None:
|
|
raise ValueError(
|
|
"Memory size could not be parsed "
|
|
"(is your capitalisation correct?): {}".format(arg)
|
|
)
|
|
|
|
num, unit = match.groups()
|
|
|
|
try:
|
|
return float(num) * sizes[unit]
|
|
except KeyError: # pragma: no cover
|
|
# Note: I don't think we can reach this
|
|
raise ValueError(
|
|
"Memory size unit not recognised "
|
|
"(is your capitalisation correct?): {}".format(unit)
|
|
)
|
|
|
|
|
|
def help(name=None):
|
|
"""help(name=None)
|
|
|
|
Print the documentation of the format specified by name, or a list
|
|
of supported formats if name is omitted.
|
|
|
|
Parameters
|
|
----------
|
|
name : str
|
|
Can be the name of a format, a filename extension, or a full
|
|
filename. See also the :doc:`formats page <../formats/index>`.
|
|
"""
|
|
if not name:
|
|
print(formats)
|
|
else:
|
|
print(formats[name])
|
|
|
|
|
|
def decypher_format_arg(format_name: str) -> Dict[str, str]:
|
|
"""Split format into plugin and format
|
|
|
|
The V2 API aliases plugins and supported formats. This function
|
|
splits these so that they can be fed separately to `iio.imopen`.
|
|
|
|
"""
|
|
|
|
plugin = None
|
|
extension = None
|
|
|
|
if format_name is None:
|
|
pass # nothing to do
|
|
elif Path(format_name).suffix.lower() in known_extensions:
|
|
extension = Path(format_name).suffix.lower()
|
|
elif format_name in known_plugins:
|
|
plugin = format_name
|
|
elif format_name.upper() in known_plugins:
|
|
plugin = format_name.upper()
|
|
elif format_name.lower() in known_extensions:
|
|
extension = format_name.lower()
|
|
elif "." + format_name.lower() in known_extensions:
|
|
extension = "." + format_name.lower()
|
|
else:
|
|
raise IndexError(f"No format known by name `{plugin}`.")
|
|
|
|
return {"plugin": plugin, "extension": extension}
|
|
|
|
|
|
class LegacyReader:
|
|
def __init__(self, plugin_instance: PluginV3, **kwargs):
|
|
self.instance = plugin_instance
|
|
self.last_index = 0
|
|
self.closed = False
|
|
|
|
if (
|
|
type(self.instance).__name__ == "PillowPlugin"
|
|
and kwargs.get("pilmode") is not None
|
|
):
|
|
kwargs["mode"] = kwargs["pilmode"]
|
|
del kwargs["pilmode"]
|
|
|
|
self.read_args = kwargs
|
|
|
|
def close(self):
|
|
if not self.closed:
|
|
self.instance.close()
|
|
self.closed = True
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
self.close()
|
|
|
|
def __del__(self):
|
|
self.close()
|
|
|
|
@property
|
|
def request(self):
|
|
return self.instance.request
|
|
|
|
@property
|
|
def format(self):
|
|
raise TypeError("V3 Plugins don't have a format.")
|
|
|
|
def get_length(self):
|
|
return self.instance.properties(index=...).n_images
|
|
|
|
def get_data(self, index):
|
|
self.last_index = index
|
|
img = self.instance.read(index=index, **self.read_args)
|
|
metadata = self.instance.metadata(index=index, exclude_applied=False)
|
|
return Array(img, metadata)
|
|
|
|
def get_next_data(self):
|
|
return self.get_data(self.last_index + 1)
|
|
|
|
def set_image_index(self, index):
|
|
self.last_index = index - 1
|
|
|
|
def get_meta_data(self, index=None):
|
|
return self.instance.metadata(index=index, exclude_applied=False)
|
|
|
|
def iter_data(self):
|
|
for idx, img in enumerate(self.instance.iter()):
|
|
metadata = self.instance.metadata(index=idx, exclude_applied=False)
|
|
yield Array(img, metadata)
|
|
|
|
def __iter__(self):
|
|
return self.iter_data()
|
|
|
|
def __len__(self):
|
|
return self.get_length()
|
|
|
|
|
|
class LegacyWriter:
|
|
def __init__(self, plugin_instance: PluginV3, **kwargs):
|
|
self.instance = plugin_instance
|
|
self.last_index = 0
|
|
self.closed = False
|
|
|
|
if type(self.instance).__name__ == "PillowPlugin" and "pilmode" in kwargs:
|
|
kwargs["mode"] = kwargs["pilmode"]
|
|
del kwargs["pilmode"]
|
|
|
|
self.write_args = kwargs
|
|
|
|
def close(self):
|
|
if not self.closed:
|
|
self.instance.close()
|
|
self.closed = True
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
self.close()
|
|
|
|
def __del__(self):
|
|
self.close()
|
|
|
|
@property
|
|
def request(self):
|
|
return self.instance.request
|
|
|
|
@property
|
|
def format(self):
|
|
raise TypeError("V3 Plugins don't have a format.")
|
|
|
|
def append_data(self, im, meta=None):
|
|
# TODO: write metadata in the future; there is currently no
|
|
# generic way to do this with v3 plugins :(
|
|
if meta is not None:
|
|
warnings.warn(
|
|
"V3 Plugins currently don't have a uniform way to"
|
|
" write metadata, so any metadata is ignored."
|
|
)
|
|
|
|
# total_meta = dict()
|
|
# if meta is None:
|
|
# meta = {}
|
|
# if hasattr(im, "meta") and isinstance(im.meta, dict):
|
|
# total_meta.update(im.meta)
|
|
# total_meta.update(meta)
|
|
|
|
return self.instance.write(im, **self.write_args)
|
|
|
|
def set_meta_data(self, meta):
|
|
# TODO: write metadata
|
|
raise NotImplementedError(
|
|
"V3 Plugins don't have a uniform way to write metadata (yet)."
|
|
)
|
|
|
|
|
|
def is_batch(ndimage):
|
|
if isinstance(ndimage, (list, tuple)):
|
|
return True
|
|
|
|
ndimage = np.asarray(ndimage)
|
|
if ndimage.ndim <= 2:
|
|
return False
|
|
elif ndimage.ndim == 3 and ndimage.shape[2] < 5:
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def is_volume(ndimage):
|
|
ndimage = np.asarray(ndimage)
|
|
if not is_batch(ndimage):
|
|
return False
|
|
|
|
if ndimage.ndim == 3 and ndimage.shape[2] >= 5:
|
|
return True
|
|
elif ndimage.ndim == 4 and ndimage.shape[3] < 5:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
# Base functions that return a reader/writer
|
|
|
|
|
|
def get_reader(uri, format=None, mode="?", **kwargs):
|
|
"""get_reader(uri, format=None, mode='?', **kwargs)
|
|
|
|
Returns a :class:`.Reader` object which can be used to read data
|
|
and meta data from the specified file.
|
|
|
|
Parameters
|
|
----------
|
|
uri : {str, pathlib.Path, bytes, file}
|
|
The resource to load the image from, e.g. a filename, pathlib.Path,
|
|
http address or file object, see the docs for more info.
|
|
format : str
|
|
The format to use to read the file. By default imageio selects
|
|
the appropriate for you based on the filename and its contents.
|
|
mode : {'i', 'I', 'v', 'V', '?'}
|
|
Used to give the reader a hint on what the user expects (default "?"):
|
|
"i" for an image, "I" for multiple images, "v" for a volume,
|
|
"V" for multiple volumes, "?" for don't care.
|
|
kwargs : ...
|
|
Further keyword arguments are passed to the reader. See :func:`.help`
|
|
to see what arguments are available for a particular format.
|
|
"""
|
|
|
|
imopen_args = decypher_format_arg(format)
|
|
imopen_args["legacy_mode"] = True
|
|
|
|
image_file = imopen(uri, "r" + mode, **imopen_args)
|
|
|
|
if isinstance(image_file, LegacyPlugin):
|
|
return image_file.legacy_get_reader(**kwargs)
|
|
else:
|
|
return LegacyReader(image_file, **kwargs)
|
|
|
|
|
|
def get_writer(uri, format=None, mode="?", **kwargs):
|
|
"""get_writer(uri, format=None, mode='?', **kwargs)
|
|
|
|
Returns a :class:`.Writer` object which can be used to write data
|
|
and meta data to the specified file.
|
|
|
|
Parameters
|
|
----------
|
|
uri : {str, pathlib.Path, file}
|
|
The resource to write the image to, e.g. a filename, pathlib.Path
|
|
or file object, see the docs for more info.
|
|
format : str
|
|
The format to use to write the file. By default imageio selects
|
|
the appropriate for you based on the filename.
|
|
mode : {'i', 'I', 'v', 'V', '?'}
|
|
Used to give the writer a hint on what the user expects (default '?'):
|
|
"i" for an image, "I" for multiple images, "v" for a volume,
|
|
"V" for multiple volumes, "?" for don't care.
|
|
kwargs : ...
|
|
Further keyword arguments are passed to the writer. See :func:`.help`
|
|
to see what arguments are available for a particular format.
|
|
"""
|
|
|
|
imopen_args = decypher_format_arg(format)
|
|
imopen_args["legacy_mode"] = True
|
|
|
|
image_file = imopen(uri, "w" + mode, **imopen_args)
|
|
if isinstance(image_file, LegacyPlugin):
|
|
return image_file.legacy_get_writer(**kwargs)
|
|
else:
|
|
return LegacyWriter(image_file, **kwargs)
|
|
|
|
|
|
# Images
|
|
|
|
|
|
def imread(uri, format=None, **kwargs):
|
|
"""imread(uri, format=None, **kwargs)
|
|
|
|
Reads an image from the specified file. Returns a numpy array, which
|
|
comes with a dict of meta data at its 'meta' attribute.
|
|
|
|
Note that the image data is returned as-is, and may not always have
|
|
a dtype of uint8 (and thus may differ from what e.g. PIL returns).
|
|
|
|
Parameters
|
|
----------
|
|
uri : {str, pathlib.Path, bytes, file}
|
|
The resource to load the image from, e.g. a filename, pathlib.Path,
|
|
http address or file object, see the docs for more info.
|
|
format : str
|
|
The format to use to read the file. By default imageio selects
|
|
the appropriate for you based on the filename and its contents.
|
|
kwargs : ...
|
|
Further keyword arguments are passed to the reader. See :func:`.help`
|
|
to see what arguments are available for a particular format.
|
|
"""
|
|
|
|
imopen_args = decypher_format_arg(format)
|
|
imopen_args["legacy_mode"] = True
|
|
|
|
with imopen(uri, "ri", **imopen_args) as file:
|
|
result = file.read(index=0, **kwargs)
|
|
|
|
return result
|
|
|
|
|
|
def imwrite(uri, im, format=None, **kwargs):
|
|
"""imwrite(uri, im, format=None, **kwargs)
|
|
|
|
Write an image to the specified file.
|
|
|
|
Parameters
|
|
----------
|
|
uri : {str, pathlib.Path, file}
|
|
The resource to write the image to, e.g. a filename, pathlib.Path
|
|
or file object, see the docs for more info.
|
|
im : numpy.ndarray
|
|
The image data. Must be NxM, NxMx3 or NxMx4.
|
|
format : str
|
|
The format to use to write the file. By default imageio selects
|
|
the appropriate for you based on the filename and its contents.
|
|
kwargs : ...
|
|
Further keyword arguments are passed to the writer. See :func:`.help`
|
|
to see what arguments are available for a particular format.
|
|
"""
|
|
|
|
# Test image
|
|
imt = type(im)
|
|
im = np.asarray(im)
|
|
if not np.issubdtype(im.dtype, np.number):
|
|
raise ValueError("Image is not numeric, but {}.".format(imt.__name__))
|
|
|
|
if is_batch(im) or im.ndim < 2:
|
|
raise ValueError("Image must be 2D (grayscale, RGB, or RGBA).")
|
|
|
|
imopen_args = decypher_format_arg(format)
|
|
imopen_args["legacy_mode"] = True
|
|
with imopen(uri, "wi", **imopen_args) as file:
|
|
return file.write(im, **kwargs)
|
|
|
|
|
|
# Multiple images
|
|
|
|
|
|
def mimread(uri, format=None, memtest=MEMTEST_DEFAULT_MIM, **kwargs):
|
|
"""mimread(uri, format=None, memtest="256MB", **kwargs)
|
|
|
|
Reads multiple images from the specified file. Returns a list of
|
|
numpy arrays, each with a dict of meta data at its 'meta' attribute.
|
|
|
|
Parameters
|
|
----------
|
|
uri : {str, pathlib.Path, bytes, file}
|
|
The resource to load the images from, e.g. a filename,pathlib.Path,
|
|
http address or file object, see the docs for more info.
|
|
format : str
|
|
The format to use to read the file. By default imageio selects
|
|
the appropriate for you based on the filename and its contents.
|
|
memtest : {bool, int, float, str}
|
|
If truthy, this function will raise an error if the resulting
|
|
list of images consumes greater than the amount of memory specified.
|
|
This is to protect the system from using so much memory that it needs
|
|
to resort to swapping, and thereby stall the computer. E.g.
|
|
``mimread('hunger_games.avi')``.
|
|
|
|
If the argument is a number, that will be used as the threshold number
|
|
of bytes.
|
|
|
|
If the argument is a string, it will be interpreted as a number of bytes with
|
|
SI/IEC prefixed units (e.g. '1kB', '250MiB', '80.3YB').
|
|
|
|
- Units are case sensitive
|
|
- k, M etc. represent a 1000-fold change, where Ki, Mi etc. represent 1024-fold
|
|
- The "B" is optional, but if present, must be capitalised
|
|
|
|
If the argument is True, the default will be used, for compatibility reasons.
|
|
|
|
Default: '256MB'
|
|
kwargs : ...
|
|
Further keyword arguments are passed to the reader. See :func:`.help`
|
|
to see what arguments are available for a particular format.
|
|
"""
|
|
|
|
# used for mimread and mvolread
|
|
nbyte_limit = to_nbytes(memtest, MEMTEST_DEFAULT_MIM)
|
|
|
|
images = list()
|
|
nbytes = 0
|
|
|
|
imopen_args = decypher_format_arg(format)
|
|
imopen_args["legacy_mode"] = True
|
|
with imopen(uri, "rI", **imopen_args) as file:
|
|
for image in file.iter(**kwargs):
|
|
images.append(image)
|
|
nbytes += image.nbytes
|
|
if nbytes > nbyte_limit:
|
|
raise RuntimeError(
|
|
"imageio.mimread() has read over {}B of "
|
|
"image data.\nStopped to avoid memory problems."
|
|
" Use imageio.get_reader(), increase threshold, or memtest=False".format(
|
|
int(nbyte_limit)
|
|
)
|
|
)
|
|
|
|
if len(images) == 1 and is_batch(images[0]):
|
|
images = [*images[0]]
|
|
|
|
return images
|
|
|
|
|
|
def mimwrite(uri, ims, format=None, **kwargs):
|
|
"""mimwrite(uri, ims, format=None, **kwargs)
|
|
|
|
Write multiple images to the specified file.
|
|
|
|
Parameters
|
|
----------
|
|
uri : {str, pathlib.Path, file}
|
|
The resource to write the images to, e.g. a filename, pathlib.Path
|
|
or file object, see the docs for more info.
|
|
ims : sequence of numpy arrays
|
|
The image data. Each array must be NxM, NxMx3 or NxMx4.
|
|
format : str
|
|
The format to use to read the file. By default imageio selects
|
|
the appropriate for you based on the filename and its contents.
|
|
kwargs : ...
|
|
Further keyword arguments are passed to the writer. See :func:`.help`
|
|
to see what arguments are available for a particular format.
|
|
"""
|
|
|
|
if not is_batch(ims):
|
|
raise ValueError("Image data must be a sequence of ndimages.")
|
|
|
|
imopen_args = decypher_format_arg(format)
|
|
imopen_args["legacy_mode"] = True
|
|
with imopen(uri, "wI", **imopen_args) as file:
|
|
return file.write(ims, is_batch=True, **kwargs)
|
|
|
|
|
|
# Volumes
|
|
|
|
|
|
def volread(uri, format=None, **kwargs):
|
|
"""volread(uri, format=None, **kwargs)
|
|
|
|
Reads a volume from the specified file. Returns a numpy array, which
|
|
comes with a dict of meta data at its 'meta' attribute.
|
|
|
|
Parameters
|
|
----------
|
|
uri : {str, pathlib.Path, bytes, file}
|
|
The resource to load the volume from, e.g. a filename, pathlib.Path,
|
|
http address or file object, see the docs for more info.
|
|
format : str
|
|
The format to use to read the file. By default imageio selects
|
|
the appropriate for you based on the filename and its contents.
|
|
kwargs : ...
|
|
Further keyword arguments are passed to the reader. See :func:`.help`
|
|
to see what arguments are available for a particular format.
|
|
"""
|
|
|
|
imopen_args = decypher_format_arg(format)
|
|
imopen_args["legacy_mode"] = True
|
|
with imopen(uri, "rv", **imopen_args) as file:
|
|
return file.read(index=0, **kwargs)
|
|
|
|
|
|
def volwrite(uri, im, format=None, **kwargs):
|
|
"""volwrite(uri, vol, format=None, **kwargs)
|
|
|
|
Write a volume to the specified file.
|
|
|
|
Parameters
|
|
----------
|
|
uri : {str, pathlib.Path, file}
|
|
The resource to write the image to, e.g. a filename, pathlib.Path
|
|
or file object, see the docs for more info.
|
|
vol : numpy.ndarray
|
|
The image data. Must be NxMxL (or NxMxLxK if each voxel is a tuple).
|
|
format : str
|
|
The format to use to read the file. By default imageio selects
|
|
the appropriate for you based on the filename and its contents.
|
|
kwargs : ...
|
|
Further keyword arguments are passed to the writer. See :func:`.help`
|
|
to see what arguments are available for a particular format.
|
|
"""
|
|
|
|
# Test image
|
|
im = np.asarray(im)
|
|
if not is_volume(im):
|
|
raise ValueError("Image must be 3D, or 4D if each voxel is a tuple.")
|
|
|
|
imopen_args = decypher_format_arg(format)
|
|
imopen_args["legacy_mode"] = True
|
|
|
|
with imopen(uri, "wv", **imopen_args) as file:
|
|
return file.write(im, is_batch=False, **kwargs)
|
|
|
|
|
|
# Multiple volumes
|
|
|
|
|
|
def mvolread(uri, format=None, memtest=MEMTEST_DEFAULT_MVOL, **kwargs):
|
|
"""mvolread(uri, format=None, memtest='1GB', **kwargs)
|
|
|
|
Reads multiple volumes from the specified file. Returns a list of
|
|
numpy arrays, each with a dict of meta data at its 'meta' attribute.
|
|
|
|
Parameters
|
|
----------
|
|
uri : {str, pathlib.Path, bytes, file}
|
|
The resource to load the volumes from, e.g. a filename, pathlib.Path,
|
|
http address or file object, see the docs for more info.
|
|
format : str
|
|
The format to use to read the file. By default imageio selects
|
|
the appropriate for you based on the filename and its contents.
|
|
memtest : {bool, int, float, str}
|
|
If truthy, this function will raise an error if the resulting
|
|
list of images consumes greater than the amount of memory specified.
|
|
This is to protect the system from using so much memory that it needs
|
|
to resort to swapping, and thereby stall the computer. E.g.
|
|
``mimread('hunger_games.avi')``.
|
|
|
|
If the argument is a number, that will be used as the threshold number
|
|
of bytes.
|
|
|
|
If the argument is a string, it will be interpreted as a number of bytes with
|
|
SI/IEC prefixed units (e.g. '1kB', '250MiB', '80.3YB').
|
|
|
|
- Units are case sensitive
|
|
- k, M etc. represent a 1000-fold change, where Ki, Mi etc. represent 1024-fold
|
|
- The "B" is optional, but if present, must be capitalised
|
|
|
|
If the argument is True, the default will be used, for compatibility reasons.
|
|
|
|
Default: '1GB'
|
|
kwargs : ...
|
|
Further keyword arguments are passed to the reader. See :func:`.help`
|
|
to see what arguments are available for a particular format.
|
|
"""
|
|
|
|
# used for mimread and mvolread
|
|
nbyte_limit = to_nbytes(memtest, MEMTEST_DEFAULT_MVOL)
|
|
|
|
images = list()
|
|
nbytes = 0
|
|
imopen_args = decypher_format_arg(format)
|
|
imopen_args["legacy_mode"] = True
|
|
with imopen(uri, "rV", **imopen_args) as file:
|
|
for image in file.iter(**kwargs):
|
|
images.append(image)
|
|
nbytes += image.nbytes
|
|
if nbytes > nbyte_limit:
|
|
raise RuntimeError(
|
|
"imageio.mimread() has read over {}B of "
|
|
"image data.\nStopped to avoid memory problems."
|
|
" Use imageio.get_reader(), increase threshold, or memtest=False".format(
|
|
int(nbyte_limit)
|
|
)
|
|
)
|
|
|
|
return images
|
|
|
|
|
|
def mvolwrite(uri, ims, format=None, **kwargs):
|
|
"""mvolwrite(uri, vols, format=None, **kwargs)
|
|
|
|
Write multiple volumes to the specified file.
|
|
|
|
Parameters
|
|
----------
|
|
uri : {str, pathlib.Path, file}
|
|
The resource to write the volumes to, e.g. a filename, pathlib.Path
|
|
or file object, see the docs for more info.
|
|
ims : sequence of numpy arrays
|
|
The image data. Each array must be NxMxL (or NxMxLxK if each
|
|
voxel is a tuple).
|
|
format : str
|
|
The format to use to read the file. By default imageio selects
|
|
the appropriate for you based on the filename and its contents.
|
|
kwargs : ...
|
|
Further keyword arguments are passed to the writer. See :func:`.help`
|
|
to see what arguments are available for a particular format.
|
|
"""
|
|
|
|
for im in ims:
|
|
if not is_volume(im):
|
|
raise ValueError("Image must be 3D, or 4D if each voxel is a tuple.")
|
|
|
|
imopen_args = decypher_format_arg(format)
|
|
imopen_args["legacy_mode"] = True
|
|
with imopen(uri, "wV", **imopen_args) as file:
|
|
return file.write(ims, is_batch=True, **kwargs)
|
|
|
|
|
|
# aliases
|
|
read = get_reader
|
|
save = get_writer
|
|
imsave = imwrite
|
|
mimsave = mimwrite
|
|
volsave = volwrite
|
|
mvolsave = mvolwrite
|
|
|
|
__all__ = [
|
|
"imread",
|
|
"mimread",
|
|
"volread",
|
|
"mvolread",
|
|
"imwrite",
|
|
"mimwrite",
|
|
"volwrite",
|
|
"mvolwrite",
|
|
# misc
|
|
"help",
|
|
"get_reader",
|
|
"get_writer",
|
|
"RETURN_BYTES",
|
|
]
|