Source code for napari.utils._dask_utils
"""Dask cache utilities."""
import collections.abc
import contextlib
from collections.abc import Iterator
from typing import Any, Callable, Optional
import dask
import dask.array as da
from dask.cache import Cache
#: dask.cache.Cache, optional : A dask cache for opportunistic caching
#: use :func:`~.resize_dask_cache` to actually register and resize.
#: this is a global cache (all layers will use it), but individual layers
#: can opt out using Layer(..., cache=False)
_DASK_CACHE = Cache(1)
_DEFAULT_MEM_FRACTION = 0.25
DaskIndexer = Callable[
[], contextlib.AbstractContextManager[Optional[tuple[dict, Cache]]]
]
[docs]
def resize_dask_cache(
nbytes: Optional[int] = None, mem_fraction: Optional[float] = None
) -> Cache:
"""Create or resize the dask cache used for opportunistic caching.
The cache object is an instance of a :class:`Cache`, (which
wraps a :class:`cachey.Cache`).
See `Dask opportunistic caching
<https://docs.dask.org/en/latest/caching.html>`_
Parameters
----------
nbytes : int, optional
The desired size of the cache, in bytes. If ``None``, the cache size
will autodetermined as fraction of the total memory in the system,
using ``mem_fraction``. If ``nbytes`` is 0. The cache is turned off.
by default, cache size is autodetermined using ``mem_fraction``.
mem_fraction : float, optional
The fraction (from 0 to 1) of total memory to use for the dask cache.
Returns
-------
dask_cache : dask.cache.Cache
An instance of a Dask Cache
Examples
--------
>>> from napari.utils import resize_dask_cache
>>> cache = resize_dask_cache() # use 25% of total memory by default
>>> # dask.Cache wraps cachey.Cache
>>> assert isinstance(cache.cache, cachey.Cache)
>>> # useful attributes
>>> cache.cache.available_bytes # full size of cache
>>> cache.cache.total_bytes # currently used bytes
"""
from psutil import virtual_memory
if nbytes is None and mem_fraction is not None:
nbytes = int(virtual_memory().total * mem_fraction)
avail = _DASK_CACHE.cache.available_bytes
# if we don't have a cache already, create one.
if avail == 1:
# If neither nbytes nor mem_fraction was provided, use default
if nbytes is None:
nbytes = int(virtual_memory().total * _DEFAULT_MEM_FRACTION)
_DASK_CACHE.cache.resize(nbytes)
elif nbytes is not None and nbytes != _DASK_CACHE.cache.available_bytes:
# if the cache has already been registered, then calling
# resize_dask_cache() without supplying either mem_fraction or nbytes
# is a no-op:
_DASK_CACHE.cache.resize(nbytes)
return _DASK_CACHE
def _is_dask_data(data: Any) -> bool:
"""Return True if data is a dask array or a list/tuple of dask arrays."""
return isinstance(data, da.Array) or (
isinstance(data, collections.abc.Sequence)
and any(isinstance(i, da.Array) for i in data)
)
def configure_dask(data: Any, cache: bool = True) -> DaskIndexer:
"""Spin up cache and return context manager that optimizes Dask indexing.
This function determines whether data is a dask array or list of dask
arrays and prepares some optimizations if so.
When a delayed dask array is given to napari, there are couple things that
need to be done to optimize performance.
1. Opportunistic caching needs to be enabled, such that we don't recompute
(or "re-read") data that has already been computed or read.
2. Dask task fusion must be turned off to prevent napari from triggering
new io on data that has already been read from disk. For example, with a
4D timelapse of 3D stacks, napari may actually *re-read* the entire 3D
tiff file every time the Z plane index is changed. Turning of Dask task
fusion with ``optimization.fuse.active == False`` prevents this.
.. note::
Turning off task fusion requires Dask version 2.15.0 or later.
For background and context, see `napari/napari#718
<https://github.com/napari/napari/issues/718>`_, `napari/napari#1124
<https://github.com/napari/napari/pull/1124>`_, and `dask/dask#6084
<https://github.com/dask/dask/pull/6084>`_.
For details on Dask task fusion, see the documentation on `Dask
Optimization <https://docs.dask.org/en/latest/optimize.html>`_.
Parameters
----------
data : Any
data, as passed to a ``Layer.__init__`` method.
Returns
-------
ContextManager
A context manager that can be used to optimize dask indexing
Examples
--------
>>> data = dask.array.ones((10,10,10))
>>> optimized_slicing = configure_dask(data)
>>> with optimized_slicing():
... data[0, 2].compute()
"""
if not _is_dask_data(data):
return contextlib.nullcontext
_cache = resize_dask_cache() if cache else contextlib.nullcontext()
@contextlib.contextmanager
def dask_optimized_slicing(
memfrac: float = 0.5,
) -> Iterator[tuple[Any, Any]]:
opts = {'optimization.fuse.active': False}
with dask.config.set(opts) as cfg, _cache as c:
yield cfg, c
return dask_optimized_slicing