"""In-memory persistent dictionary backed by a RAM-only hierarchical store.
This module provides `LocalDict`, a `PersiDict` implementation that keeps
all data in process memory using a simple tree structure (`_RAMBackend`).
It mirrors `FileDirDict` semantics without any disk or network I/O, making
it useful for tests and ephemeral workloads where durability is not required.
"""
from __future__ import annotations
import time
from copy import deepcopy
from typing import Any, Iterable, NamedTuple
from mixinforge import sort_dict_by_keys
from .persi_dict import PersiDict, NonEmptyPersiDictKey, ValueType
from .exceptions import MutationPolicyError
from .safe_str_tuple import SafeStrTuple, NonEmptySafeStrTuple
from .jokers_and_status_flags import EXECUTION_IS_COMPLETE, ETagValue, Joker
class _StoredEntry(NamedTuple):
    """Immutable record describing one leaf value held by the RAM backend.

    Because this is a ``NamedTuple``, an entry is never mutated in place;
    a write replaces the whole record in its bucket.
    """

    # The stored payload.
    value: Any
    # Time of the write (LocalDict.__setitem__ records time.time() here).
    timestamp: float
    # Value of the backend's write counter at the time of the write;
    # LocalDict turns it into the entry's ETag.
    write_counter: int
class _RAMBackend:
    """Node of the RAM-only tree that backs LocalDict.

    The tree imitates a directory hierarchy: every key segment except the
    last selects a child node, and the last segment names a leaf stored in
    one of the node's per-format buckets. Nothing here touches disk or the
    network.

    Attributes:
        subdicts:
            Maps a key segment to the child _RAMBackend that roots the
            corresponding subtree.
        values:
            Maps a serialization_format label to a bucket, i.e. a dict of
            leaf-name -> _StoredEntry (value, timestamp, write counter).
        _write_counter:
            One-element list holding a counter used for ETag generation.
            A list is used so that nodes created through child() can share
            the very same object, which keeps counter values unique across
            the whole tree.

    Notes:
        - No validation of key segments or serialization_format labels is
          performed here; callers (e.g. PersiDict/LocalDict) are expected
          to do that.
        - No synchronization is implemented; concurrent use requires
          external locking.
        - Data lives only as long as the object does.
    """

    def __init__(self):
        """Create an empty node.

        The node starts with no children, no value buckets, and a fresh
        counter cell set to zero. Nodes created via child() have their
        counter cell replaced with the parent's, so the cell created here
        is effectively used only by root nodes.
        """
        self._write_counter: list[int] = [0]
        self.values: dict[str, dict[str, _StoredEntry]] = {}
        self.subdicts: dict[str, _RAMBackend] = {}

    def child(self, name: str) -> "_RAMBackend":
        """Fetch the child node for a key segment, creating it on demand.

        Args:
            name: One key segment identifying the child.

        Returns:
            The child node registered under ``name``. A missing child is
            created, wired to this node's shared write counter, and
            registered before being returned.
        """
        existing = self.subdicts.get(name)
        if existing is not None:
            return existing

        fresh = _RAMBackend()
        # Share the counter cell (not a copy) so every node in the tree
        # observes the same counter.
        fresh._write_counter = self._write_counter
        self.subdicts[name] = fresh
        return fresh

    def get_values_bucket(self, serialization_format: str) -> dict[str, _StoredEntry]:
        """Fetch the leaf bucket for a format label, creating it on demand.

        Args:
            serialization_format: Label under which leaves are grouped
                (e.g., "pkl", "json"). It is used as a plain dict key and
                is not validated.

        Returns:
            The live, mutable leaf-name -> _StoredEntry mapping stored on
            this node; changes made to it are changes to the backend.
        """
        existing = self.values.get(serialization_format)
        if existing is not None:
            return existing

        fresh: dict[str, _StoredEntry] = {}
        self.values[serialization_format] = fresh
        return fresh
class LocalDict(PersiDict[ValueType]):
    """In-memory PersiDict backed by a RAM-only hierarchical store.

    LocalDict mirrors FileDirDict semantics but keeps all data in process
    memory using a simple tree structure (_RAMBackend). It is useful for
    tests and ephemeral workloads where durability is not required. Keys are
    hierarchical sequences of safe strings (SafeStrTuple). Values are stored
    per serialization_format and tracked with modification timestamps,
    providing the same API surface as other PersiDict implementations.

    Attributes:
        append_only: If True, items are immutable and cannot be
            modified or deleted after initial creation.
        base_class_for_values: Optional base class that all
            stored values must inherit from. If None, any type is accepted
            (with serialization_format restrictions enforced by the base
            class).
        serialization_format: Logical serialization/format label (e.g.,
            "pkl", "json") used as a namespace for values and timestamps
            within the backend.
        _backend: The in-memory tree node that acts as this dict's root.
        _prune_interval: Number of destructive operations between prunes,
            or None when pruning is disabled.
        _ops_since_prune: Destructive operations performed by this instance
            since its last prune.

    Notes:
        - Not thread-safe or process-safe; use external synchronization if
          accessed concurrently.
        - Memory-only: all data is lost when the object is garbage-collected
          or the process exits.
        - Values are deep-copied on the way in (``__setitem__``) and on the
          way out (``__getitem__``, iteration), so callers never share
          mutable state with the store.
    """

    def __init__(self, *,
                 backend: _RAMBackend | None = None,
                 serialization_format: str = "pkl",
                 append_only: bool = False,
                 base_class_for_values: type | None = None,
                 prune_interval: int | None = 64):
        """Initialize an in-memory persistent dictionary.

        Args:
            backend: Optional existing _RAMBackend tree to
                use. If None, a new empty backend is created.
            serialization_format: Logical serialization/format label under
                which values are grouped (e.g., "pkl", "json"). Defaults to
                "pkl".
            append_only: If True, items are immutable and cannot
                be modified or deleted after the first write. Defaults to
                False.
            base_class_for_values: Optional base class that all
                stored values must inherit from. If None, any type is
                accepted (subject to serialization_format restrictions).
                Defaults to None.
            prune_interval: If None or <= 0, disables pruning.
                Otherwise, run pruning only once every N destructive
                operations (deletions/clears). Higher values reduce pruning
                overhead at the cost of keeping some empty nodes around
                until the next prune. Values that cannot be converted with
                ``int()`` fall back to 64. Defaults to 64.

        Raises:
            ValueError: Propagated from PersiDict if serialization_format is
                empty, has unsafe characters, or is incompatible with value
                type policy.
            TypeError: Propagated from PersiDict if base_class_for_values
                has an invalid type.
        """
        # Explicit None check instead of `backend or ...`: the choice must
        # not depend on the truthiness of the backend object.
        self._backend = _RAMBackend() if backend is None else backend

        # Pruning throttling
        if prune_interval is None:
            self._prune_interval = None
        else:
            try:
                pi = int(prune_interval)
            except (TypeError, ValueError):
                # Best-effort: an unusable value falls back to the default.
                pi = 64
            self._prune_interval = None if pi <= 0 else pi
        self._ops_since_prune: int = 0

        PersiDict.__init__(self,
                           append_only=append_only,
                           base_class_for_values=base_class_for_values,
                           serialization_format=serialization_format)

    def get_params(self) -> dict[str, Any]:
        """Return constructor parameters needed to recreate this instance.

        Note that the backend object itself is included as a reference;
        copying or reconstructing a LocalDict with this parameter will share
        the same in-memory store.

        Returns:
            A dictionary of parameters (sorted by key) suitable for
            passing to the constructor.
        """
        params = super().get_params()
        additional_params = dict(
            backend=self._backend,
            prune_interval=self._prune_interval)
        params = {**params, **additional_params}
        sorted_params = sort_dict_by_keys(params)
        return sorted_params

    # No base_url/base_dir override: keep defaults (None)

    def __len__(self) -> int:
        """Return the total number of items stored for this serialization_format.

        Counts all keys across the entire in-memory tree (rooted at this
        dict's backend node) that belong to the current serialization_format
        namespace. The whole subtree is traversed on every call.

        Returns:
            Total number of items.
        """
        def count(node: _RAMBackend) -> int:
            total = len(node.values.get(self.serialization_format, {}))
            for child in node.subdicts.values():
                total += count(child)
            return total

        return count(self._backend)

    def clear(self) -> None:
        """Remove all items under this serialization_format across the entire tree.

        Only entries stored for the current serialization_format are removed;
        data for other serialization formats remains intact. The whole call
        counts as a single destructive operation for prune throttling.
        """
        # Override for efficiency (optional). Remove only our
        # serialization_format data.
        self._check_delete_policy()

        def clear_ft(node: _RAMBackend):
            node.values.pop(self.serialization_format, None)
            for ch in node.subdicts.values():
                clear_ft(ch)

        clear_ft(self._backend)
        # Throttled pruning: run only once per prune_interval destructive ops
        self._maybe_prune()

    def _maybe_prune(self) -> None:
        """Increment destructive-op counter and prune when threshold reached.

        Pruning the entire in-memory tree can be relatively expensive for
        large datasets. To amortize the cost, we only prune once every
        ``self._prune_interval`` deletions/clears. This keeps memory usage
        bounded over time without incurring per-operation full-tree
        traversals. Does nothing when pruning is disabled
        (``self._prune_interval is None``).
        """
        if self._prune_interval is None:
            return
        self._ops_since_prune += 1
        if self._ops_since_prune >= self._prune_interval:
            self._prune_empty_subtrees(self._backend)
            self._ops_since_prune = 0

    def _prune_empty_subtrees(self, node: _RAMBackend | None = None) -> bool:
        """Remove empty value buckets and prune empty subtrees.

        This walks the in-memory tree depth-first and:
          - Recursively deletes child subdicts that are empty after pruning.
          - Deletes value buckets that contain no leaves, regardless of
            which serialization_format they belong to.

        A node is considered empty if it has no children (subdicts) and no
        value buckets left in ``values``. The given node itself is never
        removed by this call; only its descendants are.

        Args:
            node: Node to prune; defaults to the root.

        Returns:
            True if the node is empty after pruning; False otherwise.
        """
        if node is None:
            node = self._backend

        # First, prune children depth-first. Iterate over a snapshot because
        # entries are deleted from the mapping inside the loop.
        for name, child in list(node.subdicts.items()):
            if self._prune_empty_subtrees(child):
                del node.subdicts[name]

        # Next, drop empty value buckets for any serialization_format
        for ft, bucket in list(node.values.items()):
            if not bucket:  # empty dict
                del node.values[ft]

        # Node is empty if it has no children and no value buckets left
        return not node.subdicts and not node.values

    def _navigate_to_parent(self
            , key: SafeStrTuple
            , create_if_missing: bool = True
            ) -> tuple[_RAMBackend | None, str | None]:
        """Resolve a hierarchical key to its parent node and leaf name.

        This helper walks all segments of the key except the last one to
        find the corresponding _RAMBackend node that contains the leaf
        bucket for this serialization_format.

        Behavior:
          - When create_if_missing is True (default), missing intermediate
            nodes are created as needed (write-path semantics).
          - When create_if_missing is False, traversal stops and returns
            (None, None) if an intermediate node is absent (read-path
            semantics), ensuring no phantom nodes are created.

        Args:
            key: Full hierarchical key. Must be non-empty; the
                last segment is treated as the leaf item name.
            create_if_missing: Whether to create intermediate nodes while
                traversing. Defaults to True.

        Returns:
            A pair ``(node, leaf)`` where ``node`` is the backend node that
            would hold the leaf bucket and ``leaf`` is the final key
            segment, or ``(None, None)`` when create_if_missing is False and
            an intermediate node does not exist.
        """
        node: _RAMBackend = self._backend
        for segment in key[:-1]:
            if create_if_missing:
                node = node.child(segment)
                continue
            next_node = node.subdicts.get(segment)
            if next_node is None:
                # Early exit: path does not exist and we shouldn't create it
                return None, None
            node = next_node
        return node, key[-1]

    def _lookup_leaf(
            self
            , key: NonEmptySafeStrTuple
            ) -> tuple[dict[str, _StoredEntry], str]:
        """Resolve a key to its values bucket and leaf name, or raise KeyError.

        Navigates to the parent node for *key* (without creating anything)
        and looks up the leaf in the bucket for the current
        serialization_format.

        Args:
            key: A normalised, non-empty safe-string key.

        Returns:
            A (bucket, leaf) pair where *bucket* is the mutable dict mapping
            leaf names to _StoredEntry instances, and *leaf* is the final
            key segment.

        Raises:
            KeyError: If the path or the leaf does not exist.
        """
        parent_node, leaf = self._navigate_to_parent(
            key, create_if_missing=False)
        if parent_node is None:
            raise KeyError(key)
        bucket = parent_node.values.get(self.serialization_format, {})
        if leaf not in bucket:
            raise KeyError(key)
        return bucket, leaf

    def __contains__(self, key: NonEmptyPersiDictKey) -> bool:
        """Return True if the key exists in the current serialization_format namespace.

        The lookup never creates intermediate nodes.

        Args:
            key: Key (string/sequence or SafeStrTuple).

        Returns:
            True if the key is present; False otherwise.
        """
        key = NonEmptySafeStrTuple(key)
        parent_node, leaf = self._navigate_to_parent(key, create_if_missing=False)
        if parent_node is None:
            return False
        bucket = parent_node.values.get(self.serialization_format, {})
        return leaf in bucket

    def __getitem__(self, key: NonEmptyPersiDictKey) -> ValueType:
        """Retrieve the value stored for a key.

        Args:
            key: Key (string/sequence or SafeStrTuple).

        Returns:
            A deep copy of the stored value.

        Raises:
            KeyError: If the key does not exist.
            TypeError: If base_class_for_values is set and the stored value
                does not match it.
        """
        key = NonEmptySafeStrTuple(key)
        bucket, leaf = self._lookup_leaf(key)
        value = deepcopy(bucket[leaf].value)
        self._validate_returned_value(value)
        return value

    def _get_value_and_etag(self, key: NonEmptySafeStrTuple) -> tuple[ValueType, ETagValue]:
        """Return a consistent value and ETag for a key in a single lookup.

        Both results are taken from the same _StoredEntry, so they always
        describe the same version of the item.

        Args:
            key: Normalized dictionary key.

        Returns:
            A matching (value, ETag) pair; the value is a deep copy.

        Raises:
            KeyError: If the key does not exist.
        """
        key = NonEmptySafeStrTuple(key)
        bucket, leaf = self._lookup_leaf(key)
        entry = bucket[leaf]
        value = deepcopy(entry.value)
        self._validate_returned_value(value)
        etag = ETagValue(str(entry.write_counter))
        return value, etag

    def __setitem__(self, key: NonEmptyPersiDictKey, value: ValueType | Joker) -> None:
        """Store a value for a key.

        Interprets joker values (KEEP_CURRENT, DELETE_CURRENT) using the
        base class helper and enforces optional type restrictions if
        base_class_for_values is set.

        When append_only is True, the existence check and the insert are
        performed on the same bucket dict in a single code path, closing
        the TOCTOU gap that a separate ``key in self`` guard would have.

        Args:
            key: Key (string/sequence or SafeStrTuple).
            value: Value to store, or a joker.

        Raises:
            MutationPolicyError: If attempting to modify an existing item
                when append_only is True.
            TypeError: If value is a PersiDict or does not match
                base_class_for_values when it is set.
        """
        key = NonEmptySafeStrTuple(key)
        if self._process_setitem_args(key, value) is EXECUTION_IS_COMPLETE:
            return None

        # Write path: missing intermediate nodes and the bucket are created.
        parent_node, leaf = self._navigate_to_parent(key)
        bucket = parent_node.get_values_bucket(self.serialization_format)
        if self.append_only and leaf in bucket:
            raise MutationPolicyError("append-only")

        # Bump the tree-wide counter first so the new entry gets a value
        # no earlier write has used.
        self._backend._write_counter[0] += 1
        bucket[leaf] = _StoredEntry(
            deepcopy(value), time.time(), self._backend._write_counter[0])

    def _remove_item(self, key: NonEmptySafeStrTuple) -> None:
        """Remove *key* from the in-memory tree.

        Counts as one destructive operation for prune throttling.

        Raises:
            KeyError: If the key does not exist.
        """
        bucket, leaf = self._lookup_leaf(key)  # raises KeyError if missing
        del bucket[leaf]
        # Throttled pruning: run only once per prune_interval destructive ops
        self._maybe_prune()

    def __delitem__(self, key: NonEmptyPersiDictKey) -> None:
        """Delete a stored value for a key.

        Args:
            key: Key (string/sequence or SafeStrTuple).

        Raises:
            MutationPolicyError: If append_only is True.
            KeyError: If the key does not exist.
        """
        key = NonEmptySafeStrTuple(key)
        self._process_delitem_args(key)
        self._remove_item(key)

    def _generic_iter(self, result_type: set[str]):
        """Underlying implementation for keys/values/items/timestamps iterators.

        Traverses the in-memory tree and yields entries based on the
        requested result_type. The shapes of yielded items mirror
        FileDirDict:
          - {"keys"} -> SafeStrTuple
          - {"values"} -> Any
          - {"keys", "values"} -> tuple[SafeStrTuple, Any]
          - {"keys", "values", "timestamps"} or {"keys", "timestamps"}
            -> tuples that end with a float POSIX timestamp.

        Stored values are deep-copied (and validated) only when "values" is
        requested; key-only and timestamp-only iteration does not copy
        payloads.

        Args:
            result_type: Any non-empty subset of {"keys", "values",
                "timestamps"} specifying which fields to yield.

        Returns:
            A generator over requested items. Argument validation happens
            eagerly, before the generator is returned.

        Raises:
            TypeError: If result_type is not a set, or if
                base_class_for_values is set and a yielded value does not
                match it.
            ValueError: If result_type is empty or contains unsupported
                labels.

        Notes:
            The generator walks the live backend dictionaries. Adding or
            removing items while iterating can make the underlying dict
            iteration raise RuntimeError; snapshot the keys first (e.g.
            ``list(d.keys())``) when mutating inside the loop.
        """
        self._process_generic_iter_args(result_type)
        want_values = "values" in result_type

        def walk(prefix: tuple[str, ...], node: _RAMBackend):
            # yield values at this level
            bucket = node.values.get(self.serialization_format, {})
            for leaf, entry in bucket.items():
                full_key = SafeStrTuple((*prefix, leaf))
                if want_values:
                    value = deepcopy(entry.value)
                    self._validate_returned_value(value)
                else:
                    # Not requested: skip the (potentially expensive) copy.
                    # Assumes _assemble_iter_result ignores `value` when
                    # "values" is not in result_type.
                    value = None
                yield self._assemble_iter_result(
                    result_type
                    , key=full_key
                    , value=value
                    , timestamp=entry.timestamp)
            # then recurse into children
            for name, child in node.subdicts.items():
                yield from walk((*prefix, name), child)

        return walk((), self._backend)

    def timestamp(self, key: NonEmptyPersiDictKey) -> float:
        """Return the last modification time of a key.

        Args:
            key: Key (string/sequence or SafeStrTuple).

        Returns:
            POSIX timestamp (seconds since Unix epoch) when the value was
            last written.

        Raises:
            KeyError: If the key does not exist.
        """
        key = NonEmptySafeStrTuple(key)
        bucket, leaf = self._lookup_leaf(key)
        return bucket[leaf].timestamp

    def etag(self, key: NonEmptyPersiDictKey) -> ETagValue:
        """Return a unique ETag for a key based on a monotonic write counter.

        Unlike the base class (which formats the timestamp), LocalDict uses
        an integer counter stored on the backend that increments on every
        write, guaranteeing a distinct ETag even when two writes occur
        within the same clock tick or from different LocalDict instances
        sharing the same backend.

        Args:
            key: Key (string/sequence or SafeStrTuple).

        Returns:
            A unique opaque string identifying the current version.

        Raises:
            KeyError: If the key does not exist.
        """
        key = NonEmptySafeStrTuple(key)
        bucket, leaf = self._lookup_leaf(key)
        return ETagValue(str(bucket[leaf].write_counter))

    def get_subdict(self, prefix_key: Iterable[str] | SafeStrTuple) -> 'LocalDict[ValueType]':
        """Return a view rooted at the given key prefix.

        The returned LocalDict shares the same underlying _RAMBackend, but
        its root is moved to the subtree identified by prefix_key. If
        intermediate nodes do not exist, they are created (resulting in an
        empty subdict). Modifications to a sub-dictionary will affect the
        parent dictionary and any other sub-dictionaries that share the same
        backend.

        Args:
            prefix_key: Key prefix identifying the
                subtree to expose. May be empty to refer to the current
                root.

        Returns:
            A LocalDict instance whose operations are restricted to
            the keys under the specified prefix. It inherits this dict's
            serialization_format, append_only, base_class_for_values and
            prune_interval settings.

        Notes:
            NOTE(review): pruning deletes empty child nodes from their
            parent's ``subdicts``. If a view is created for a prefix that
            holds no data and an ancestor view prunes before the first write
            through the new view, the view keeps a node that is no longer
            attached to the tree, so its later writes are not visible from
            the ancestor. Confirm whether callers rely on long-lived empty
            views.
        """
        prefix = SafeStrTuple(prefix_key) if not isinstance(prefix_key, SafeStrTuple) else prefix_key
        root_node = self._backend
        for segment in prefix:
            root_node = root_node.child(segment)
        # Create a new LocalDict rooted at this backend
        return LocalDict(backend=root_node,
                         serialization_format=self.serialization_format,
                         append_only=self.append_only,
                         base_class_for_values=self.base_class_for_values,
                         prune_interval=self._prune_interval)