Source code for napari.utils.events.containers._evented_list

"""MutableSequence that emits events when altered.

Note For Developers
===================

Be cautious when re-implementing typical list-like methods here (e.g. extend,
pop, clear, etc...).  By not re-implementing those methods, we force ALL "CRUD"
(create, read, update, delete) operations to go through a few key methods
defined by the abc.MutableSequence interface, where we can emit the necessary
events.

Specifically:

- ``insert`` = "create" : add a new item/index to the list
- ``__getitem__`` = "read" : get the value of an existing index
- ``__setitem__`` = "update" : update the value of an existing index
- ``__delitem__`` = "delete" : remove an existing index from the list

All of the additional list-like methods are provided by the MutableSequence
interface, and call one of those 4 methods.  So if you override a method, you
MUST make sure that all the appropriate events are emitted.  (Tests should
cover this in test_evented_list.py)
"""

import contextlib
import logging
from collections.abc import Generator, Iterable, Sequence
from typing import (
    Callable,
    Optional,
    Union,
)

from napari.utils.events.containers._typed import (
    _L,
    _T,
    Index,
    TypedMutableSequence,
)
from napari.utils.events.event import EmitterGroup, Event
from napari.utils.events.types import SupportsEvents
from napari.utils.translations import trans

logger = logging.getLogger(__name__)


[docs] class EventedList(TypedMutableSequence[_T]): """Mutable Sequence that emits events when altered. This class is designed to behave exactly like the builtin ``list``, but will emit events before and after all mutations (insertion, removal, setting, and moving). Parameters ---------- data : iterable, optional Elements to initialize the list with. basetype : type or sequence of types, optional Type of the elements in the list. lookup : dict of Type[L] : function(object) -> L Mapping between a type, and a function that converts items in the list to that type. Events ------ inserting (index: int) emitted before an item is inserted at ``index`` inserted (index: int, value: T) emitted after ``value`` is inserted at ``index`` removing (index: int) emitted before an item is removed at ``index`` removed (index: int, value: T) emitted after ``value`` is removed at ``index`` moving (index: int, new_index: int) emitted before an item is moved from ``index`` to ``new_index`` moved (index: int, new_index: int, value: T) emitted after ``value`` is moved from ``index`` to ``new_index`` changed (index: int, old_value: T, value: T) emitted when item at ``index`` is changed from ``old_value`` to ``value`` changed <OVERLOAD> (index: slice, old_value: List[_T], value: List[_T]) emitted when item at ``index`` is changed from ``old_value`` to ``value`` reordered (value: self) emitted when the list is reordered (eg. moved/reversed). """ events: EmitterGroup def __init__( self, data: Iterable[_T] = (), *, basetype: Union[type[_T], Sequence[type[_T]]] = (), lookup: Optional[dict[type[_L], Callable[[_T], Union[_T, _L]]]] = None, ) -> None: if lookup is None: lookup = {} _events = { 'inserting': None, # int 'inserted': None, # Tuple[int, Any] - (idx, value) 'removing': None, # int 'removed': None, # Tuple[int, Any] - (idx, value) 'moving': None, # Tuple[int, int] 'moved': None, # Tuple[Tuple[int, int], Any] 'changed': None, # Tuple[int, Any, Any] - (idx, old, new) 'reordered': None, # None } # For inheritance: If the mro already provides an EmitterGroup, add... if hasattr(self, 'events') and isinstance(self.events, EmitterGroup): self.events.add(**_events) else: # otherwise create a new one self.events = EmitterGroup( source=self, auto_connect=False, **_events ) super().__init__(data, basetype=basetype, lookup=lookup) # WAIT!! ... Read the module docstring before reimplement these methods # def append(self, item): ... # def clear(self): ... # def pop(self, index=-1): ... # def extend(self, value: Iterable[_T]): ... # def remove(self, value: T): ... def __setitem__(self, key, value: _T) -> None: old = self._list[key] # https://github.com/napari/napari/pull/2120 if isinstance(key, slice): if not isinstance(value, Iterable): raise TypeError( trans._( 'Can only assign an iterable to slice', deferred=True, ) ) value = list( value ) # make sure we don't empty generators and reuse them if value == old: return [self._type_check(v) for v in value] # before we mutate the list if key.step is not None: # extended slices are more restricted indices = list(range(*key.indices(len(self)))) if not len(value) == len(indices): raise ValueError( trans._( 'attempt to assign sequence of size {size} to extended slice of size {slice_size}', deferred=True, size=len(value), slice_size=len(indices), ) ) for i, v in zip(indices, value): self.__setitem__(i, v) else: del self[key] start = key.start or 0 for i, v in enumerate(value): self.insert(start + i, v) else: if value is old: return super().__setitem__(key, value) self.events.changed(index=key, old_value=old, value=value) def _delitem_indices( self, key: Index ) -> Iterable[tuple['EventedList[_T]', int]]: # returning List[(self, int)] allows subclasses to pass nested members if isinstance(key, int): return [(self, key if key >= 0 else key + len(self))] if isinstance(key, slice): return [(self, i) for i in range(*key.indices(len(self)))] if type(key) in self._lookup: return [(self, self.index(key))] valid = {int, slice}.union(set(self._lookup)) raise TypeError( trans._( 'Deletion index must be {valid!r}, got {dtype}', deferred=True, valid=valid, dtype=type(key), ) ) def __delitem__(self, key: Index) -> None: # delete from the end for parent, index in sorted(self._delitem_indices(key), reverse=True): parent.events.removing(index=index) self._disconnect_child_emitters(parent[index]) item = parent._list.pop(index) self._process_delete_item(item) parent.events.removed(index=index, value=item) def _process_delete_item(self, item: _T) -> None: """Allow process item in inherited class before event was emitted"""
[docs] def insert(self, index: int, value: _T) -> None: """Insert ``value`` before index.""" self.events.inserting(index=index) super().insert(index, value) self.events.inserted(index=index, value=value) self._connect_child_emitters(value)
def _reemit_child_event(self, event: Event) -> None: """An item in the list emitted an event. Re-emit with index""" if not hasattr(event, 'index'): with contextlib.suppress(ValueError): event.index = self.index(event.source) # reemit with this object's EventEmitter self.events(event) def _disconnect_child_emitters(self, child: _T) -> None: """Disconnect all events from the child from the reemitter.""" if isinstance(child, SupportsEvents): child.events.disconnect(self._reemit_child_event) def _connect_child_emitters(self, child: _T) -> None: """Connect all events from the child to be reemitted.""" if isinstance(child, SupportsEvents): # make sure the event source has been set on the child if child.events.source is None: child.events.source = child child.events.connect(self._reemit_child_event)
[docs] def move(self, src_index: int, dest_index: int = 0) -> bool: """Insert object at ``src_index`` before ``dest_index``. Both indices refer to the list prior to any object removal (pre-move space). """ if dest_index < 0: dest_index += len(self) + 1 if dest_index in (src_index, src_index + 1): # this is a no-op return False self.events.moving(index=src_index, new_index=dest_index) item = self._list.pop(src_index) if dest_index > src_index: dest_index -= 1 self._list.insert(dest_index, item) self.events.moved(index=src_index, new_index=dest_index, value=item) self.events.reordered(value=self) return True
[docs] def move_multiple( self, sources: Iterable[Index], dest_index: int = 0 ) -> int: """Move a batch of `sources` indices, to a single destination. Note, if `dest_index` is higher than any of the `sources`, then the resulting position of the moved objects after the move operation is complete will be lower than `dest_index`. Parameters ---------- sources : Sequence[int or slice] A sequence of indices dest_index : int, optional The destination index. All sources will be inserted before this index (in pre-move space), by default 0... which has the effect of "bringing to front" everything in ``sources``, or acting as a "reorder" method if ``sources`` contains all indices. Returns ------- int The number of successful move operations completed. Raises ------ TypeError If the destination index is a slice, or any of the source indices are not ``int`` or ``slice``. """ logger.debug( 'move_multiple(sources={sources}, dest_index={dest_index})', extra={'sources': sources, 'dest_index': dest_index}, ) # calling list here makes sure that there are no index errors up front move_plan = list(self._move_plan(sources, dest_index)) # don't assume index adjacency ... so move objects one at a time # this *could* be simplified with an intermediate list ... but this way # allows any views (such as QtViews) to update themselves more easily. # If this needs to be changed in the future for performance reasons, # then the associated QtListView will need to changed from using # `beginMoveRows` & `endMoveRows` to using `layoutAboutToBeChanged` & # `layoutChanged` while *manually* updating model indices with # `changePersistentIndexList`. That becomes much harder to do with # nested tree-like models. with self.events.reordered.blocker(): for src, dest in move_plan: self.move(src, dest) self.events.reordered(value=self) return len(move_plan)
def _move_plan( self, sources: Iterable[Index], dest_index: int ) -> Generator[tuple[int, int], None, None]: """Prepared indices for a multi-move. Given a set of ``sources`` from anywhere in the list, and a single ``dest_index``, this function computes and yields ``(from_index, to_index)`` tuples that can be used sequentially in single move operations. It keeps track of what has moved where and updates the source and destination indices to reflect the model at each point in the process. This is useful for a drag-drop operation with a QtModel/View. Parameters ---------- sources : Iterable[tuple[int, ...]] An iterable of tuple[int] that should be moved to ``dest_index``. dest_index : Tuple[int] The destination for sources. """ if isinstance(dest_index, slice): raise TypeError( trans._( 'Destination index may not be a slice', deferred=True, ) ) to_move: list[int] = [] for idx in sources: if isinstance(idx, slice): to_move.extend(list(range(*idx.indices(len(self))))) elif isinstance(idx, int): to_move.append(idx) else: raise TypeError( trans._( 'Can only move integer or slice indices, not {t}', deferred=True, t=type(idx), ) ) to_move = list(dict.fromkeys(to_move)) if dest_index < 0: dest_index += len(self) + 1 d_inc = 0 popped: list[int] = [] for i, src in enumerate(to_move): if src != dest_index: # we need to decrement the src_i by 1 for each time we have # previously pulled items out from in front of the src_i src -= sum(x <= src for x in popped) # if source is past the insertion point, increment src for each # previous insertion if src >= dest_index: src += i yield src, dest_index + d_inc popped.append(src) # if the item moved up, increment the destination index if dest_index <= src: d_inc += 1
[docs] def reverse(self) -> None: """Reverse list *IN PLACE*.""" # reimplementing this method to emit a change event # If this method were removed, .reverse() would still be available, # it would just emit a "changed" event for each moved index in the list self._list.reverse() self.events.reordered(value=self)