| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757 |
- # -*- coding: utf-8 -*-
- # imageio is distributed under the terms of the (new) BSD License.
- """
- Definition of the Request object, which acts as a kind of bridge between
- what the user wants and what the plugins can do.
- """
- import os
- from io import BytesIO
- import zipfile
- import tempfile
- import shutil
- import enum
- import warnings
- from ..core import urlopen, get_remote_file
- from pathlib import Path
- from urllib.parse import urlparse
- from typing import Optional
# URI types: how the resource of a request is addressed (set by
# ``Request._parse_uri`` and dispatched on in ``Request.get_file``).
(
    URI_BYTES,  # in-memory bytes, or one of the special "<...>" URIs
    URI_FILE,  # an already opened file object
    URI_FILENAME,  # a path on the local filesystem
    URI_ZIPPED,  # a file located inside a zip archive
    URI_HTTP,  # an http:// or https:// address
    URI_FTP,  # an ftp:// or ftps:// address
) = range(1, 7)
class IOMode(str, enum.Enum):
    """Available IO modes

    This is a helper enum for ``Request.Mode`` which is a composite of a
    ``Request.ImageMode`` and ``Request.IOMode``. The IOMode tells the
    plugin if the resource should be read from or written to. Available
    values are

    - read ("r"): Read from the specified resource
    - write ("w"): Write to the specified resource

    """

    read = "r"
    write = "w"
class ImageMode(str, enum.Enum):
    """Available Image modes

    This is a helper enum for ``Request.Mode`` which is a composite of a
    ``Request.ImageMode`` and ``Request.IOMode``. The image mode tells the
    plugin the desired (and expected) image shape. Available values are

    - single_image ("i"): Return a single image extending in two spatial
      dimensions
    - multi_image ("I"): Return a list of images extending in two spatial
      dimensions
    - single_volume ("v"): Return an image extending into multiple dimensions.
      E.g. three spatial dimensions for image stacks, or two spatial and one
      time dimension for videos
    - multi_volume ("V"): Return a list of images extending into multiple
      dimensions.
    - any_mode ("?"): Return an image in any format (the plugin decides the
      appropriate action).

    """

    single_image = "i"
    multi_image = "I"
    single_volume = "v"
    multi_volume = "V"
    any_mode = "?"
@enum.unique
class Mode(str, enum.Enum):
    """The mode to use when interacting with the resource

    ``Request.Mode`` combines a ``Request.IOMode`` (first character: should
    the resource be read or written) with a ``Request.ImageMode`` (second
    character: the desired and expected image shape). See the documentation
    of those two enums for a description of the individual characters.

    Every combination of the two is available:

    - read_single_image ("ri")
    - read_multi_image ("rI")
    - read_single_volume ("rv")
    - read_multi_volume ("rV")
    - read_any ("r?")
    - write_single_image ("wi")
    - write_multi_image ("wI")
    - write_single_volume ("wv")
    - write_multi_volume ("wV")
    - write_any ("w?")

    Examples
    --------
    >>> Request.Mode("rI")  # a list of simple images should be read from the resource
    >>> Request.Mode("wv")  # a single volume should be written to the resource

    """

    read_single_image = "ri"
    read_multi_image = "rI"
    read_single_volume = "rv"
    read_multi_volume = "rV"
    read_any = "r?"
    write_single_image = "wi"
    write_multi_image = "wI"
    write_single_volume = "wv"
    write_multi_volume = "wV"
    write_any = "w?"

    @classmethod
    def _missing_(cls, value):
        """Enable Mode("r") and Mode("w")

        ``_missing_`` is the enum hook that runs when the constructor cannot
        find a member for the given value. It is used here to map the
        one-character modes "r" and "w" (from the v3 API) onto their legacy
        two-character versions "r?" and "w?".

        More info on _missing_:
        https://docs.python.org/3/library/enum.html#supported-sunder-names

        Raises
        ------
        ValueError
            If ``value`` is neither "r" nor "w".
        """
        for shorthand, legacy in (("r", "r?"), ("w", "w?")):
            if value == shorthand:
                return cls(legacy)
        raise ValueError(f"{value} is no valid Mode.")

    @property
    def io_mode(self) -> IOMode:
        """The IO part of the mode (first character of the value)."""
        io_char = self.value[0]
        return IOMode(io_char)

    @property
    def image_mode(self) -> ImageMode:
        """The image part of the mode (second character of the value)."""
        image_char = self.value[1]
        return ImageMode(image_char)

    def __getitem__(self, key):
        """For backwards compatibility with the old non-enum modes

        Index 0 gives the IO mode and index 1 the image mode; any other key
        raises ``IndexError``.
        """
        if key == 0:
            return self.io_mode
        if key == 1:
            return self.image_mode
        raise IndexError(f"Mode has no item {key}")
# URI prefixes that, in a read request, are accepted as-is instead of being
# interpreted as a filename (see ``Request._parse_uri``).
SPECIAL_READ_URIS = "<video", "<screen>", "<clipboard>"

# The user can use this string in a write call to get the data back as bytes.
RETURN_BYTES = "<bytes>"

# Example images that will be auto-downloaded.
# Maps the file name (as used in an "imageio:<name>" URI) to a short
# human-readable description of the image.
EXAMPLE_IMAGES = {
    "astronaut.png": "Image of the astronaut Eileen Collins",
    "camera.png": "A grayscale image of a photographer",
    "checkerboard.png": "Black and white image of a chekerboard",
    "wood.jpg": "A (repeatable) texture of wooden planks",
    "bricks.jpg": "A (repeatable) texture of stone bricks",
    "clock.png": "Photo of a clock with motion blur (Stefan van der Walt)",
    "coffee.png": "Image of a cup of coffee (Rachel Michetti)",
    "chelsea.png": "Image of Stefan's cat",
    "wikkie.png": "Image of Almar's cat",
    "coins.png": "Image showing greek coins from Pompeii",
    "horse.png": "Image showing the silhouette of a horse (Andreas Preuss)",
    "hubble_deep_field.png": "Photograph taken by Hubble telescope (NASA)",
    "immunohistochemistry.png": "Immunohistochemical (IHC) staining",
    "moon.png": "Image showing a portion of the surface of the moon",
    "page.png": "A scanned page of text",
    "text.png": "A photograph of handdrawn text",
    "bacterial_colony.tif": "Multi-page TIFF image of a bacterial colony",
    "calcium_imaging.tif": "Neuronal calcium imaging video",
    "chelsea.zip": "The chelsea.png in a zipfile (for testing)",
    "chelsea.bsdf": "The chelsea.png in a BSDF file(for testing)",
    "newtonscradle.gif": "Animated GIF of a newton's cradle",
    "cockatoo.mp4": "Video file of a cockatoo",
    "cockatoo_yuv420.mp4": "Video file of a cockatoo with yuv420 pixel format",
    "stent.npz": "Volumetric image showing a stented abdominal aorta",
    "meadow_cube.jpg": "A cubemap image of a meadow, e.g. to render a skybox.",
}
class Request(object):
    """ImageResource handling utility.

    Represents a request for reading or saving an image resource. This
    object wraps information to that request and acts as an interface
    for the plugins to several resources; it allows the user to read
    from filenames, files, http, zipfiles, raw bytes, etc., but offer
    a simple interface to the plugins via ``get_file()`` and
    ``get_local_filename()``.

    For each read/write operation a single Request instance is used and passed
    to the can_read/can_write method of a format, and subsequently to
    the Reader/Writer class. This allows rudimentary passing of
    information between different formats and between a format and
    associated reader/writer.

    Parameters
    ----------
    uri : {str, bytes, memoryview, pathlib.Path, file}
        The resource to load the image from (or to save it to).
    mode : str
        The first character is "r" or "w", indicating a read or write
        request. The second character is used to indicate the kind of data:
        "i" for an image, "I" for multiple images, "v" for a volume,
        "V" for multiple volumes, "?" for don't care.
    extension : str, optional
        File extension (starting with a ``.``) to use instead of the one
        derived from the filename.
    format_hint : str, optional
        Deprecated; use ``extension`` instead. Must start with a ``.``.
    **kwargs
        Stored as-is and exposed through the ``kwargs`` property.

    Raises
    ------
    ValueError
        If ``mode`` is not a valid mode, or if ``extension`` or
        ``format_hint`` does not start with a ``.``.
    """

    def __init__(
        self,
        uri,
        mode,
        *,
        extension=None,
        format_hint: Optional[str] = None,
        **kwargs,
    ):
        # General
        self.raw_uri = uri
        self._uri_type = None
        self._filename = None
        self._extension = None
        self._format_hint = None
        self._kwargs = kwargs
        self._result = None  # Some write actions may have a result

        # To handle the user-side
        self._filename_zip = None  # not None if a zipfile is used
        self._bytes = None  # Incoming bytes
        self._zipfile = None  # To store a zipfile instance (if used)

        # To handle the plugin side
        self._file = None  # To store the file instance
        self._file_is_local = False  # whether the data needs to be copied at end
        self._filename_local = None  # not None if using tempfile on this FS
        self._firstbytes = None  # For easy header parsing

        # Check mode
        try:
            self._mode = Mode(mode)
        except ValueError:
            raise ValueError(f"Invalid Request.Mode: {mode}")

        # Parse what was given
        self._parse_uri(uri)

        # Set extension
        if extension is not None:
            # Slice instead of index so that an empty string is rejected with
            # the intended ValueError rather than an IndexError.
            if extension[:1] != ".":
                raise ValueError(
                    "`extension` should be a file extension starting with a `.`,"
                    f" but is `{extension}`."
                )
            self._extension = extension
        elif self._filename is not None:
            if self._uri_type in (URI_FILENAME, URI_ZIPPED):
                path = self._filename
            else:
                # Drop query string and fragment of urls
                path = urlparse(self._filename).path
            ext = Path(path).suffix.lower()
            self._extension = ext if ext != "" else None

        if format_hint is not None:
            warnings.warn(
                "The usage of `format_hint` is deprecated and will be removed "
                "in ImageIO v3. Use `extension` instead.",
                DeprecationWarning,
            )
            if format_hint[:1] != ".":
                raise ValueError(
                    "`format_hint` should be a file extension starting with a `.`,"
                    f" but is `{format_hint}`."
                )

        # Goes through the property setter, which also fills in a missing
        # extension.
        self.format_hint = format_hint

    def _parse_uri(self, uri):
        """Try to figure out what we were given.

        Sets ``_uri_type`` and ``_filename`` (and, depending on the kind of
        resource, ``_bytes``, ``_file`` or ``_filename_zip``).

        Raises
        ------
        RuntimeError
            When trying to write to a standard image.
        ValueError
            When an unknown standard image is requested.
        IOError
            When the URI cannot be understood, when writing to http/ftp is
            requested, or when a standard image is requested without the
            ``imageio:`` prefix.
        FileNotFoundError
            When the file to read, or the directory to write to, does not
            exist.
        """
        is_read_request = self.mode.io_mode is IOMode.read
        is_write_request = self.mode.io_mode is IOMode.write

        if isinstance(uri, str):
            # Explicit
            if uri.startswith("imageio:"):
                if is_write_request:
                    raise RuntimeError("Cannot write to the standard images.")
                fn = uri.split(":", 1)[-1].lower()
                fn, _, zip_part = fn.partition(".zip/")
                if zip_part:
                    fn += ".zip"
                if fn not in EXAMPLE_IMAGES:
                    raise ValueError("Unknown standard image %r." % fn)
                self._uri_type = URI_FILENAME
                self._filename = get_remote_file("images/" + fn, auto=True)
                if zip_part:
                    self._filename += "/" + zip_part
            elif uri.startswith(("http://", "https://")):
                self._uri_type = URI_HTTP
                self._filename = uri
            elif uri.startswith(("ftp://", "ftps://")):
                self._uri_type = URI_FTP
                self._filename = uri
            elif uri.startswith("file://"):
                self._uri_type = URI_FILENAME
                self._filename = uri[7:]
            elif uri.startswith(SPECIAL_READ_URIS) and is_read_request:
                self._uri_type = URI_BYTES
                self._filename = uri
            elif uri.startswith(RETURN_BYTES) and is_write_request:
                self._uri_type = URI_BYTES
                self._filename = uri
            else:
                self._uri_type = URI_FILENAME
                self._filename = uri
        elif isinstance(uri, memoryview) and is_read_request:
            self._uri_type = URI_BYTES
            self._filename = "<bytes>"
            self._bytes = uri.tobytes()
        elif isinstance(uri, bytes) and is_read_request:
            self._uri_type = URI_BYTES
            self._filename = "<bytes>"
            self._bytes = uri
        elif isinstance(uri, Path):
            self._uri_type = URI_FILENAME
            self._filename = str(uri)
        # Files
        elif is_read_request:
            if hasattr(uri, "read") and hasattr(uri, "close"):
                self._uri_type = URI_FILE
                self._filename = "<file>"
                self._file = uri  # Data must be read from here
        elif is_write_request:
            if hasattr(uri, "write") and hasattr(uri, "close"):
                self._uri_type = URI_FILE
                self._filename = "<file>"
                self._file = uri  # Data must be written here

        # Expand user dir
        if self._uri_type == URI_FILENAME and self._filename.startswith("~"):
            self._filename = os.path.expanduser(self._filename)

        # Check if a zipfile
        if self._uri_type == URI_FILENAME:
            # Search for zip extension followed by a path separator
            for needle in (".zip/", ".zip\\"):
                zip_i = self._filename.lower().find(needle)
                if zip_i > 0:
                    zip_i += 4  # include the ".zip" itself
                    zip_path = self._filename[:zip_i]
                    if os.path.isdir(zip_path):
                        pass  # is an existing dir (see #548)
                    elif is_write_request or os.path.isfile(zip_path):
                        self._uri_type = URI_ZIPPED
                        self._filename_zip = (
                            zip_path,
                            self._filename[zip_i:].lstrip("/\\"),
                        )
                        break

        # Check if we could read it
        if self._uri_type is None:
            uri_r = repr(uri)
            if len(uri_r) > 60:
                uri_r = uri_r[:57] + "..."
            raise IOError("Cannot understand given URI: %s." % uri_r)

        # Check if this is supported
        if is_write_request and self._uri_type in (URI_HTTP, URI_FTP):
            raise IOError("imageio does not support writing to http/ftp.")

        # Deprecated way to load standard images, give a sensible error message
        if is_read_request and self._uri_type in (URI_FILENAME, URI_ZIPPED):
            fn = self._filename
            if self._filename_zip:
                fn = self._filename_zip[0]
            if (not os.path.exists(fn)) and (fn in EXAMPLE_IMAGES):
                raise IOError(
                    "No such file: %r. This file looks like one of "
                    "the standard images, but from imageio 2.1, "
                    "standard images have to be specified using "
                    '"imageio:%s".' % (fn, fn)
                )

        # Make filename absolute
        if self._uri_type in (URI_FILENAME, URI_ZIPPED):
            if self._filename_zip:
                self._filename_zip = (
                    os.path.abspath(self._filename_zip[0]),
                    self._filename_zip[1],
                )
            else:
                self._filename = os.path.abspath(self._filename)

        # Check whether file name is valid
        if self._uri_type in (URI_FILENAME, URI_ZIPPED):
            fn = self._filename
            if self._filename_zip:
                fn = self._filename_zip[0]
            if is_read_request:
                # Reading: check that the file exists (but is allowed a dir)
                if not os.path.exists(fn):
                    raise FileNotFoundError("No such file: '%s'" % fn)
            else:
                # Writing: check that the directory to write to does exist
                dn = os.path.dirname(fn)
                if not os.path.exists(dn):
                    raise FileNotFoundError("The directory %r does not exist" % dn)

    @property
    def filename(self):
        """Name of the ImageResource.

        The uri for which reading/saving was requested. This
        can be a filename, an http address, or other resource
        identifier. Do not rely on the filename to obtain the data,
        but use ``get_file()`` or ``get_local_filename()`` instead.
        """
        return self._filename

    @property
    def extension(self) -> Optional[str]:
        """The (lowercase) extension of the requested filename.

        Suffixes in url's are stripped. Can be None if the request is
        not based on a filename.
        """
        return self._extension

    @property
    def format_hint(self) -> Optional[str]:
        """The (deprecated) format hint given by the user, if any."""
        return self._format_hint

    @format_hint.setter
    def format_hint(self, hint: Optional[str]) -> None:
        self._format_hint = hint
        # The hint doubles as extension when no extension is known
        if self._extension is None:
            self._extension = hint

    @property
    def mode(self):
        """The mode of the request. The first character is "r" or "w",
        indicating a read or write request. The second character is
        used to indicate the kind of data:
        "i" for an image, "I" for multiple images, "v" for a volume,
        "V" for multiple volumes, "?" for don't care.
        """
        return self._mode

    @property
    def kwargs(self):
        """The dict of keyword arguments supplied by the user."""
        return self._kwargs

    # For obtaining data

    def get_file(self):
        """get_file()

        Get a file object for the resource associated with this request.
        If this is a reading request, the file is in read mode,
        otherwise in write mode. This method is not thread safe. Plugins
        should not close the file when done.

        This is the preferred way to read/write the data. But if a
        format cannot handle file-like objects, they should use
        ``get_local_filename()``.
        """
        want_to_write = self.mode.io_mode is IOMode.write

        # Is there already a file?
        # Either _uri_type == URI_FILE, or we already opened the file,
        # e.g. by using firstbytes
        if self._file is not None:
            return self._file

        if self._uri_type == URI_BYTES:
            if want_to_write:
                # Create new file object, we catch the bytes in finish()
                self._file = BytesIO()
                self._file_is_local = True
            else:
                self._file = BytesIO(self._bytes)

        elif self._uri_type == URI_FILENAME:
            self._file = open(self.filename, "wb" if want_to_write else "rb")

        elif self._uri_type == URI_ZIPPED:
            # Get the correct filename
            filename, name = self._filename_zip
            if want_to_write:
                # Create new file object, we catch the bytes in finish()
                self._file = BytesIO()
                self._file_is_local = True
            else:
                # Open zipfile and open new file object for specific file
                self._zipfile = zipfile.ZipFile(filename, "r")
                self._file = self._zipfile.open(name, "r")
                self._file = SeekableFileObject(self._file)

        # NOTE: this used to read ``in [URI_HTTP or URI_FTP]``, which is just
        # ``[URI_HTTP]``; ftp requests then silently returned None.
        elif self._uri_type in (URI_HTTP, URI_FTP):
            assert not want_to_write  # This should have been tested in init
            timeout = os.getenv("IMAGEIO_REQUEST_TIMEOUT")
            if timeout is None or not timeout.isdigit():
                timeout = 5
            self._file = urlopen(self.filename, timeout=float(timeout))
            self._file = SeekableFileObject(self._file)

        return self._file

    def get_local_filename(self):
        """get_local_filename()

        If the filename is an existing file on this filesystem, return
        that. Otherwise a temporary file is created on the local file
        system which can be used by the format to read from or write to.
        The temporary file is created once and reused by later calls; it
        is removed again by ``finish()``.
        """
        if self._uri_type == URI_FILENAME:
            return self._filename

        # Reuse the temp file from an earlier call; creating a fresh one each
        # time would leak all but the last, since finish() only removes one.
        if self._filename_local is not None:
            return self._filename_local

        # Get filename
        ext = self.extension
        if ext is None:
            ext = os.path.splitext(self._filename)[1]
        fd, self._filename_local = tempfile.mkstemp(ext, "imageio_")
        os.close(fd)

        # Write stuff to it?
        if self.mode.io_mode == IOMode.read:
            with open(self._filename_local, "wb") as file:
                shutil.copyfileobj(self.get_file(), file)
        return self._filename_local

    def finish(self) -> None:
        """Wrap up this request.

        Finishes any pending reads or writes, closes any open files and frees
        any resources allocated by this request.
        """
        if self.mode.io_mode == IOMode.write:
            # See if we "own" the data and must put it somewhere
            payload = None
            if self._filename_local:
                payload = Path(self._filename_local).read_bytes()
            elif self._file_is_local:
                self._file_is_local = False
                payload = self._file.getvalue()

            # Put the data in the right place
            if payload is not None:
                if self._uri_type == URI_BYTES:
                    self._result = payload  # Picked up by imread function
                elif self._uri_type == URI_FILE:
                    self._file.write(payload)
                elif self._uri_type == URI_ZIPPED:
                    zip_path, member_name = self._filename_zip
                    # The context manager closes the archive even if the
                    # write fails
                    with zipfile.ZipFile(zip_path, "a") as zf:
                        zf.writestr(member_name, payload)
                # elif self._uri_type == URI_FILENAME: -> is always direct
                # elif self._uri_type == URI_FTP/HTTP: -> write not supported

        # Close open files that we know of (and are responsible for)
        if self._file and self._uri_type != URI_FILE:
            self._file.close()
            self._file = None
        if self._zipfile:
            self._zipfile.close()
            self._zipfile = None

        # Remove temp file
        if self._filename_local:
            try:
                os.remove(self._filename_local)
            except Exception:  # pragma: no cover
                warnings.warn(
                    "Failed to delete the temporary file at "
                    f"`{self._filename_local}`. Please report this issue."
                )
            self._filename_local = None

        # Detach so gc can clean even if a reference of self lingers
        self._bytes = None

    def get_result(self):
        """For internal use. In some situations a write action can have
        a result (bytes data). That is obtained with this function.

        The stored result is cleared, so a second call returns None.
        """
        # Is there a reason to disallow reading multiple times?
        self._result, res = None, self._result
        return res

    @property
    def firstbytes(self):
        """The first 256 bytes of the file. These can be used to
        parse the header to determine the file-format.
        """
        if self._firstbytes is None:
            self._read_first_bytes()
        return self._firstbytes

    def _read_first_bytes(self, N=256):
        """Fill ``_firstbytes`` with (up to) the first ``N`` bytes.

        The file position is restored afterwards when possible. If it cannot
        be restored, the file is dropped so that ``get_file()`` opens a fresh
        one, unless the file object came from the user, in which case an
        ``IOError`` is raised.
        """
        if self._bytes is not None:
            self._firstbytes = self._bytes[:N]
            return

        # Prepare
        try:
            f = self.get_file()
        except IOError:
            if os.path.isdir(self.filename):  # A directory, e.g. for DICOM
                self._firstbytes = bytes()
                return
            raise
        try:
            i = f.tell()
        except Exception:
            i = None

        # Read
        self._firstbytes = read_n_bytes(f, N)

        # Set back
        try:
            if i is None:
                raise Exception("cannot seek with None")
            f.seek(i)
        except Exception:
            # Prevent get_file() from reusing the file
            self._file = None
            # If the given URI was a file object, we have a problem,
            if self._uri_type == URI_FILE:
                raise IOError("Cannot seek back after getting firstbytes!")
def read_n_bytes(f, N):
    """read_n_bytes(file, n)

    Read up to ``N`` bytes from the file object ``f`` and return them as
    one bytes object. ``f.read`` is called repeatedly until ``N`` bytes
    were collected or a read comes back empty, so fewer than ``N`` bytes
    are returned when the file holds less.
    """
    pieces = []
    missing = N
    while missing > 0:
        piece = f.read(missing)
        if not piece:
            break
        pieces.append(piece)
        missing -= len(piece)
    return b"".join(pieces)
class SeekableFileObject:
    """A readonly wrapper file object that adds support for seeking, even if
    the wrapped file object does not. This allows us to stream from http and
    still use Pillow.

    Everything that is read from the wrapped file is kept in an in-memory
    buffer; reads and seeks are served from that buffer, and the wrapped
    file is only consulted when data beyond the buffer is needed.

    Parameters
    ----------
    f : file-like
        The object to wrap. Only its ``read()`` and ``close()`` methods are
        used.
    """

    # Number of bytes requested per step while readline() looks for a newline
    _READLINE_CHUNK = 256

    def __init__(self, f):
        self.f = f
        self._i = 0  # >=0 but can exceed buffer
        self._buffer = b""
        self._have_all = False  # True once the wrapped file is exhausted
        self.closed = False

    def read(self, n=None):
        """Read and return up to ``n`` bytes from the current position.

        With ``n`` being None or negative, everything up to the end of the
        file is returned.
        """
        # Fix up n
        if n is not None:
            n = int(n)
            if n < 0:
                n = None

        # Can and must we read more?
        if not self._have_all:
            more = b""
            if n is None:
                more = self.f.read()
                self._have_all = True
            else:
                want_i = self._i + n
                want_more = want_i - len(self._buffer)
                if want_more > 0:
                    more = self.f.read(want_more)
                    if len(more) < want_more:
                        self._have_all = True
            self._buffer += more

        # Read data from buffer and update pointer
        if n is None:
            res = self._buffer[self._i :]
        else:
            res = self._buffer[self._i : self._i + n]
        self._i += len(res)
        return res

    def readline(self, size=-1):
        """Read and return one line, including its trailing newline.

        At most ``size`` bytes are returned when ``size`` is non-negative.
        An empty bytes object is returned at the end of the file.
        """
        limit = None if size is None or int(size) < 0 else int(size)
        start = self._i
        pieces = []
        collected = 0
        while limit is None or collected < limit:
            want = self._READLINE_CHUNK
            if limit is not None:
                want = min(want, limit - collected)
            chunk = self.read(want)
            if not chunk:
                break  # end of file
            newline_at = chunk.find(b"\n")
            if newline_at >= 0:
                chunk = chunk[: newline_at + 1]
            pieces.append(chunk)
            collected += len(chunk)
            if newline_at >= 0:
                break
        line = b"".join(pieces)
        # read() may have moved past the newline; the surplus stays in the
        # buffer, so only the position needs to be set back.
        self._i = start + len(line)
        return line

    def tell(self):
        """Return the current position."""
        return self._i

    def seek(self, i, mode=0):
        """Move to offset ``i`` and return the new absolute position.

        ``mode`` is the usual whence value: 0 (from start), 1 (relative to
        the current position) or 2 (relative to the end).

        Raises
        ------
        ValueError
            For a negative absolute offset or an unknown ``mode``.
        """
        # Mimic BytesIO behavior

        # Get the absolute new position
        i = int(i)
        if mode == 0:
            if i < 0:
                raise ValueError("negative seek value " + str(i))
            real_i = i
        elif mode == 1:
            real_i = max(0, self._i + i)  # negative ok here
        elif mode == 2:
            if not self._have_all:
                self.read()
            real_i = max(0, len(self._buffer) + i)
        else:
            # Report the offending whence value (not the offset)
            raise ValueError("invalid whence (%s, should be 0, 1 or 2)" % mode)

        # Read some?
        if real_i <= len(self._buffer):
            pass  # no need to read
        elif not self._have_all:
            assert real_i > self._i  # if we don't have all, _i cannot be > _buffer
            self.read(real_i - self._i)  # sets self._i

        self._i = real_i
        return self._i

    def close(self):
        """Mark this object as closed and close the wrapped file."""
        self.closed = True
        self.f.close()

    def isatty(self):
        return False

    def seekable(self):
        return True
class InitializationError(Exception):
    """Raised by a plugin that cannot initialize from the given request.

    This is an *internal* error. It lets us tell apart a plugin that is
    merely incompatible with a request from an actual error/bug inside
    a plugin.
    """
|