"""Persistent, dict-like API for durable key-value stores.
PersiDict defines a unified interface for persistent dictionaries. The API is
similar to Python's built-in dict with some differences (e.g., insertion order
is not guaranteed) and several additional convenience methods.
Keys are non-empty sequences of URL/filename-safe strings
represented by SafeStrTuple. Plain strings or sequences of strings are accepted
and automatically coerced to SafeStrTuple. Values can be
arbitrary Python objects unless an implementation restricts them
via `base_class_for_values`.
Persistence means items are stored durably (e.g., in local files or cloud
objects) and remain accessible across process lifetimes.
"""
from __future__ import annotations
from abc import abstractmethod
from collections.abc import MutableMapping
import heapq
from itertools import zip_longest
import joblib
import jsonpickle
import random
import time
from typing import Any, Sequence, Iterator, Mapping, Self
from mixinforge import ParameterizableMixin, sort_dict_by_keys
from . import NonEmptySafeStrTuple
from .jokers_and_status_flags import (KEEP_CURRENT, DELETE_CURRENT, Joker,
CONTINUE_NORMAL_EXECUTION, StatusFlag, EXECUTION_IS_COMPLETE,
ETagValue, ETagConditionFlag,
ANY_ETAG, ETAG_IS_THE_SAME, ETAG_HAS_CHANGED,
RetrieveValueFlag, ALWAYS_RETRIEVE,
NEVER_RETRIEVE, IF_ETAG_CHANGED,
ITEM_NOT_AVAILABLE, VALUE_NOT_RETRIEVED,
ETagIfExists, TransformingFunction,
ValueType,
OperationResult, ConditionalOperationResult)
from .exceptions import MutationPolicyError, ConcurrencyConflictError
from .safe_chars import contains_unsafe_chars
from .safe_str_tuple import SafeStrTuple
# Type aliases describing what callers may pass as a key. Both accept a
# ready-made tuple type, a plain string, or a sequence of strings.
PersiDictKey = SafeStrTuple | Sequence[str] | str
"""Key type that includes empty tuples, used for prefix-based operations.
Accepts a SafeStrTuple (possibly empty), a string, or a sequence of strings.
Used in operations like get_subdict where an empty prefix is meaningful.
"""
NonEmptyPersiDictKey = NonEmptySafeStrTuple | Sequence[str] | str
"""Key type for item-level operations that require a non-empty key.
Accepts a NonEmptySafeStrTuple, a string, or a non-empty sequence of strings.
Used in __getitem__, __setitem__, __delitem__, __contains__, and other
operations that address a specific item. Characters within strings must be
URL/filename-safe. Strings and sequences are automatically converted into
SafeStrTuple.
"""
# Upper bound on the number of check+get+check read attempts made by
# PersiDict._get_value_and_etag before it raises ConcurrencyConflictError.
_GET_VALUE_AND_ETAG_MAX_RETRIES: int = 3
[docs]
class PersiDict(MutableMapping[NonEmptySafeStrTuple, ValueType], ParameterizableMixin):
"""Abstract dict-like interface for durable key-value stores.
Keys are URL/filename-safe sequences of strings (SafeStrTuple). Concrete
subclasses implement storage backends (e.g., filesystem, S3). The API is
similar to Python's dict but does not guarantee insertion order and adds
persistence-specific helpers (e.g., timestamp()).
Attributes (can't be changed after initialization):
append_only:
If True, items are immutable and non-removable: existing values
cannot be modified or deleted.
base_class_for_values:
Optional base class that all values must inherit from. If None, any
type is accepted.
serialization_format:
File extension/format for stored values (e.g., "pkl", "json").
"""
append_only:bool
base_class_for_values: type | None
serialization_format:str
def __init__(self,
             *,
             append_only: bool = False,
             base_class_for_values: type | None = None,
             serialization_format: str = "pkl"):
    """Set up the parameters common to every persistent dictionary.

    Args:
        append_only: When True, stored items may be neither modified
            nor deleted. Defaults to False.
        base_class_for_values: Base class every stored value must be an
            instance of, or None for no restriction. Defaults to None.
        serialization_format: File extension/format used for stored
            values. Defaults to "pkl".

    Raises:
        ValueError: If serialization_format is empty, contains unsafe
            characters, or is neither 'json' nor 'pkl' while values are
            not restricted to a str subclass.
        TypeError: If base_class_for_values is neither a type nor None.
    """
    self._append_only = bool(append_only)

    # The format ends up in file/object names, so it must be non-empty
    # and consist of URL/filename-safe characters only.
    if len(serialization_format) < 1:
        raise ValueError("serialization_format must be a non-empty string")
    if contains_unsafe_chars(serialization_format):
        raise ValueError("serialization_format must contain only URL/filename-safe characters")
    self.serialization_format = str(serialization_format)

    if (base_class_for_values is not None
            and not isinstance(base_class_for_values, type)):
        raise TypeError("base_class_for_values must be a type or None")

    # Arbitrary formats are only permitted when every value is a string;
    # anything else has to go through the pkl or json serializers.
    stores_only_strings = (base_class_for_values is not None
                           and issubclass(base_class_for_values, str))
    if not stores_only_strings and serialization_format not in {"json", "pkl"}:
        raise ValueError("For non-string values serialization_format must be either 'pkl' or 'json'.")
    self.base_class_for_values = base_class_for_values

    ParameterizableMixin.__init__(self)
[docs]
def get_params(self) -> dict[str, Any]:
    """Return the configuration parameters of this dictionary.

    Part of the Parameterizable API; the built-in dict has no equivalent.

    Returns:
        A key-sorted dictionary holding append_only,
        base_class_for_values and serialization_format.
    """
    return sort_dict_by_keys({
        "append_only": self.append_only,
        "base_class_for_values": self.base_class_for_values,
        "serialization_format": self.serialization_format,
    })
def __copy__(self) -> Self:
    """Create a shallow copy of the dictionary.

    The copy is a fresh instance of the same concrete class, constructed
    from this instance's get_params() output.

    Returns:
        A new instance of the same concrete type.

    Raises:
        NotImplementedError: If called on PersiDict itself rather than on
            a subclass instance.
    """
    if type(self) is PersiDict:
        raise NotImplementedError("PersiDict is an abstract base class"
                                  " and cannot be copied directly")
    constructor_kwargs = self.get_params()
    return self.__class__(**constructor_kwargs)
@property
def append_only(self) -> bool:
    """Report whether this store is append-only.

    Read-only view of the flag captured at construction time.

    Returns:
        True when existing items can be neither modified nor deleted,
        False otherwise.
    """
    return self._append_only
def __repr__(self) -> str:
"""Return a reproducible string representation.
Returns:
Representation including class name and constructor parameters.
"""
params = self.get_params()
params_str = ', '.join(f'{k}={v!r}' for k, v in params.items())
return f'{self.__class__.__name__}({params_str})'
def __str__(self) -> str:
"""Return a user-friendly string with all items.
Returns:
Stringified dict of items.
"""
return str(dict(self.items()))
def __bool__(self) -> bool:
"""Return True if the dictionary is non-empty, False otherwise.
This provides an efficient truth-value check that avoids calling
__len__(), which can be expensive for large persistent stores
(e.g., full directory traversal for FileDirDict, S3 pagination
for BasicS3Dict). Instead, it attempts to retrieve just the first
key using the streaming iterator.
Returns:
True if at least one key exists; False if empty.
"""
try:
next(iter(self))
return True
except StopIteration:
return False
@abstractmethod
def __contains__(self, key:NonEmptyPersiDictKey) -> bool:
"""Check whether a key exists in the store.
Args:
key: Key (string or sequence of strings) or SafeStrTuple.
Returns:
True if key exists, False otherwise.
"""
raise NotImplementedError("PersiDict is an abstract base class"
" and cannot check items directly")
def _check_condition(
        self,
        condition: ETagConditionFlag,
        expected_etag: ETagIfExists,
        actual_etag: ETagIfExists
) -> bool:
    """Decide whether an ETag condition holds.

    Args:
        condition: One of ANY_ETAG, ETAG_IS_THE_SAME or ETAG_HAS_CHANGED.
        expected_etag: The ETag the caller expects, or ITEM_NOT_AVAILABLE
            if the caller believes the key is absent.
        actual_etag: The ETag currently observed, or ITEM_NOT_AVAILABLE
            if the key is absent.

    Returns:
        True for ANY_ETAG; the equality of the two ETags for
        ETAG_IS_THE_SAME; their inequality for ETAG_HAS_CHANGED.

    Raises:
        ValueError: If condition is none of the three recognized flags.
    """
    if condition is ANY_ETAG:
        outcome = True
    elif condition is ETAG_IS_THE_SAME:
        outcome = expected_etag == actual_etag
    elif condition is ETAG_HAS_CHANGED:
        outcome = expected_etag != actual_etag
    else:
        raise ValueError(
            f"condition must be ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED, got {condition!r}")
    return outcome
def _actual_etag(self, key: NonEmptySafeStrTuple) -> ETagIfExists:
    """Look up the current ETag of a key without raising for absent keys.

    Args:
        key: Normalized dictionary key.

    Returns:
        Whatever self.etag(key) returns, or ITEM_NOT_AVAILABLE when that
        call raises KeyError.
    """
    try:
        current = self.etag(key)
    except KeyError:
        current = ITEM_NOT_AVAILABLE
    return current
def _get_value_and_etag(
        self, key: NonEmptySafeStrTuple,
) -> tuple[ValueType, ETagValue]:
    """Read a value together with an ETag that matches it.

    The ETag is read before and after the value. If the two readings
    differ, the whole sequence is retried, up to
    _GET_VALUE_AND_ETAG_MAX_RETRIES attempts in total. Subclasses may
    override this to obtain both in a single backend pass.

    Args:
        key: Normalized dictionary key.

    Returns:
        A (value, ETag) pair whose ETag was identical before and after
        the value was read.

    Raises:
        KeyError: Propagated from self.etag() or self[key] when the key
            does not exist.
        ConcurrencyConflictError: If every attempt observed a changed ETag.
    """
    attempts_left = _GET_VALUE_AND_ETAG_MAX_RETRIES
    while attempts_left > 0:
        attempts_left -= 1
        etag_pre_read = self.etag(key)
        payload = self[key]
        etag_post_read = self.etag(key)
        if etag_pre_read == etag_post_read:
            return payload, etag_post_read
    raise ConcurrencyConflictError(key, _GET_VALUE_AND_ETAG_MAX_RETRIES)
# --- ConditionalOperationResult factory methods ---
@staticmethod
def _result_item_not_available(
        satisfied: bool
) -> ConditionalOperationResult[ValueType]:
    """Build the result reported when the key is absent.

    Args:
        satisfied: Whether the ETag condition was satisfied.

    Returns:
        A result whose actual_etag, resulting_etag and new_value are all
        ITEM_NOT_AVAILABLE.
    """
    absent = ITEM_NOT_AVAILABLE
    return ConditionalOperationResult(
        condition_was_satisfied=satisfied,
        actual_etag=absent,
        resulting_etag=absent,
        new_value=absent)
@staticmethod
def _result_unchanged(
        satisfied: bool,
        actual_etag: ETagIfExists,
        new_value: Any
) -> ConditionalOperationResult[ValueType]:
    """Build the result reported when nothing was mutated.

    Args:
        satisfied: Whether the ETag condition was satisfied.
        actual_etag: The ETag observed for the key; it is reported as
            both actual_etag and resulting_etag.
        new_value: The value (or placeholder) to report.

    Returns:
        A result whose resulting_etag equals its actual_etag.
    """
    etag = actual_etag
    return ConditionalOperationResult(
        condition_was_satisfied=satisfied,
        actual_etag=etag,
        resulting_etag=etag,
        new_value=new_value)
@staticmethod
def _result_write_success(
        actual_etag: ETagIfExists,
        resulting_etag: ETagValue,
        new_value: Any
) -> ConditionalOperationResult[ValueType]:
    """Build the result reported after a successful write.

    Args:
        actual_etag: The ETag observed before the write.
        resulting_etag: The ETag observed after the write.
        new_value: The value that was written.

    Returns:
        A result with condition_was_satisfied set to True.
    """
    fields = dict(
        condition_was_satisfied=True,
        actual_etag=actual_etag,
        resulting_etag=resulting_etag,
        new_value=new_value)
    return ConditionalOperationResult(**fields)
@staticmethod
def _result_delete_success(
        actual_etag: ETagIfExists
) -> ConditionalOperationResult[ValueType]:
    """Build the result reported after a successful delete.

    Args:
        actual_etag: The ETag observed before the deletion.

    Returns:
        A result with condition_was_satisfied set to True and both
        resulting_etag and new_value set to ITEM_NOT_AVAILABLE.
    """
    gone = ITEM_NOT_AVAILABLE
    return ConditionalOperationResult(
        condition_was_satisfied=True,
        actual_etag=actual_etag,
        resulting_etag=gone,
        new_value=gone)
[docs]
def get_with_etag(
        self,
        key: NonEmptyPersiDictKey
) -> ConditionalOperationResult[ValueType]:
    """Fetch a key's value and ETag in one call.

    Thin wrapper over get_item_if that uses the ANY_ETAG condition and
    ALWAYS_RETRIEVE, so the caller supplies no condition parameters.
    The result has the same ConditionalOperationResult type as the
    conditional ``_if`` methods. For an absent key, new_value and
    actual_etag are both ITEM_NOT_AVAILABLE.

    Args:
        key: Dictionary key.

    Returns:
        ConditionalOperationResult carrying the value in new_value and
        the ETag in actual_etag (and resulting_etag).
    """
    unconditional = dict(
        condition=ANY_ETAG,
        expected_etag=ITEM_NOT_AVAILABLE,
        retrieve_value=ALWAYS_RETRIEVE)
    return self.get_item_if(key, **unconditional)
[docs]
def get_item_if(
        self,
        key: NonEmptyPersiDictKey,
        *,
        condition: ETagConditionFlag,
        expected_etag: ETagIfExists,
        retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED
) -> ConditionalOperationResult[ValueType]:
    """Read a key's value subject to an ETag condition.

    An absent key never raises KeyError: it is reported with
    actual_etag set to ITEM_NOT_AVAILABLE and the condition evaluated
    against that.

    Warning:
        This base implementation is built from several separate calls
        and is not atomic. Subclasses that offer concurrency safety
        should override it.

    Args:
        key: Dictionary key.
        condition: ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED.
        expected_etag: The ETag the caller expects, or ITEM_NOT_AVAILABLE
            if the caller believes the key is absent.
        retrieve_value: ALWAYS_RETRIEVE always fetches the value.
            NEVER_RETRIEVE reports VALUE_NOT_RETRIEVED for an existing
            key. IF_ETAG_CHANGED (default) fetches only when
            expected_etag differs from the actual ETag and reports
            VALUE_NOT_RETRIEVED otherwise.

    Returns:
        ConditionalOperationResult describing the outcome.

    Raises:
        TypeError: If retrieve_value is not a RetrieveValueFlag.
    """
    self._validate_retrieve_value(retrieve_value)
    key = NonEmptySafeStrTuple(key)

    if retrieve_value is not ALWAYS_RETRIEVE:
        # Cheap path first: look at the ETag alone and decide whether
        # the value has to be fetched at all.
        current_etag = self._actual_etag(key)
        if current_etag is ITEM_NOT_AVAILABLE:
            return self._result_item_not_available(
                self._check_condition(condition, expected_etag, current_etag))
        if retrieve_value is NEVER_RETRIEVE or expected_etag == current_etag:
            return self._result_unchanged(
                self._check_condition(condition, expected_etag, current_etag),
                current_etag, VALUE_NOT_RETRIEVED)

    # The value is needed: read it together with a matching ETag.
    try:
        value, current_etag = self._get_value_and_etag(key)
    except KeyError:
        # The key is missing (or vanished after the ETag probe above).
        return self._result_item_not_available(
            self._check_condition(
                condition, expected_etag, ITEM_NOT_AVAILABLE))
    return self._result_unchanged(
        self._check_condition(condition, expected_etag, current_etag),
        current_etag, value)
[docs]
def set_item_if(
        self,
        key: NonEmptyPersiDictKey,
        *,
        value: ValueType | Joker,
        condition: ETagConditionFlag,
        expected_etag: ETagIfExists,
        retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED
) -> ConditionalOperationResult[ValueType]:
    """Write a value subject to an ETag condition.

    An absent key never raises KeyError: it is reported with
    actual_etag set to ITEM_NOT_AVAILABLE and the condition evaluated
    against that.

    Warning:
        This base implementation is built from several separate calls
        and is not atomic. Subclasses that require concurrency safety
        should override it.

    Args:
        key: Dictionary key.
        value: Value to store, or KEEP_CURRENT / DELETE_CURRENT.
        condition: ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED.
        expected_etag: The ETag the caller expects, or ITEM_NOT_AVAILABLE
            if the caller believes the key is absent.
        retrieve_value: Governs whether the existing value is fetched
            when nothing is written, i.e. when the condition fails or
            when value is KEEP_CURRENT. IF_ETAG_CHANGED (default)
            fetches only if expected_etag != actual_etag,
            ALWAYS_RETRIEVE always fetches, NEVER_RETRIEVE reports
            VALUE_NOT_RETRIEVED.

    Returns:
        ConditionalOperationResult describing the outcome.

    Raises:
        TypeError: If retrieve_value is not a RetrieveValueFlag.
    """
    self._validate_retrieve_value(retrieve_value)
    key = NonEmptySafeStrTuple(key)
    actual_etag = self._actual_etag(key)
    satisfied = self._check_condition(condition, expected_etag, actual_etag)

    if not satisfied or value is KEEP_CURRENT:
        # No mutation happens on this path; only report the current
        # state. The flag is False for a failed condition and True for
        # KEEP_CURRENT with a satisfied condition.
        condition_flag = bool(satisfied)
        if actual_etag is ITEM_NOT_AVAILABLE:
            return self._result_item_not_available(condition_flag)
        skip_fetch = retrieve_value is NEVER_RETRIEVE or (
            retrieve_value is IF_ETAG_CHANGED
            and expected_etag == actual_etag)
        if skip_fetch:
            existing_value = VALUE_NOT_RETRIEVED
        else:
            try:
                existing_value, actual_etag = self._get_value_and_etag(key)
            except KeyError:
                # The key vanished between the ETag probe and the read.
                return self._result_item_not_available(condition_flag)
        return self._result_unchanged(
            condition_flag, actual_etag, existing_value)

    if value is DELETE_CURRENT:
        removed = self.discard(key)
        if not removed:
            return self._result_item_not_available(satisfied)
        return self._result_delete_success(actual_etag)

    # Regular value: store it, then re-read the ETag produced by the write.
    self[key] = value
    resulting_etag = self._actual_etag(key)
    return self._result_write_success(actual_etag, resulting_etag, value)
[docs]
def setdefault_if(
        self,
        key: NonEmptyPersiDictKey,
        *,
        default_value: ValueType,
        condition: ETagConditionFlag,
        expected_etag: ETagIfExists,
        retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED
) -> ConditionalOperationResult[ValueType]:
    """Insert default_value for an absent key, subject to an ETag condition.

    default_value is written only when the key is absent and the
    condition is satisfied. A key that is already present is never
    mutated, whether or not the condition holds.

    Warning:
        This base implementation is built from several separate calls
        and is not atomic. Subclasses that require concurrency safety
        should override it.

    Args:
        key: Dictionary key.
        default_value: Value to insert if the key is absent and the
            condition is satisfied.
        condition: ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED.
        expected_etag: The ETag the caller expects, or ITEM_NOT_AVAILABLE
            if the caller believes the key is absent.
        retrieve_value: Governs fetching of the existing value when the
            key is present. IF_ETAG_CHANGED (default) fetches only if
            expected_etag != actual_etag, ALWAYS_RETRIEVE always
            fetches, NEVER_RETRIEVE reports VALUE_NOT_RETRIEVED.

    Returns:
        ConditionalOperationResult describing the outcome.

    Raises:
        TypeError: If default_value is a Joker, or retrieve_value is not
            a RetrieveValueFlag.
    """
    if isinstance(default_value, Joker):
        raise TypeError("default_value must be a regular value, not a Joker command")
    self._validate_retrieve_value(retrieve_value)
    key = NonEmptySafeStrTuple(key)
    actual_etag = self._actual_etag(key)
    satisfied = self._check_condition(condition, expected_etag, actual_etag)

    if actual_etag is ITEM_NOT_AVAILABLE:
        if not satisfied:
            return self._result_item_not_available(False)
        # Key is absent and the condition holds: insert the default and
        # report the ETag produced by the write.
        self[key] = default_value
        return self._result_write_success(
            ITEM_NOT_AVAILABLE, self._actual_etag(key), default_value)

    # Key is present: nothing is written, only the current state is reported.
    skip_fetch = retrieve_value is NEVER_RETRIEVE or (
        retrieve_value is IF_ETAG_CHANGED
        and expected_etag == actual_etag)
    if skip_fetch:
        existing_value = VALUE_NOT_RETRIEVED
    else:
        try:
            existing_value, actual_etag = self._get_value_and_etag(key)
        except KeyError:
            # The key vanished between the ETag probe and the read.
            return self._result_item_not_available(False)
    return self._result_unchanged(satisfied, actual_etag, existing_value)
[docs]
def discard_if(
        self,
        key: NonEmptyPersiDictKey,
        *,
        condition: ETagConditionFlag,
        expected_etag: ETagIfExists
) -> ConditionalOperationResult[ValueType]:
    """Remove a key subject to an ETag condition.

    There is no retrieve_value parameter: new_value is
    ITEM_NOT_AVAILABLE after a successful delete or for a missing key,
    and VALUE_NOT_RETRIEVED when the condition fails.

    Warning:
        This base implementation is built from several separate calls
        and is not atomic. Subclasses that require concurrency safety
        should override it.

    Args:
        key: Dictionary key.
        condition: ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED.
        expected_etag: The ETag the caller expects, or ITEM_NOT_AVAILABLE
            if the caller believes the key is absent.

    Returns:
        ConditionalOperationResult describing the outcome.

    Raises:
        MutationPolicyError: If the key exists, the condition is
            satisfied, and the dictionary is append-only.
    """
    key = NonEmptySafeStrTuple(key)
    actual_etag = self._actual_etag(key)
    satisfied = self._check_condition(condition, expected_etag, actual_etag)

    if actual_etag is ITEM_NOT_AVAILABLE:
        return self._result_item_not_available(satisfied)
    if not satisfied:
        return self._result_unchanged(
            False, actual_etag, VALUE_NOT_RETRIEVED)

    self._check_delete_policy()
    try:
        self._remove_item(key)
    except KeyError:
        # The key vanished between the ETag probe and the removal.
        return self._result_item_not_available(False)
    return self._result_delete_success(actual_etag)
@abstractmethod
def __getitem__(self, key:NonEmptyPersiDictKey) -> ValueType:
"""Retrieve the value for a key.
Args:
key: Key (string or sequence of strings) or SafeStrTuple.
Returns:
The stored value.
"""
raise NotImplementedError("PersiDict is an abstract base class"
" and cannot retrieve items directly")
def _validate_value(self, value: ValueType) -> None:
    """Check that a value may be stored in this dictionary.

    The joker commands KEEP_CURRENT and DELETE_CURRENT pass without any
    type check.

    Args:
        value: Value to store, or a joker command.

    Raises:
        TypeError: If the value is a PersiDict instance, or if
            base_class_for_values is set and the value is not an
            instance of it.
    """
    if isinstance(value, PersiDict):
        raise TypeError("Cannot store a PersiDict instance directly")
    if value is KEEP_CURRENT or value is DELETE_CURRENT:
        return
    required_base = self.base_class_for_values
    if required_base is None or isinstance(value, required_base):
        return
    raise TypeError(f"Value must be an instance of {required_base.__name__}")
@staticmethod
def _validate_retrieve_value(
        retrieve_value: RetrieveValueFlag) -> None:
    """Ensure retrieve_value is a RetrieveValueFlag instance.

    Args:
        retrieve_value: The flag supplied by the caller.

    Raises:
        TypeError: If retrieve_value is not a RetrieveValueFlag.
    """
    if isinstance(retrieve_value, RetrieveValueFlag):
        return
    received = type(retrieve_value).__name__
    raise TypeError(
        "retrieve_value must be ALWAYS_RETRIEVE, NEVER_RETRIEVE,"
        f" or IF_ETAG_CHANGED, got {received}")
def _validate_returned_value(self, value: ValueType) -> None:
"""Validate that a value retrieved from storage matches base_class_for_values.
Args:
value: The deserialized value retrieved from storage.
Raises:
TypeError: If base_class_for_values is set and the value
does not match it.
"""
if self.base_class_for_values is not None:
if not isinstance(value, self.base_class_for_values):
raise TypeError(
f"Value must be an instance of"
f" {self.base_class_for_values.__name__},"
f" but it is {type(value).__name__} instead.")
def _serialize_to_file(
self
, value: Any
, f: Any
, *
, pkl_compress: str | None = None
) -> None:
"""Serialize a value into an open file-like object.
Dispatches on self.serialization_format to write the value using the
appropriate library. The caller is responsible for opening the file in
the correct mode ('wb' for pkl, 'w' for json/text) with UTF-8 encoding
for text modes, and for any post-write actions (flush, fsync, close).
Args:
value: The Python object to serialize.
f: A writable file-like object (binary for pkl, text for others).
pkl_compress: Compression algorithm passed to joblib.dump when
serialization_format is 'pkl' (e.g. 'lz4', 'zlib').
None means joblib's default. Ignored for non-pkl formats.
"""
if self.serialization_format == 'json':
f.write(jsonpickle.dumps(value, indent=4))
elif self.serialization_format == 'pkl':
if pkl_compress is not None:
joblib.dump(value, f, compress=pkl_compress)
else:
joblib.dump(value, f)
else:
f.write(str(value))
def _deserialize_from_file(self, f: Any) -> Any:
"""Deserialize a value from an open file-like object.
Dispatches on self.serialization_format to read the value using the
appropriate library. The caller is responsible for opening the file in
the correct mode ('rb' for pkl, 'r' for json/text) with UTF-8 encoding
for text modes, and for closing it afterward.
Args:
f: A readable file-like object (binary for pkl, text for others).
Returns:
The deserialized Python object.
"""
if self.serialization_format == 'json':
return jsonpickle.loads(f.read())
elif self.serialization_format == 'pkl':
return joblib.load(f)
else:
return f.read()
def _validate_setitem_args(self, key: NonEmptyPersiDictKey, value: ValueType | Joker
                           ) -> NonEmptySafeStrTuple:
    """Validate setitem arguments; joker commands are not executed here.

    Args:
        key: Dictionary key (string or sequence of strings
            or NonEmptySafeStrTuple).
        value: Value to store, or a joker command (KEEP_CURRENT or
            DELETE_CURRENT).

    Raises:
        MutationPolicyError: If value is DELETE_CURRENT and
            append_only is True.
        TypeError: If the value is a PersiDict instance or does not match
            base_class_for_values when one is specified.

    Returns:
        The key converted to NonEmptySafeStrTuple.
    """
    # A delete request is the only value an append-only store rejects
    # at this stage.
    if self.append_only and value is DELETE_CURRENT:
        raise MutationPolicyError("append-only")
    self._validate_value(value)
    return NonEmptySafeStrTuple(key)
def _process_setitem_args(self, key: NonEmptyPersiDictKey, value: ValueType | Joker
                          ) -> StatusFlag:
    """Carry out the preliminary steps of setting an item.

    KEEP_CURRENT returns immediately without any validation.
    DELETE_CURRENT discards the key after validation. Regular values
    are only validated. This method does not check whether the key
    already exists in an append-only store; concrete ``__setitem__``
    implementations are responsible for that.

    Args:
        key: Dictionary key (string or sequence of strings
            or NonEmptySafeStrTuple).
        value: Value to store, or a joker command (KEEP_CURRENT or
            DELETE_CURRENT).

    Raises:
        MutationPolicyError: If value is DELETE_CURRENT and append_only
            is True.
        TypeError: If the value is a PersiDict instance or does not match
            base_class_for_values when one is specified.

    Returns:
        CONTINUE_NORMAL_EXECUTION if the caller should go on to store
        the value; EXECUTION_IS_COMPLETE if a joker command was handled
        and nothing more needs to be done.
    """
    if value is not KEEP_CURRENT:
        normalized_key = self._validate_setitem_args(key, value)
        if value is not DELETE_CURRENT:
            return CONTINUE_NORMAL_EXECUTION
        self.discard(normalized_key)
    return EXECUTION_IS_COMPLETE
@abstractmethod
def __setitem__(self, key: NonEmptyPersiDictKey, value: ValueType | Joker) -> None:
    """Store a value under a key.

    KEEP_CURRENT and DELETE_CURRENT are treated as commands to keep or
    delete the current value; both are fully handled by
    _process_setitem_args, after which this method returns.

    Args:
        key: Key (string or sequence of strings) or SafeStrTuple.
        value: Value to store, or a Joker command.

    Raises:
        MutationPolicyError: If value is DELETE_CURRENT and append_only
            is True.
        NotImplementedError: For any regular value, because the actual
            write must be implemented by subclasses.
    """
    status = self._process_setitem_args(key, value)
    if status is not EXECUTION_IS_COMPLETE:
        raise NotImplementedError("PersiDict is an abstract base class"
                                  " and cannot store items directly")
def _check_delete_policy(self) -> None:
"""Raise MutationPolicyError if deletion is not allowed on this dict.
Raises:
MutationPolicyError: If append_only is True.
"""
if self.append_only:
raise MutationPolicyError("append-only")
def _remove_item(self, key: NonEmptySafeStrTuple) -> None:
"""Remove a key from the backend storage.
Performs the raw deletion without policy or existence pre-checks.
Subclasses should override this with a direct backend deletion
so that ``discard`` and ``discard_if`` can bypass the
existence check in ``_process_delitem_args``. The default
falls back to ``del self[key]``.
Args:
key: Already-validated NonEmptySafeStrTuple.
Raises:
KeyError: If the key does not exist in the backend.
"""
del self[key]
def _process_delitem_args(self, key: NonEmptyPersiDictKey) -> None:
    """Carry out the preliminary steps of deleting an item.

    The delete policy is checked first, then the key is normalized and
    its presence verified.

    Args:
        key: Dictionary key (string or sequence of strings
            or NonEmptySafeStrTuple).

    Raises:
        MutationPolicyError: If append_only is True.
        KeyError: If the key does not exist.
    """
    self._check_delete_policy()
    normalized_key = NonEmptySafeStrTuple(key)
    if normalized_key in self:
        return
    raise KeyError(normalized_key)
@abstractmethod
def __delitem__(self, key:NonEmptyPersiDictKey):
"""Delete a key and its value.
Args:
key: Key (string or sequence of strings) or SafeStrTuple.
Raises:
MutationPolicyError: If append_only is True.
KeyError: If the key does not exist.
NotImplementedError: Subclasses must implement deletion.
"""
self._process_delitem_args(key)
raise NotImplementedError("PersiDict is an abstract base class"
" and cannot delete items directly")
@abstractmethod
def __len__(self) -> int:
"""Return the number of stored items.
Returns:
Number of key-value pairs.
Raises:
NotImplementedError: Subclasses must implement counting.
"""
raise NotImplementedError("PersiDict is an abstract base class"
" and cannot count items directly")
def _process_generic_iter_args(self, result_type: set[str]) -> None:
"""Validate arguments for iterator helpers.
Args:
result_type: A set indicating desired fields among {'keys',
'values', 'timestamps'}.
Raises:
TypeError: If result_type is not a set.
ValueError: If result_type contains invalid entries or an invalid number of items.
"""
if not isinstance(result_type, set):
raise TypeError("result_type must be a set of strings")
if not (1 <= len(result_type) <= 3):
raise ValueError("result_type must contain between 1 and 3 elements")
allowed = {"keys", "values", "timestamps"}
if (result_type | allowed) != allowed:
raise ValueError("result_type can only contain 'keys', 'values', 'timestamps'")
if not (1 <= len(result_type & allowed) <= 3):
raise ValueError("result_type must include at least one of 'keys', 'values', 'timestamps'")
def _assemble_iter_result(
self
, result_type: set[str]
, *
, key: Any = None
, value: Any = None
, timestamp: Any = None
) -> Any:
"""Build a single yield value for _generic_iter from named fields.
Assembles an iteration result by selecting the requested fields
(key, value, timestamp) in canonical order and returning either a
single value or a tuple, depending on how many fields were requested.
Args:
result_type: The same set passed to _generic_iter —
a non-empty subset of {"keys", "values", "timestamps"}.
key: The key to include when "keys" is in result_type.
value: The value to include when "values" is in result_type.
timestamp: The timestamp to include when "timestamps" is in
result_type.
Returns:
A single item if only one field is requested, otherwise a tuple
of the requested fields in (key, value, timestamp) order.
"""
to_return: list[Any] = []
if "keys" in result_type:
to_return.append(key)
if "values" in result_type:
to_return.append(value)
if "timestamps" in result_type:
to_return.append(timestamp)
if len(result_type) == 1:
return to_return[0]
return tuple(to_return)
@abstractmethod
def _generic_iter(self, result_type: set[str]) -> Any:
"""Underlying implementation for iterator helpers.
Args:
result_type: A set indicating desired fields among {'keys',
'values', 'timestamps'}.
Returns:
An iterator yielding keys, values, and/or timestamps based on
result_type.
Raises:
TypeError: If result_type is not a set.
ValueError: If result_type contains invalid entries or an invalid number of items.
NotImplementedError: Subclasses must implement the concrete iterator.
"""
self._process_generic_iter_args(result_type)
raise NotImplementedError("PersiDict is an abstract base class"
" and cannot iterate items directly")
def __iter__(self) -> Iterator[NonEmptySafeStrTuple]:
"""Iterate over keys.
Returns:
Iterator of keys.
"""
return self._generic_iter({"keys"})
[docs]
def keys(self) -> Iterator[NonEmptySafeStrTuple]:
    """Provide an iterator over the keys.

    Returns:
        The iterator produced by _generic_iter for the 'keys' field.
    """
    requested_fields = {"keys"}
    return self._generic_iter(requested_fields)
[docs]
def keys_and_timestamps(self) -> Iterator[tuple[NonEmptySafeStrTuple, float]]:
    """Provide an iterator over (key, timestamp) pairs.

    Returns:
        The iterator produced by _generic_iter for the 'keys' and
        'timestamps' fields.
    """
    requested_fields = {"keys", "timestamps"}
    return self._generic_iter(requested_fields)
[docs]
def values(self) -> Iterator[ValueType]:
    """Provide an iterator over the values.

    Returns:
        The iterator produced by _generic_iter for the 'values' field.
    """
    requested_fields = {"values"}
    return self._generic_iter(requested_fields)
[docs]
def values_and_timestamps(self) -> Iterator[tuple[ValueType, float]]:
    """Provide an iterator over (value, timestamp) pairs.

    Returns:
        The iterator produced by _generic_iter for the 'values' and
        'timestamps' fields.
    """
    requested_fields = {"values", "timestamps"}
    return self._generic_iter(requested_fields)
[docs]
def items(self) -> Iterator[tuple[NonEmptySafeStrTuple, ValueType]]:
    """Provide an iterator over (key, value) pairs.

    Returns:
        The iterator produced by _generic_iter for the 'keys' and
        'values' fields.
    """
    requested_fields = {"keys", "values"}
    return self._generic_iter(requested_fields)
[docs]
def items_and_timestamps(self) -> Iterator[tuple[NonEmptySafeStrTuple, ValueType, float]]:
    """Provide an iterator over (key, value, timestamp) triples.

    Returns:
        The iterator produced by _generic_iter for the 'keys', 'values'
        and 'timestamps' fields.
    """
    requested_fields = {"keys", "values", "timestamps"}
    return self._generic_iter(requested_fields)
[docs]
def setdefault(self, key: NonEmptyPersiDictKey, default: ValueType | None = None) -> ValueType:
    """Return the value stored under key, storing default first if absent.

    Mirrors the built-in dict.setdefault(): an existing key yields its
    current value untouched; a missing key is assigned ``default`` and
    ``default`` is returned.

    Warning:
        This base class implementation is a plain read followed by a
        write and is therefore not atomic. Subclasses that require
        concurrency safety should override this method.

    Args:
        key: Dictionary key.
        default: Value to store when the key is missing. Defaults to None.

    Returns:
        The existing value when the key is present; otherwise ``default``.

    Raises:
        TypeError: If default is a Joker command
            (KEEP_CURRENT/DELETE_CURRENT), or if the key is missing and
            default violates value type constraints.
    """
    if isinstance(default, Joker):
        raise TypeError("default must be a regular value, not a Joker command")
    key = NonEmptySafeStrTuple(key)
    try:
        existing_value = self[key]
    except KeyError:
        # Key is absent: store the default and hand it back.
        self[key] = default
        return default
    return existing_value
def __eq__(self, other: Any) -> bool:
    """Compare this dictionary with another mapping for equality.

    Identity short-circuits to True. When ``other`` has exactly the same
    type and ``get_params()`` of both objects compare equal, the result is
    True without inspecting any items (fast path). Otherwise the keys of
    this dictionary are walked in lockstep with the items of ``other``:
    a length mismatch, a key of ``other`` missing here, or a differing
    value makes the result False.

    Args:
        other: Another Mapping to compare against.

    Returns:
        True if the dictionaries hold the same key/value pairs, False
        otherwise. NotImplemented if other is not a Mapping. KeyError,
        TypeError, AttributeError and ValueError raised during the
        comparison are treated as "not equal".
    """
    if self is other:
        return True
    if not isinstance(other, Mapping):
        return NotImplemented
    try:
        same_configuration = (type(self) is type(other)
                              and self.get_params() == other.get_params())
        if same_configuration:
            return True
        # zip_longest pads the shorter side with None, exposing a
        # difference in the number of entries.
        lockstep = zip_longest(self.keys(), other.items(), fillvalue=None)
        for own_key, foreign_item in lockstep:
            if own_key is None or foreign_item is None:
                return False
            foreign_key, foreign_value = foreign_item
            if self[foreign_key] != foreign_value:
                return False
    except (KeyError, TypeError, AttributeError, ValueError):
        return False
    return True
def __ne__(self, other: Any) -> bool:
    """Return the negation of ``__eq__``.

    An object is never unequal to itself; NotImplemented from ``__eq__``
    is passed through unchanged.
    """
    if self is other:
        return False
    verdict = self.__eq__(other)
    return verdict if verdict is NotImplemented else not verdict
def __ior__(self, other: Mapping) -> Self:
    """Merge the items of other into this dict (``self |= other``).

    Args:
        other: Mapping whose items are passed to ``update``.

    Returns:
        This same object, after the update.

    Raises:
        TypeError: If other is not a Mapping.
    """
    if isinstance(other, Mapping):
        self.update(other)
        return self
    raise TypeError(f"Cannot update PersiDict with non-Mapping type: {type(other)}")
def __getstate__(self):
    """Refuse to pickle PersiDict instances.

    Raises:
        TypeError: Always; the message names the concrete class and
            points to get_params() for persisting configuration.
    """
    class_name = self.__class__.__name__
    raise TypeError(
        f"{class_name} instances cannot be pickled. "
        "To persist configuration, use get_params().")
def __setstate__(self, state):
    """Refuse to unpickle PersiDict instances.

    Args:
        state: Ignored; present only to satisfy the pickle protocol.

    Raises:
        TypeError: Always; the message names the concrete class and
            advises recreating the object from parameters.
    """
    class_name = self.__class__.__name__
    raise TypeError(
        f"{class_name} instances cannot be unpickled. "
        "Recreate from parameters instead.")
[docs]
def clear(self) -> None:
    """Remove every item from the dictionary.

    The delete policy is checked once up front, then each key yielded by
    ``keys()`` is passed to ``discard``.

    Raises:
        MutationPolicyError: If the dictionary is append-only.
    """
    self._check_delete_policy()
    for stored_key in self.keys():
        self.discard(stored_key)
[docs]
def pop(self, key: NonEmptyPersiDictKey, *args: Any) -> Any:
    """Remove a key and return the value it held.

    The removal is routed through ``transform_item`` so that reading the
    value and deleting it are covered by ETag checks and automatic
    retries, avoiding the TOCTOU race that the inherited
    MutableMapping.pop would have.

    Args:
        key: Key (string or sequence of strings) or SafeStrTuple.
        *args: Optional default value (at most one).

    Returns:
        The value that was stored, or the default if the key was absent
        and a default was provided.

    Raises:
        MutationPolicyError: If the dictionary is append-only.
        TypeError: If more than one default argument is given.
        KeyError: If the key does not exist and no default was given.
    """
    if len(args) > 1:
        raise TypeError(
            f"pop expected at most 2 arguments, got {1 + len(args)}")
    self._check_delete_policy()

    # Stays ITEM_NOT_AVAILABLE if the transformer is never invoked, which
    # is then handled exactly like an absent key.
    last_seen = ITEM_NOT_AVAILABLE

    def _grab_and_delete(current_value: Any) -> Any:
        # Each invocation overwrites the previous observation, so only
        # the most recent one is kept.
        nonlocal last_seen
        last_seen = current_value
        if current_value is ITEM_NOT_AVAILABLE:
            return KEEP_CURRENT
        return DELETE_CURRENT

    self.transform_item(key, transformer=_grab_and_delete)

    if last_seen is not ITEM_NOT_AVAILABLE:
        return last_seen
    if args:
        return args[0]
    raise KeyError(NonEmptySafeStrTuple(key))
[docs]
def popitem(self) -> tuple[NonEmptySafeStrTuple, ValueType]:
    """Remove and return an arbitrary (key, value) pair.

    Keys are taken from ``keys()`` one at a time and handed to ``pop``
    (which delegates to ``transform_item``), so the read-then-delete is
    protected by ETag checks and automatic retries. If ``pop`` reports
    the chosen key as missing, for instance because another process
    deleted it first, the next key is tried until one succeeds or the
    keys run out.

    Returns:
        A (key, value) tuple.

    Raises:
        MutationPolicyError: If the dictionary is append-only.
        KeyError: If the dictionary is empty.
    """
    self._check_delete_policy()
    for candidate in self.keys():
        try:
            removed_value = self.pop(candidate)
        except KeyError:
            # The key vanished before pop completed; move on to the next.
            continue
        return (candidate, removed_value)
    raise KeyError("dictionary is empty")
[docs]
def discard(self, key: NonEmptyPersiDictKey) -> bool:
    """Delete an item, tolerating its absence.

    This method is absent in the original dict API.

    Args:
        key: Key (string or sequence of strings) or SafeStrTuple.

    Returns:
        True if the item existed and was deleted; False if
        ``_remove_item`` reported the key as missing.

    Raises:
        MutationPolicyError: If the dictionary is append-only.
    """
    self._check_delete_policy()
    normalized_key = NonEmptySafeStrTuple(key)
    try:
        self._remove_item(normalized_key)
    except KeyError:
        return False
    return True
[docs]
def delete_if_exists(self, key: NonEmptyPersiDictKey) -> bool:
    """Backward-compatible alias for discard().

    Kept only so existing callers keep working; new code should call
    discard() directly. The call is forwarded unchanged.

    Args:
        key: Key passed straight through to discard().

    Returns:
        Whatever discard() returns for the key.
    """
    was_removed = self.discard(key)
    return was_removed
[docs]
def get_subdict(self, prefix_key:PersiDictKey) -> Self:
    """Get a sub-dictionary exposing the items under a key prefix.

    Intended contract for implementations: items whose keys start with
    the given prefix are visible through the returned sub-dictionary; an
    unknown prefix yields an empty sub-dictionary; an empty prefix yields
    the entire dictionary. This base implementation does not provide any
    of that and simply raises.

    This method is absent in the original Python dict API.

    Args:
        prefix_key: Key prefix (string, sequence of strings, or
            SafeStrTuple) identifying the sub-dict to expose.

    Returns:
        A dictionary-like view restricted to keys under the provided
        prefix.

    Raises:
        NotImplementedError: Must be implemented by subclasses that
            support hierarchical key spaces.
    """
    class_name = type(self).__name__
    raise NotImplementedError(f"{class_name} does not support get_subdict()")
[docs]
def subdicts(self) -> dict[str, Self]:
    """Return a mapping of first-level key segments to sub-dictionaries.

    The first segment of every key is collected, then ``get_subdict`` is
    called once per distinct segment.

    This method is absent in the original dict API.

    Returns:
        dict[str, Self]: A mapping from a top-level key segment to the
        sub-dictionary returned by ``get_subdict`` for that segment.
    """
    first_segments = {full_key[0] for full_key in self.keys()}
    views = {}
    for segment in first_segments:
        views[segment] = self.get_subdict(segment)
    return views
[docs]
def random_key(self) -> NonEmptySafeStrTuple | None:
    """Return a random key from the dictionary.

    This method is absent in the original Python dict API.

    A single pass over ``keys()`` is made, keeping one candidate at a
    time, so the keys are neither collected into memory nor counted with
    len(). The first key is always taken; the i-th key (i >= 2) replaces
    the current candidate with probability 1/i.

    Returns:
        NonEmptySafeStrTuple | None: A random key if the dictionary is
        not empty; None if the dictionary is empty.
    """
    chosen = None
    for position, candidate in enumerate(self.keys(), start=1):
        # The short-circuit keeps the first key from consuming a random
        # number; later keys win with probability 1/position.
        if position == 1 or random.random() < 1 / position:
            chosen = candidate
    return chosen
[docs]
@abstractmethod
def timestamp(self, key:NonEmptyPersiDictKey) -> float:
    """Return the last modification time of a key.

    This method is absent in the original dict API.

    The base implementation only guards against being used on PersiDict
    itself; for any other type it falls through without returning a
    value, so subclasses must supply the actual lookup.

    Args:
        key: Key (string or sequence of strings) or SafeStrTuple.

    Returns:
        POSIX timestamp (seconds since Unix epoch) of the last
        modification of the item.

    Raises:
        NotImplementedError: Must be implemented by subclasses.
    """
    called_on_bare_base = type(self) is PersiDict
    if called_on_bare_base:
        raise NotImplementedError("PersiDict is an abstract base class"
                                  " and cannot provide timestamps directly")
[docs]
def etag(self, key: NonEmptyPersiDictKey) -> ETagValue:
    """Return the ETag of a key.

    The default ETag is the key's last-modification timestamp rendered
    with six decimal places and wrapped in ETagValue. Subclasses may
    override this to expose true backend-specific ETags (e.g., S3).

    This method is absent in the original Python dict API.

    Args:
        key: Key (string or sequence of strings) or SafeStrTuple.

    Returns:
        The ETag for the key.

    Raises:
        KeyError: If the key does not exist.
    """
    modified_at = self.timestamp(key)
    return ETagValue(format(modified_at, ".6f"))
def _sorted_keys(self, *, max_n: int | None, newest_first: bool
        ) -> list[NonEmptySafeStrTuple]:
    """Return keys ordered by their timestamps.

    With no limit, every (key, timestamp) pair is sorted in full. With a
    positive limit, heapq.nlargest / heapq.nsmallest picks just the
    requested number of entries.

    Args:
        max_n: Maximum number of keys to return. If None, return all.
            Values <= 0 yield an empty list.
        newest_first: If True, sort newest first; otherwise oldest first.

    Returns:
        Keys sorted by timestamp in the requested order.
    """
    if max_n is None:
        ordered_pairs = sorted(self.keys_and_timestamps(),
                               key=lambda pair: pair[1],
                               reverse=newest_first)
        return [key for key, _ in ordered_pairs]

    if max_n <= 0:
        # Nothing requested: skip iterating the store altogether.
        return []

    pick = heapq.nlargest if newest_first else heapq.nsmallest
    selected_pairs = pick(
        max_n, self.keys_and_timestamps(), key=lambda pair: pair[1])
    return [key for key, _ in selected_pairs]
[docs]
def oldest_keys(self, *, max_n: int | None = None
        ) -> list[NonEmptySafeStrTuple]:
    """Return up to max_n oldest keys in the dictionary.

    This method is absent in the original Python dict API.

    Args:
        max_n: Maximum number of keys to return. If None, return all
            keys sorted by age (oldest first). Values <= 0 yield an
            empty list. Defaults to None.

    Returns:
        The oldest keys, oldest first.
    """
    oldest_first = self._sorted_keys(max_n=max_n, newest_first=False)
    return oldest_first
[docs]
def newest_keys(self, *, max_n: int | None = None
        ) -> list[NonEmptySafeStrTuple]:
    """Return up to max_n newest keys in the dictionary.

    This method is absent in the original Python dict API.

    Args:
        max_n: Maximum number of keys to return. If None, return all
            keys sorted by age (newest first). Values <= 0 yield an
            empty list. Defaults to None.

    Returns:
        The newest keys, newest first.
    """
    newest_first_keys = self._sorted_keys(max_n=max_n, newest_first=True)
    return newest_first_keys
[docs]
def oldest_values(self, *, max_n: int | None = None) -> list[ValueType]:
    """Return up to max_n oldest values in the dictionary.

    The keys come from ``oldest_keys``; each one is then looked up
    individually, preserving that order.

    This method is absent in the original Python dict API.

    Args:
        max_n: Maximum number of values to return. If None, return
            values for all keys sorted by age (oldest first). Values
            <= 0 yield an empty list.

    Returns:
        Values corresponding to the oldest keys.
    """
    collected = []
    for key in self.oldest_keys(max_n=max_n):
        collected.append(self[key])
    return collected
[docs]
def newest_values(self, *, max_n: int | None = None) -> list[ValueType]:
    """Return up to max_n newest values in the dictionary.

    The keys come from ``newest_keys``; each one is then looked up
    individually, preserving that order.

    This method is absent in the original Python dict API.

    Args:
        max_n: Maximum number of values to return. If None, return
            values for all keys sorted by age (newest first). Values
            <= 0 yield an empty list.

    Returns:
        Values corresponding to the newest keys.
    """
    collected = []
    for key in self.newest_keys(max_n=max_n):
        collected.append(self[key])
    return collected