Source code for persidict.local_dict

"""In-memory persistent dictionary backed by a RAM-only hierarchical store.

This module provides `LocalDict`, a `PersiDict` implementation that keeps
all data in process memory using a simple tree structure (`_RAMBackend`).
It mirrors `FileDirDict` semantics without any disk or network I/O, making
it useful for tests and ephemeral workloads where durability is not required.
"""

from __future__ import annotations

import time
from copy import deepcopy
from typing import Any, Iterable, NamedTuple

from mixinforge import sort_dict_by_keys

from .persi_dict import PersiDict, NonEmptyPersiDictKey, ValueType
from .exceptions import MutationPolicyError
from .safe_str_tuple import SafeStrTuple, NonEmptySafeStrTuple
from .jokers_and_status_flags import EXECUTION_IS_COMPLETE, ETagValue, Joker


class _StoredEntry(NamedTuple):
    """A single value entry stored in the in-memory backend."""
    value: Any
    timestamp: float
    write_counter: int


class _RAMBackend:
    """In-memory hierarchical storage backing LocalDict.

    This lightweight backend models a directory-like tree entirely in RAM and
    is used by LocalDict to provide a PersiDict-compliant interface without any
    disk or network I/O. Keys are sequences of safe strings. Each path segment
    maps to a child RAMBackend node, while leaf entries are stored in a values
    bucket per serialization_format.

    Attributes:
        subdicts:
            Mapping of first-level key segment to a child RAMBackend representing
            the corresponding subtree.
        values:
            Mapping of serialization_format to a dictionary of
            leaf-name -> _StoredEntry. Each entry holds the value, a POSIX
            timestamp (time.time()), and a monotonic write counter for ETags.
        _write_counter:
            Shared mutable monotonic counter used for ETag generation. Stored as
            a single-element list so that child nodes created via child() share
            the same counter object, ensuring ETags are unique across the entire
            backend tree.

    Notes:
        - This backend is intentionally minimal and does not enforce character
          safety of key segments or serialization_format; that validation is handled by
          higher-level classes (e.g., PersiDict/LocalDict).
        - Not thread-safe or process-safe. If used concurrently, external
          synchronization is required.
        - Memory-only: all data is lost when the object is discarded.
    """

    def __init__(self):
        """Initialize an empty in-memory tree.

        Creates empty containers for child subtrees and for value buckets
        grouped by serialization_format. No arguments; the backend starts empty.

        Attributes initialized:
            subdicts: Empty mapping for first-level child nodes.
            values: Empty mapping for per-serialization_format value buckets.
            _write_counter: Monotonic counter shared by all LocalDict instances
                that reference this backend (directly or via subdicts), ensuring
                ETags are unique across the entire backend tree.
        """
        self.subdicts: dict[str, _RAMBackend] = {}
        self.values: dict[str, dict[str, _StoredEntry]] = {}
        self._write_counter: list[int] = [0]

    def child(self, name: str) -> "_RAMBackend":
        """Return a child node for the given path segment, creating if missing.

        Args:
            name: A single safe string segment representing the first-level
                part of a hierarchical key.

        Returns:
            The existing or newly created child backend for the
            provided segment.

        Notes:
            - This method mutates the structure by creating a child node when
              it does not exist yet.
        """
        child_backend = self.subdicts.get(name)
        if child_backend is None:
            child_backend = _RAMBackend()
            child_backend._write_counter = self._write_counter
            self.subdicts[name] = child_backend
        return child_backend

    def get_values_bucket(self, serialization_format: str) -> dict[str, _StoredEntry]:
        """Return the per-serialization_format bucket for leaf values, creating if absent.

        The bucket maps a leaf key (final segment string) to a _StoredEntry
        containing the value, timestamp, and write counter.

        Args:
            serialization_format: Object type label under which values are
                grouped (e.g., "pkl", "json"). No validation is performed here.

        Returns:
            The mutable mapping for this serialization_format.
            Modifications affect the backend state directly.
        """
        bucket = self.values.get(serialization_format)
        if bucket is None:
            bucket = {}
            self.values[serialization_format] = bucket
        return bucket


[docs] class LocalDict(PersiDict[ValueType]): """In-memory PersiDict backed by a RAM-only hierarchical store. LocalDict mirrors FileDirDict semantics but keeps all data in process memory using a simple tree structure (RAMBackend). It is useful for tests and ephemeral workloads where durability is not required. Keys are hierarchical sequences of safe strings (SafeStrTuple). Values are stored per serialization_format and tracked with modification timestamps, providing the same API surface as other PersiDict implementations. Attributes: append_only: If True, items are immutable and cannot be modified or deleted after initial creation. base_class_for_values: Optional base class that all stored values must inherit from. If None, any type is accepted (with serialization_format restrictions enforced by the base class). serialization_format: Logical serialization/format label (e.g., "pkl", "json") used as a namespace for values and timestamps within the backend. _backend: The in-memory tree that actually stores data. Notes: - Not thread-safe or process-safe; use external synchronization if accessed concurrently. - Memory-only: all data is lost when the object is garbage-collected or the process exits. """ def __init__(self, *, backend: _RAMBackend | None = None, serialization_format: str = "pkl", append_only: bool = False, base_class_for_values: type | None = None, prune_interval: int | None = 64): """Initialize an in-memory persistent dictionary. Args: backend: Optional existing RAMBackend tree to use. If None, a new empty backend is created. serialization_format: Logical serialization/format label under which values are grouped (e.g., "pkl", "json"). Defaults to "pkl". append_only: If True, items are immutable and cannot be modified or deleted after the first write. Defaults to False. base_class_for_values: Optional base class that all stored values must inherit from. If None, any type is accepted (subject to serialization_format restrictions). Defaults to None. prune_interval: If None or <= 0, disables pruning. Otherwise, run pruning only once every N destructive operations (deletions/clears). Higher values reduce pruning overhead at the cost of keeping some empty nodes around until the next prune. Defaults to 64. Raises: ValueError: Propagated from PersiDict if serialization_format is empty, has unsafe characters, or is incompatible with value type policy. TypeError: Propagated from PersiDict if base_class_for_values has an invalid type. """ self._backend = backend or _RAMBackend() # Pruning throttling if prune_interval is None: self._prune_interval = None else: try: pi = int(prune_interval) except (TypeError, ValueError): pi = 64 self._prune_interval = None if pi <= 0 else pi self._ops_since_prune: int = 0 PersiDict.__init__(self, append_only=append_only, base_class_for_values=base_class_for_values, serialization_format=serialization_format)
[docs] def get_params(self) -> dict[str, Any]: """Return constructor parameters needed to recreate this instance. Note that the backend object itself is included as a reference; copying or reconstructing a LocalDict with this parameter will share the same in-memory store. Returns: A dictionary of parameters (sorted by key) suitable for passing to the constructor. """ params = super().get_params() additional_params = dict( backend=self._backend, prune_interval=self._prune_interval) params = {**params, **additional_params} sorted_params = sort_dict_by_keys(params) return sorted_params
# No base_url/base_dir override: keep defaults (None) def __len__(self) -> int: """Return the total number of items stored for this serialization_format. Counts all keys across the entire in-memory tree that belong to the current serialization_format namespace. Returns: Total number of items. """ def count(node: _RAMBackend) -> int: total = len(node.values.get(self.serialization_format, {})) for child in node.subdicts.values(): total += count(child) return total return count(self._backend)
[docs] def clear(self) -> None: """Remove all items under this serialization_format across the entire tree. Only entries stored for the current serialization_format are removed; data for other serialization formats remains intact. """ # Override for efficiency (optional). Remove only our serialization_format data. self._check_delete_policy() def clear_ft(node: _RAMBackend): node.values.pop(self.serialization_format, None) for ch in node.subdicts.values(): clear_ft(ch) clear_ft(self._backend) # Throttled pruning: run only once per prune_interval destructive ops self._maybe_prune()
def _maybe_prune(self) -> None: """Increment destructive-op counter and prune when threshold reached. Pruning the entire in-memory tree can be relatively expensive for large datasets. To amortize the cost, we only prune once every ``self._prune_interval`` deletions/clears. This keeps memory usage bounded over time without incurring per-operation full-tree traversals. """ if self._prune_interval is None: return self._ops_since_prune += 1 if self._ops_since_prune >= self._prune_interval: self._prune_empty_subtrees(self._backend) self._ops_since_prune = 0 def _prune_empty_subtrees(self, node: _RAMBackend | None = None) -> bool: """Remove empty per-serialization_format buckets and prunes empty subtrees. This walks the in-memory tree and: - Deletes value buckets that became empty (no leaves for any serialization_format). - Recursively deletes child subdicts that become empty after pruning. A node is considered empty if it has no children (subdicts) and no non-empty value buckets in ``values``. The method returns a boolean indicating whether the given node is now empty. Args: node: Node to prune; defaults to the root. Returns: True if the node is empty after pruning; False otherwise. """ if node is None: node = self._backend # First, prune children depth-first for name, child in list(node.subdicts.items()): if self._prune_empty_subtrees(child): del node.subdicts[name] # Next, drop empty value buckets for any serialization_format for ft, bucket in list(node.values.items()): if not bucket: # empty dict del node.values[ft] # Node is empty if it has no children and no value buckets left return not node.subdicts and not node.values def _navigate_to_parent(self , key: SafeStrTuple , create_if_missing: bool = True ) -> tuple[_RAMBackend | None, str]: """Resolve a hierarchical key to its parent node and leaf name. This helper walks all segments of the key except the last one to find the corresponding RAMBackend node that contains the leaf bucket for this serialization_format. Behavior: - When create_if_missing is True (default), missing intermediate nodes are created as needed (write-path semantics). - When create_if_missing is False, traversal stops and returns (None, None) if an intermediate node is absent (read-path semantics), ensuring no phantom nodes are created. Args: key: Full hierarchical key. Must be non-empty; the last segment is treated as the leaf item name. create_if_missing: Whether to create intermediate nodes while traversing. Defaults to True. Returns: A pair consisting of the backend node that would hold the leaf bucket (or None if not found during lookup when create_if_missing=False) and the leaf segment (final component). """ backend_node: _RAMBackend | None = self._backend for segment in key[:-1]: if backend_node is None: break if create_if_missing: backend_node = backend_node.child(segment) else: backend_node = backend_node.subdicts.get(segment) if backend_node is None: # Early exit: path does not exist and we shouldn't create it return None, None return backend_node, key[-1] def _lookup_leaf( self , key: NonEmptySafeStrTuple ) -> tuple[dict[str, _StoredEntry], str]: """Resolve a key to its values bucket and leaf name, or raise KeyError. Navigates to the parent node for *key* and looks up the leaf in the bucket for the current serialization_format. Raises KeyError if the path or leaf does not exist. Args: key: A normalised, non-empty safe-string key. Returns: A (bucket, leaf) pair where *bucket* is the mutable dict mapping leaf names to _StoredEntry instances, and *leaf* is the final key segment. Raises: KeyError: If the key does not exist. """ parent_node, leaf = self._navigate_to_parent( key, create_if_missing=False) if parent_node is None: raise KeyError(key) bucket = parent_node.values.get(self.serialization_format, {}) if leaf not in bucket: raise KeyError(key) return bucket, leaf def __contains__(self, key: NonEmptyPersiDictKey) -> bool: """Return True if the key exists in the current serialization_format namespace. Args: key: Key (string/sequence or SafeStrTuple). Returns: True if the key is present; False otherwise. """ key = NonEmptySafeStrTuple(key) parent_node, leaf = self._navigate_to_parent(key, create_if_missing=False) if parent_node is None: return False bucket = parent_node.values.get(self.serialization_format, {}) return leaf in bucket def __getitem__(self, key: NonEmptyPersiDictKey) -> ValueType: """Retrieve the value stored for a key. Args: key: Key (string/sequence or SafeStrTuple). Returns: The stored value. Raises: KeyError: If the key does not exist. TypeError: If base_class_for_values is set and the stored value does not match it. """ key = NonEmptySafeStrTuple(key) bucket, leaf = self._lookup_leaf(key) value = deepcopy(bucket[leaf].value) self._validate_returned_value(value) return value def _get_value_and_etag(self, key: NonEmptySafeStrTuple) -> tuple[ValueType, ETagValue]: """Return a consistent value and ETag for a key in a single lookup. Args: key: Normalized dictionary key. Returns: A matching (value, ETag) pair. Raises: KeyError: If the key does not exist. """ key = NonEmptySafeStrTuple(key) bucket, leaf = self._lookup_leaf(key) entry = bucket[leaf] value = deepcopy(entry.value) self._validate_returned_value(value) etag = ETagValue(str(entry.write_counter)) return value, etag def __setitem__(self, key: NonEmptyPersiDictKey, value: ValueType | Joker) -> None: """Store a value for a key. Interprets joker values (KEEP_CURRENT, DELETE_CURRENT) using the base class helper and enforces optional type restrictions if base_class_for_values is set. When append_only is True, the existence check and the insert are performed on the same bucket dict in a single code path, closing the TOCTOU gap that a separate ``key in self`` guard would have. Args: key: Key (string/sequence or SafeStrTuple). value: Value to store, or a joker. Raises: MutationPolicyError: If attempting to modify an existing item when append_only is True. TypeError: If value is a PersiDict or does not match base_class_for_values when it is set. """ key = NonEmptySafeStrTuple(key) if self._process_setitem_args(key, value) is EXECUTION_IS_COMPLETE: return None parent_node, leaf = self._navigate_to_parent(key) bucket = parent_node.get_values_bucket(self.serialization_format) if self.append_only and leaf in bucket: raise MutationPolicyError("append-only") self._backend._write_counter[0] += 1 bucket[leaf] = _StoredEntry( deepcopy(value), time.time(), self._backend._write_counter[0]) def _remove_item(self, key: NonEmptySafeStrTuple) -> None: """Remove *key* from the in-memory tree. Raises: KeyError: If the key does not exist. """ bucket, leaf = self._lookup_leaf(key) del bucket[leaf] # raises KeyError if missing # Throttled pruning: run only once per prune_interval destructive ops self._maybe_prune() def __delitem__(self, key: NonEmptyPersiDictKey) -> None: """Delete a stored value for a key. Args: key: Key (string/sequence or SafeStrTuple). Raises: MutationPolicyError: If append_only is True. KeyError: If the key does not exist. """ key = NonEmptySafeStrTuple(key) self._process_delitem_args(key) self._remove_item(key) def _generic_iter(self, result_type: set[str]): """Underlying implementation for keys/values/items/timestamps iterators. Traverses the in-memory tree and yields entries based on the requested result_type. The shapes of yielded items mirror FileDirDict: - {"keys"} -> SafeStrTuple - {"values"} -> Any - {"keys", "values"} -> tuple[SafeStrTuple, Any] - {"keys", "values", "timestamps"} or {"keys", "timestamps"} -> tuples that end with a float POSIX timestamp. Args: result_type: Any non-empty subset of {"keys", "values", "timestamps"} specifying which fields to yield. Returns: A generator over requested items. Raises: TypeError: If result_type is not a set, or if base_class_for_values is set and a yielded value does not match it. ValueError: If result_type is empty or contains unsupported labels. """ self._process_generic_iter_args(result_type) def walk(prefix: tuple[str, ...], node: _RAMBackend): # yield values at this level bucket = node.values.get(self.serialization_format, {}) for leaf, entry in bucket.items(): full_key = SafeStrTuple((*prefix, leaf)) value = deepcopy(entry.value) if "values" in result_type: self._validate_returned_value(value) yield self._assemble_iter_result( result_type , key=full_key , value=value , timestamp=entry.timestamp) # then recurse into children for name, child in node.subdicts.items(): yield from walk((*prefix, name), child) return walk((), self._backend)
[docs] def timestamp(self, key: NonEmptyPersiDictKey) -> float: """Return the last modification time of a key. Args: key: Key (string/sequence or SafeStrTuple). Returns: POSIX timestamp (seconds since Unix epoch) when the value was last written. Raises: KeyError: If the key does not exist. """ key = NonEmptySafeStrTuple(key) bucket, leaf = self._lookup_leaf(key) return bucket[leaf].timestamp
[docs] def etag(self, key: NonEmptyPersiDictKey) -> ETagValue: """Return a unique ETag for a key based on a monotonic write counter. Unlike the base class (which formats the timestamp), LocalDict uses an integer counter stored on the backend that increments on every write, guaranteeing a distinct ETag even when two writes occur within the same clock tick or from different LocalDict instances sharing the same backend. Args: key: Key (string/sequence or SafeStrTuple). Returns: A unique opaque string identifying the current version. Raises: KeyError: If the key does not exist. """ key = NonEmptySafeStrTuple(key) bucket, leaf = self._lookup_leaf(key) return ETagValue(str(bucket[leaf].write_counter))
[docs] def get_subdict(self, prefix_key: Iterable[str] | SafeStrTuple) -> 'LocalDict[ValueType]': """Return a view rooted at the given key prefix. The returned LocalDict shares the same underlying RAMBackend, but its root is moved to the subtree identified by prefix_key. If intermediate nodes do not exist, they are created (resulting in an empty subdict). Modifications to a sub-dictionary will affect the parent dictionary and any other sub-dictionaries that share the same backend. Args: prefix_key: Key prefix identifying the subtree to expose. May be empty to refer to the current root. Returns: A LocalDict instance whose operations are restricted to the keys under the specified prefix. """ prefix = SafeStrTuple(prefix_key) if not isinstance(prefix_key, SafeStrTuple) else prefix_key root_node = self._backend for segment in prefix: root_node = root_node.child(segment) # Create a new LocalDict rooted at this backend return LocalDict(backend=root_node, serialization_format=self.serialization_format, append_only=self.append_only, base_class_for_values=self.base_class_for_values, prune_interval=self._prune_interval)