Source code for persidict.persi_dict

"""Persistent, dict-like API for durable key-value stores.

PersiDict defines a unified interface for persistent dictionaries. The API is
similar to Python's built-in dict with some differences (e.g., insertion order
is not guaranteed) and several additional convenience methods.

Keys are non-empty sequences of URL/filename-safe strings
represented by SafeStrTuple. Plain strings or sequences of strings are accepted
and automatically coerced to SafeStrTuple. Values can be
arbitrary Python objects unless an implementation restricts them
via `base_class_for_values`.

Persistence means items are stored durably (e.g., in local files or cloud
objects) and remain accessible across process lifetimes.
"""

from __future__ import annotations

from abc import abstractmethod
from collections.abc import MutableMapping
import heapq
from itertools import zip_longest
import joblib
import jsonpickle
import random
import time
from typing import Any, Sequence, Iterator, Mapping, Self

from mixinforge import ParameterizableMixin, sort_dict_by_keys

from . import NonEmptySafeStrTuple
from .jokers_and_status_flags import (KEEP_CURRENT, DELETE_CURRENT, Joker,
                                      CONTINUE_NORMAL_EXECUTION, StatusFlag, EXECUTION_IS_COMPLETE,
                                      ETagValue, ETagConditionFlag,
                                      ANY_ETAG, ETAG_IS_THE_SAME, ETAG_HAS_CHANGED,
                                      RetrieveValueFlag, ALWAYS_RETRIEVE,
                                      NEVER_RETRIEVE, IF_ETAG_CHANGED,
                                      ITEM_NOT_AVAILABLE, VALUE_NOT_RETRIEVED,
                                      ETagIfExists, TransformingFunction,
                                      ValueType,
                                      OperationResult, ConditionalOperationResult)
from .exceptions import MutationPolicyError, ConcurrencyConflictError
from .safe_chars import contains_unsafe_chars
from .safe_str_tuple import SafeStrTuple

PersiDictKey = SafeStrTuple | Sequence[str] | str
"""Key type that includes empty tuples, used for prefix-based operations.

Accepts a SafeStrTuple (possibly empty), a string, or a sequence of strings.
Used in operations like get_subdict where an empty prefix is meaningful.
"""

NonEmptyPersiDictKey = NonEmptySafeStrTuple | Sequence[str] | str
"""Key type for item-level operations that require a non-empty key.

Accepts a NonEmptySafeStrTuple, a string, or a non-empty sequence of strings.
Used in __getitem__, __setitem__, __delitem__, __contains__, and other
operations that address a specific item. Characters within strings must be
URL/filename-safe. Strings and sequences are automatically converted into
SafeStrTuple.
"""

_GET_VALUE_AND_ETAG_MAX_RETRIES: int = 3


[docs] class PersiDict(MutableMapping[NonEmptySafeStrTuple, ValueType], ParameterizableMixin): """Abstract dict-like interface for durable key-value stores. Keys are URL/filename-safe sequences of strings (SafeStrTuple). Concrete subclasses implement storage backends (e.g., filesystem, S3). The API is similar to Python's dict but does not guarantee insertion order and adds persistence-specific helpers (e.g., timestamp()). Attributes (can't be changed after initialization): append_only: If True, items are immutable and non-removable: existing values cannot be modified or deleted. base_class_for_values: Optional base class that all values must inherit from. If None, any type is accepted. serialization_format: File extension/format for stored values (e.g., "pkl", "json"). """ append_only:bool base_class_for_values: type | None serialization_format:str def __init__(self, *, append_only: bool = False, base_class_for_values: type|None = None, serialization_format: str = "pkl"): """Initialize base parameters shared by all persistent dictionaries. Args: append_only: If True, items cannot be modified or deleted. Defaults to False. base_class_for_values: Optional base class that values must inherit from. If None, values are not type-restricted. Defaults to None. serialization_format: File extension/format for stored values. Defaults to "pkl". Raises: ValueError: If serialization_format is an empty string, or contains unsafe characters, or not 'json' or 'pkl' for non-string values. TypeError: If base_class_for_values is not a type or None. """ self._append_only = bool(append_only) if len(serialization_format) == 0: raise ValueError("serialization_format must be a non-empty string") if contains_unsafe_chars(serialization_format): raise ValueError("serialization_format must contain only URL/filename-safe characters") self.serialization_format = str(serialization_format) if not isinstance(base_class_for_values, (type, type(None))): raise TypeError("base_class_for_values must be a type or None") if (base_class_for_values is None or not issubclass(base_class_for_values, str)): if serialization_format not in {"json", "pkl"}: raise ValueError("For non-string values serialization_format must be either 'pkl' or 'json'.") self.base_class_for_values = base_class_for_values ParameterizableMixin.__init__(self)
[docs] def get_params(self) -> dict[str, Any]: """Return configuration parameters of this dictionary. Returns: A sorted dictionary of parameters used to reconstruct the instance. This supports the Parameterizable API and is absent in the built-in dict. """ params = dict( append_only=self.append_only, base_class_for_values=self.base_class_for_values, serialization_format=self.serialization_format ) sorted_params = sort_dict_by_keys(params) return sorted_params
def __copy__(self) -> Self: """Return a shallow copy of the dictionary. This creates a new PersiDict instance with the same parameters, pointing to the same underlying storage. This is analogous to `dict.copy()`. Returns: A new instance of the same concrete type that is a shallow copy of this one. """ if type(self) is PersiDict: raise NotImplementedError("PersiDict is an abstract base class" " and cannot be copied directly") params = self.get_params() return self.__class__(**params) @property def append_only(self) -> bool: """Whether the store is append-only. Returns: True if the store is append-only (contains immutable items that cannot be modified or deleted), False otherwise. """ return self._append_only def __repr__(self) -> str: """Return a reproducible string representation. Returns: Representation including class name and constructor parameters. """ params = self.get_params() params_str = ', '.join(f'{k}={v!r}' for k, v in params.items()) return f'{self.__class__.__name__}({params_str})' def __str__(self) -> str: """Return a user-friendly string with all items. Returns: Stringified dict of items. """ return str(dict(self.items())) def __bool__(self) -> bool: """Return True if the dictionary is non-empty, False otherwise. This provides an efficient truth-value check that avoids calling __len__(), which can be expensive for large persistent stores (e.g., full directory traversal for FileDirDict, S3 pagination for BasicS3Dict). Instead, it attempts to retrieve just the first key using the streaming iterator. Returns: True if at least one key exists; False if empty. """ try: next(iter(self)) return True except StopIteration: return False @abstractmethod def __contains__(self, key:NonEmptyPersiDictKey) -> bool: """Check whether a key exists in the store. Args: key: Key (string or sequence of strings) or SafeStrTuple. Returns: True if key exists, False otherwise. """ raise NotImplementedError("PersiDict is an abstract base class" " and cannot check items directly") def _check_condition( self, condition: ETagConditionFlag, expected_etag: ETagIfExists, actual_etag: ETagIfExists ) -> bool: """Evaluate an ETag condition. Args: condition: The condition to check (ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED). expected_etag: The caller's expected ETag value, or ITEM_NOT_AVAILABLE if the caller believes the key is absent. actual_etag: The actual ETag value, or ITEM_NOT_AVAILABLE if the key is absent. Returns: True if the condition is satisfied. Raises: ValueError: If condition is not a recognized ETagConditionFlag. """ if condition is ANY_ETAG: return True if condition is ETAG_IS_THE_SAME: return expected_etag == actual_etag if condition is ETAG_HAS_CHANGED: return expected_etag != actual_etag raise ValueError( f"condition must be ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED, got {condition!r}") def _actual_etag(self, key: NonEmptySafeStrTuple) -> ETagIfExists: """Return the actual ETag for a key, or ITEM_NOT_AVAILABLE if absent. Args: key: Normalized dictionary key. Returns: The ETag value, or ITEM_NOT_AVAILABLE if the key does not exist. """ try: return self.etag(key) except KeyError: return ITEM_NOT_AVAILABLE def _get_value_and_etag( self, key: NonEmptySafeStrTuple, ) -> tuple[ValueType, ETagValue]: """Return a consistent value and ETag for a key. Uses a read-with-validation pattern (check+get+check) to detect concurrent modifications. If the ETag changes between the pre-read check and the post-read check, the read is retried. Subclasses can override to fetch both in a single backend pass. Args: key: Normalized dictionary key. Returns: A matching (value, ETag) pair. Raises: KeyError: If the key does not exist. ConcurrencyConflictError: If retries are exhausted due to concurrent modification. """ for _ in range(_GET_VALUE_AND_ETAG_MAX_RETRIES): etag_before = self.etag(key) value = self[key] etag_after = self.etag(key) if etag_before == etag_after: return value, etag_after raise ConcurrencyConflictError(key, _GET_VALUE_AND_ETAG_MAX_RETRIES) # --- ConditionalOperationResult factory methods --- @staticmethod def _result_item_not_available( satisfied: bool ) -> ConditionalOperationResult[ValueType]: """Build result when the key is absent.""" return ConditionalOperationResult( condition_was_satisfied=satisfied, actual_etag=ITEM_NOT_AVAILABLE, resulting_etag=ITEM_NOT_AVAILABLE, new_value=ITEM_NOT_AVAILABLE) @staticmethod def _result_unchanged( satisfied: bool, actual_etag: ETagIfExists, new_value: Any ) -> ConditionalOperationResult[ValueType]: """Build result when no mutation occurred (resulting_etag == actual_etag).""" return ConditionalOperationResult( condition_was_satisfied=satisfied, actual_etag=actual_etag, resulting_etag=actual_etag, new_value=new_value) @staticmethod def _result_write_success( actual_etag: ETagIfExists, resulting_etag: ETagValue, new_value: Any ) -> ConditionalOperationResult[ValueType]: """Build result for a successful write.""" return ConditionalOperationResult( condition_was_satisfied=True, actual_etag=actual_etag, resulting_etag=resulting_etag, new_value=new_value) @staticmethod def _result_delete_success( actual_etag: ETagIfExists ) -> ConditionalOperationResult[ValueType]: """Build result for a successful delete.""" return ConditionalOperationResult( condition_was_satisfied=True, actual_etag=actual_etag, resulting_etag=ITEM_NOT_AVAILABLE, new_value=ITEM_NOT_AVAILABLE)
[docs] def get_with_etag( self, key: NonEmptyPersiDictKey ) -> ConditionalOperationResult[ValueType]: """Retrieve the value and its ETag for a key in a single operation. Convenience wrapper around get_item_if that fetches the current value and ETag without requiring condition parameters. On backends that support it (e.g., S3), both are obtained in a single network round-trip. The result uses the same ConditionalOperationResult type as the conditional _if methods, with condition_was_satisfied always True. When the key is absent, new_value and actual_etag are both ITEM_NOT_AVAILABLE. Args: key: Dictionary key. Returns: ConditionalOperationResult with the value in new_value and the ETag in actual_etag (and resulting_etag). """ return self.get_item_if( key, condition=ANY_ETAG, expected_etag=ITEM_NOT_AVAILABLE, retrieve_value=ALWAYS_RETRIEVE)
[docs] def get_item_if( self, key: NonEmptyPersiDictKey, *, condition: ETagConditionFlag, expected_etag: ETagIfExists, retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED ) -> ConditionalOperationResult[ValueType]: """Retrieve the value for a key only if an ETag condition is satisfied. If the key is absent, actual_etag is ITEM_NOT_AVAILABLE and the condition is evaluated normally. No KeyError is raised. Warning: This base class implementation is not atomic. Subclasses that offer concurrency safety should override this method. Args: key: Dictionary key. condition: ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED. expected_etag: The caller's expected ETag, or ITEM_NOT_AVAILABLE if the caller believes the key is absent. retrieve_value: Controls value retrieval. IF_ETAG_CHANGED (default) skips the fetch when expected_etag == actual_etag, returning VALUE_NOT_RETRIEVED instead. ALWAYS_RETRIEVE always fetches the value. NEVER_RETRIEVE always returns VALUE_NOT_RETRIEVED when the key exists. Returns: ConditionalOperationResult with the outcome of the operation. """ self._validate_retrieve_value(retrieve_value) key = NonEmptySafeStrTuple(key) if retrieve_value is ALWAYS_RETRIEVE: try: value, actual_etag = self._get_value_and_etag(key) except KeyError: satisfied = self._check_condition( condition, expected_etag, ITEM_NOT_AVAILABLE) return self._result_item_not_available(satisfied) satisfied = self._check_condition(condition, expected_etag, actual_etag) return self._result_unchanged(satisfied, actual_etag, value) actual_etag = self._actual_etag(key) if actual_etag is ITEM_NOT_AVAILABLE: satisfied = self._check_condition(condition, expected_etag, actual_etag) return self._result_item_not_available(satisfied) if retrieve_value is NEVER_RETRIEVE or expected_etag == actual_etag: satisfied = self._check_condition(condition, expected_etag, actual_etag) return self._result_unchanged( satisfied, actual_etag, VALUE_NOT_RETRIEVED) try: value, actual_etag = self._get_value_and_etag(key) except KeyError: satisfied = self._check_condition( condition, expected_etag, ITEM_NOT_AVAILABLE) return self._result_item_not_available(satisfied) satisfied = self._check_condition(condition, expected_etag, actual_etag) return self._result_unchanged(satisfied, actual_etag, value)
[docs] def set_item_if( self, key: NonEmptyPersiDictKey, *, value: ValueType | Joker, condition: ETagConditionFlag, expected_etag: ETagIfExists, retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED ) -> ConditionalOperationResult[ValueType]: """Store a value only if an ETag condition is satisfied. If the key is absent, actual_etag is ITEM_NOT_AVAILABLE and the condition is evaluated normally. No KeyError is raised. Warning: This base class implementation is not atomic. Subclasses that require concurrency safety should override this method. Args: key: Dictionary key. value: Value to store. condition: ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED. expected_etag: The caller's expected ETag, or ITEM_NOT_AVAILABLE if the caller believes the key is absent. retrieve_value: Controls whether the existing value is fetched. Applies both when the condition is not satisfied and when KEEP_CURRENT is used with a satisfied condition. IF_ETAG_CHANGED (default) fetches only if expected_etag != actual_etag. ALWAYS_RETRIEVE fetches the existing value. NEVER_RETRIEVE returns VALUE_NOT_RETRIEVED. Returns: ConditionalOperationResult with the outcome of the operation. """ self._validate_retrieve_value(retrieve_value) key = NonEmptySafeStrTuple(key) actual_etag = self._actual_etag(key) satisfied = self._check_condition(condition, expected_etag, actual_etag) if not satisfied: if actual_etag is ITEM_NOT_AVAILABLE: return self._result_item_not_available(False) if retrieve_value is NEVER_RETRIEVE or ( retrieve_value is IF_ETAG_CHANGED and expected_etag == actual_etag): existing_value = VALUE_NOT_RETRIEVED else: try: existing_value, actual_etag = self._get_value_and_etag(key) except KeyError: return self._result_item_not_available(False) return self._result_unchanged(False, actual_etag, existing_value) if value is KEEP_CURRENT: if actual_etag is ITEM_NOT_AVAILABLE: return self._result_item_not_available(True) if retrieve_value is NEVER_RETRIEVE or ( retrieve_value is IF_ETAG_CHANGED and expected_etag == actual_etag): existing_value = VALUE_NOT_RETRIEVED else: try: existing_value, actual_etag = self._get_value_and_etag(key) except KeyError: return self._result_item_not_available(True) return self._result_unchanged( True, actual_etag, existing_value) if value is DELETE_CURRENT: if not self.discard(key): return self._result_item_not_available(satisfied) return self._result_delete_success(actual_etag) self[key] = value resulting_etag = self._actual_etag(key) return self._result_write_success( actual_etag, resulting_etag, value)
[docs] def setdefault_if( self, key: NonEmptyPersiDictKey, *, default_value: ValueType, condition: ETagConditionFlag, expected_etag: ETagIfExists, retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED ) -> ConditionalOperationResult[ValueType]: """Insert default_value if key is absent; conditioned on ETag check. If the key is absent and the condition is satisfied, default_value is inserted. If the key is present, no mutation occurs regardless of whether the condition is satisfied. Warning: This base class implementation is not atomic. Subclasses that require concurrency safety should override this method. Args: key: Dictionary key. default_value: Value to insert if the key is absent and the condition is satisfied. condition: ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED. expected_etag: The caller's expected ETag, or ITEM_NOT_AVAILABLE if the caller believes the key is absent. retrieve_value: Controls value retrieval when the key exists. IF_ETAG_CHANGED (default) fetches only if expected_etag != actual_etag. ALWAYS_RETRIEVE fetches the existing value. NEVER_RETRIEVE returns VALUE_NOT_RETRIEVED. Returns: ConditionalOperationResult with the outcome of the operation. """ if isinstance(default_value, Joker): raise TypeError("default_value must be a regular value, not a Joker command") self._validate_retrieve_value(retrieve_value) key = NonEmptySafeStrTuple(key) actual_etag = self._actual_etag(key) satisfied = self._check_condition(condition, expected_etag, actual_etag) if actual_etag is ITEM_NOT_AVAILABLE: if satisfied: self[key] = default_value resulting_etag = self._actual_etag(key) return self._result_write_success( ITEM_NOT_AVAILABLE, resulting_etag, default_value) else: return self._result_item_not_available(False) if retrieve_value is NEVER_RETRIEVE or ( retrieve_value is IF_ETAG_CHANGED and expected_etag == actual_etag): existing_value = VALUE_NOT_RETRIEVED else: try: existing_value, actual_etag = self._get_value_and_etag(key) except KeyError: return self._result_item_not_available(False) return self._result_unchanged( satisfied, actual_etag, existing_value)
[docs] def discard_if( self, key: NonEmptyPersiDictKey, *, condition: ETagConditionFlag, expected_etag: ETagIfExists ) -> ConditionalOperationResult[ValueType]: """Discard a key only if an ETag condition is satisfied. No retrieve_value parameter — new_value is ITEM_NOT_AVAILABLE on delete success or missing key; on condition failure it is VALUE_NOT_RETRIEVED. Warning: This base class implementation is not atomic. Subclasses that require concurrency safety should override this method. Args: key: Dictionary key. condition: ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED. expected_etag: The caller's expected ETag, or ITEM_NOT_AVAILABLE if the caller believes the key is absent. Returns: ConditionalOperationResult with the outcome of the operation. """ key = NonEmptySafeStrTuple(key) actual_etag = self._actual_etag(key) satisfied = self._check_condition(condition, expected_etag, actual_etag) if actual_etag is ITEM_NOT_AVAILABLE: return self._result_item_not_available(satisfied) if satisfied: self._check_delete_policy() try: self._remove_item(key) except KeyError: return self._result_item_not_available(False) return self._result_delete_success(actual_etag) return self._result_unchanged( False, actual_etag, VALUE_NOT_RETRIEVED)
[docs] def transform_item( self, key: NonEmptyPersiDictKey, *, transformer: TransformingFunction[ValueType], n_retries: int | None = 6 ) -> OperationResult[ValueType]: """Apply a transformation function to a key's value. Reads the current value (or ITEM_NOT_AVAILABLE if absent), calls transformer(current_value), and writes the result back using conditional operations. If the transformer returns DELETE_CURRENT, the key is deleted (or no-op if already absent). If the transformer returns KEEP_CURRENT, the value is left unchanged. Warning: This base class implementation is not atomic unless the backend's conditional operations are atomic. The transformer may be called multiple times if conflicts occur. Args: key: Dictionary key. transformer: A callable that receives the current value (or ITEM_NOT_AVAILABLE) and returns a new value, DELETE_CURRENT, or KEEP_CURRENT. n_retries: Number of retries after ETag conflicts. None retries indefinitely. Raises: ConcurrencyConflictError: If conflicts persist after n_retries. Returns: OperationResult with resulting_etag and new_value. """ key = NonEmptySafeStrTuple(key) if n_retries is not None: try: n_retries = int(n_retries) except (TypeError, ValueError) as exc: raise TypeError("n_retries must be a non-negative int or None") from exc if n_retries < 0: raise ValueError("n_retries must be a non-negative int or None") retries = 0 current_value = None actual_etag = None need_read = True while True: if need_read: read_res = self.get_item_if( key, condition=ANY_ETAG, expected_etag=ITEM_NOT_AVAILABLE, retrieve_value=ALWAYS_RETRIEVE) current_value = read_res.new_value actual_etag = read_res.actual_etag new_value = transformer(current_value) if new_value is KEEP_CURRENT: return OperationResult( resulting_etag=actual_etag, new_value=current_value) if new_value is DELETE_CURRENT: delete_res = self.discard_if( key, condition=ETAG_IS_THE_SAME, expected_etag=actual_etag) if delete_res.condition_was_satisfied: return OperationResult( resulting_etag=ITEM_NOT_AVAILABLE, new_value=ITEM_NOT_AVAILABLE) # discard_if has no retrieve_value param, # so we must re-read on the next iteration. need_read = True else: write_res = self.set_item_if( key, value=new_value, condition=ETAG_IS_THE_SAME, expected_etag=actual_etag, retrieve_value=ALWAYS_RETRIEVE) if write_res.condition_was_satisfied: return OperationResult( resulting_etag=write_res.resulting_etag, new_value=new_value) # Conflict: set_item_if already fetched the current # value+etag, so skip the read at the top of the loop. current_value = write_res.new_value actual_etag = write_res.actual_etag need_read = False if n_retries is not None and retries >= n_retries: raise ConcurrencyConflictError(key, retries + 1) time.sleep(random.uniform(0.01, 0.2) * (1.75 ** retries)) retries += 1
@abstractmethod def __getitem__(self, key:NonEmptyPersiDictKey) -> ValueType: """Retrieve the value for a key. Args: key: Key (string or sequence of strings) or SafeStrTuple. Returns: The stored value. """ raise NotImplementedError("PersiDict is an abstract base class" " and cannot retrieve items directly") def _validate_value(self, value: ValueType) -> None: """Validate that a value is acceptable for storage. Joker commands (KEEP_CURRENT, DELETE_CURRENT) are silently accepted. Args: value: Value to store, or a joker command. Raises: TypeError: If the value is a PersiDict instance or does not match the required base_class_for_values when specified. """ if isinstance(value, PersiDict): raise TypeError("Cannot store a PersiDict instance directly") if value is not KEEP_CURRENT and value is not DELETE_CURRENT: if self.base_class_for_values is not None: if not isinstance(value, self.base_class_for_values): raise TypeError(f"Value must be an instance of" f" {self.base_class_for_values.__name__}") @staticmethod def _validate_retrieve_value( retrieve_value: RetrieveValueFlag) -> None: """Validate that retrieve_value is a RetrieveValueFlag instance. Raises: TypeError: If retrieve_value is not a RetrieveValueFlag. """ if not isinstance(retrieve_value, RetrieveValueFlag): raise TypeError( f"retrieve_value must be ALWAYS_RETRIEVE, NEVER_RETRIEVE," f" or IF_ETAG_CHANGED, got {type(retrieve_value).__name__}") def _validate_returned_value(self, value: ValueType) -> None: """Validate that a value retrieved from storage matches base_class_for_values. Args: value: The deserialized value retrieved from storage. Raises: TypeError: If base_class_for_values is set and the value does not match it. """ if self.base_class_for_values is not None: if not isinstance(value, self.base_class_for_values): raise TypeError( f"Value must be an instance of" f" {self.base_class_for_values.__name__}," f" but it is {type(value).__name__} instead.") def _serialize_to_file( self , value: Any , f: Any , * , pkl_compress: str | None = None ) -> None: """Serialize a value into an open file-like object. Dispatches on self.serialization_format to write the value using the appropriate library. The caller is responsible for opening the file in the correct mode ('wb' for pkl, 'w' for json/text) with UTF-8 encoding for text modes, and for any post-write actions (flush, fsync, close). Args: value: The Python object to serialize. f: A writable file-like object (binary for pkl, text for others). pkl_compress: Compression algorithm passed to joblib.dump when serialization_format is 'pkl' (e.g. 'lz4', 'zlib'). None means joblib's default. Ignored for non-pkl formats. """ if self.serialization_format == 'json': f.write(jsonpickle.dumps(value, indent=4)) elif self.serialization_format == 'pkl': if pkl_compress is not None: joblib.dump(value, f, compress=pkl_compress) else: joblib.dump(value, f) else: f.write(str(value)) def _deserialize_from_file(self, f: Any) -> Any: """Deserialize a value from an open file-like object. Dispatches on self.serialization_format to read the value using the appropriate library. The caller is responsible for opening the file in the correct mode ('rb' for pkl, 'r' for json/text) with UTF-8 encoding for text modes, and for closing it afterward. Args: f: A readable file-like object (binary for pkl, text for others). Returns: The deserialized Python object. """ if self.serialization_format == 'json': return jsonpickle.loads(f.read()) elif self.serialization_format == 'pkl': return joblib.load(f) else: return f.read() def _validate_setitem_args(self, key: NonEmptyPersiDictKey, value: ValueType | Joker ) -> NonEmptySafeStrTuple: """Validate setitem arguments without applying joker side effects. Args: key: Dictionary key (string or sequence of strings or NonEmptySafeStrTuple). value: Value to store, or a joker command (KEEP_CURRENT or DELETE_CURRENT). Raises: MutationPolicyError: If value is DELETE_CURRENT and append_only is True. TypeError: If the value is a PersiDict instance or does not match the required base_class_for_values when specified. Returns: Normalized key. """ if self.append_only and value is not KEEP_CURRENT: if value is DELETE_CURRENT: raise MutationPolicyError("append-only") self._validate_value(value) key = NonEmptySafeStrTuple(key) return key def _process_setitem_args(self, key: NonEmptyPersiDictKey, value: ValueType | Joker ) -> StatusFlag: """Perform the first steps for setting an item. Handles joker commands (KEEP_CURRENT, DELETE_CURRENT) and validates value types. Does NOT enforce the append-only existence check — callers (concrete ``__setitem__`` implementations) are responsible for that, using either an inline ``key in self`` guard or an atomic ``setdefault_if`` call. Args: key: Dictionary key (string or sequence of strings or NonEmptySafeStrTuple). value: Value to store, or a joker command (KEEP_CURRENT or DELETE_CURRENT). Raises: MutationPolicyError: If value is DELETE_CURRENT and append_only is True. TypeError: If the value is a PersiDict instance or does not match the required base_class_for_values when specified. Returns: CONTINUE_NORMAL_EXECUTION if the caller should proceed with storing the value; EXECUTION_IS_COMPLETE if a joker command was processed and no further action is needed. """ if value is KEEP_CURRENT: return EXECUTION_IS_COMPLETE key = self._validate_setitem_args(key, value) if value is DELETE_CURRENT: self.discard(key) return EXECUTION_IS_COMPLETE return CONTINUE_NORMAL_EXECUTION @abstractmethod def __setitem__(self, key:NonEmptyPersiDictKey, value: ValueType | Joker) -> None: """Set the value for a key. Special values KEEP_CURRENT and DELETE_CURRENT are interpreted as commands to keep or delete the current value respectively. Args: key: Key (string or sequence of strings) or SafeStrTuple. value: Value to store, or a Joker command. Raises: MutationPolicyError: If attempting to modify an existing key when append_only is True. NotImplementedError: Subclasses must implement actual writing. """ if self._process_setitem_args(key, value) is EXECUTION_IS_COMPLETE: return raise NotImplementedError("PersiDict is an abstract base class" " and cannot store items directly") def _check_delete_policy(self) -> None: """Raise MutationPolicyError if deletion is not allowed on this dict. Raises: MutationPolicyError: If append_only is True. """ if self.append_only: raise MutationPolicyError("append-only") def _remove_item(self, key: NonEmptySafeStrTuple) -> None: """Remove a key from the backend storage. Performs the raw deletion without policy or existence pre-checks. Subclasses should override this with a direct backend deletion so that ``discard`` and ``discard_if`` can bypass the existence check in ``_process_delitem_args``. The default falls back to ``del self[key]``. Args: key: Already-validated NonEmptySafeStrTuple. Raises: KeyError: If the key does not exist in the backend. """ del self[key] def _process_delitem_args(self, key: NonEmptyPersiDictKey) -> None: """Perform the first steps for deleting an item. Args: key: Dictionary key (string or sequence of strings or NonEmptySafeStrTuple). Raises: MutationPolicyError: If attempting to delete an item when append_only is True. KeyError: If the key does not exist. """ self._check_delete_policy() key = NonEmptySafeStrTuple(key) if key not in self: raise KeyError(key) @abstractmethod def __delitem__(self, key:NonEmptyPersiDictKey): """Delete a key and its value. Args: key: Key (string or sequence of strings) or SafeStrTuple. Raises: MutationPolicyError: If append_only is True. KeyError: If the key does not exist. NotImplementedError: Subclasses must implement deletion. """ self._process_delitem_args(key) raise NotImplementedError("PersiDict is an abstract base class" " and cannot delete items directly") @abstractmethod def __len__(self) -> int: """Return the number of stored items. Returns: Number of key-value pairs. Raises: NotImplementedError: Subclasses must implement counting. """ raise NotImplementedError("PersiDict is an abstract base class" " and cannot count items directly") def _process_generic_iter_args(self, result_type: set[str]) -> None: """Validate arguments for iterator helpers. Args: result_type: A set indicating desired fields among {'keys', 'values', 'timestamps'}. Raises: TypeError: If result_type is not a set. ValueError: If result_type contains invalid entries or an invalid number of items. """ if not isinstance(result_type, set): raise TypeError("result_type must be a set of strings") if not (1 <= len(result_type) <= 3): raise ValueError("result_type must contain between 1 and 3 elements") allowed = {"keys", "values", "timestamps"} if (result_type | allowed) != allowed: raise ValueError("result_type can only contain 'keys', 'values', 'timestamps'") if not (1 <= len(result_type & allowed) <= 3): raise ValueError("result_type must include at least one of 'keys', 'values', 'timestamps'") def _assemble_iter_result( self , result_type: set[str] , * , key: Any = None , value: Any = None , timestamp: Any = None ) -> Any: """Build a single yield value for _generic_iter from named fields. Assembles an iteration result by selecting the requested fields (key, value, timestamp) in canonical order and returning either a single value or a tuple, depending on how many fields were requested. Args: result_type: The same set passed to _generic_iter — a non-empty subset of {"keys", "values", "timestamps"}. key: The key to include when "keys" is in result_type. value: The value to include when "values" is in result_type. timestamp: The timestamp to include when "timestamps" is in result_type. Returns: A single item if only one field is requested, otherwise a tuple of the requested fields in (key, value, timestamp) order. """ to_return: list[Any] = [] if "keys" in result_type: to_return.append(key) if "values" in result_type: to_return.append(value) if "timestamps" in result_type: to_return.append(timestamp) if len(result_type) == 1: return to_return[0] return tuple(to_return) @abstractmethod def _generic_iter(self, result_type: set[str]) -> Any: """Underlying implementation for iterator helpers. Args: result_type: A set indicating desired fields among {'keys', 'values', 'timestamps'}. Returns: An iterator yielding keys, values, and/or timestamps based on result_type. Raises: TypeError: If result_type is not a set. ValueError: If result_type contains invalid entries or an invalid number of items. NotImplementedError: Subclasses must implement the concrete iterator. """ self._process_generic_iter_args(result_type) raise NotImplementedError("PersiDict is an abstract base class" " and cannot iterate items directly") def __iter__(self) -> Iterator[NonEmptySafeStrTuple]: """Iterate over keys. Returns: Iterator of keys. """ return self._generic_iter({"keys"})
[docs] def keys(self) -> Iterator[NonEmptySafeStrTuple]: """Return an iterator over keys. Returns: Keys iterator. """ return self._generic_iter({"keys"})
[docs] def keys_and_timestamps(self) -> Iterator[tuple[NonEmptySafeStrTuple, float]]: """Return an iterator over (key, timestamp) pairs. Returns: Keys and POSIX timestamps. """ return self._generic_iter({"keys", "timestamps"})
[docs] def values(self) -> Iterator[ValueType]: """Return an iterator over values. Returns: Values iterator. """ return self._generic_iter({"values"})
[docs] def values_and_timestamps(self) -> Iterator[tuple[ValueType, float]]: """Return an iterator over (value, timestamp) pairs. Returns: Values and POSIX timestamps. """ return self._generic_iter({"values", "timestamps"})
[docs] def items(self) -> Iterator[tuple[NonEmptySafeStrTuple, ValueType]]: """Return an iterator over (key, value) pairs. Returns: Items iterator. """ return self._generic_iter({"keys", "values"})
[docs] def items_and_timestamps(self) -> Iterator[tuple[NonEmptySafeStrTuple, ValueType, float]]: """Return an iterator over (key, value, timestamp) triples. Returns: Items and timestamps. """ return self._generic_iter({"keys", "values", "timestamps"})
[docs] def setdefault(self, key: NonEmptyPersiDictKey, default: ValueType | None = None) -> ValueType: """Insert key with default value if absent; return the current value. Behaves like the built-in dict.setdefault(): if the key exists, return its current value; otherwise, set the key to the default value and return that default. Warning: This base class implementation is not atomic. Subclasses that require concurrency safety should override this method. Args: key: Dictionary key. default: Value to insert if the key is not present. Defaults to None. Returns: Existing value if key is present; otherwise the provided default value. Raises: TypeError: If default is a Joker command (KEEP_CURRENT/DELETE_CURRENT), or if the key is missing and default violates value type constraints. """ if isinstance(default, Joker): raise TypeError("default must be a regular value, not a Joker command") key = NonEmptySafeStrTuple(key) try: return self[key] except KeyError: self[key] = default return default
def __eq__(self, other: Any) -> bool: """Compare dictionaries for equality. When other has the exact same type and matching constructor parameters, returns True immediately (fast path). Otherwise falls through to element-wise comparison of all keys and values. Args: other: Another Mapping to compare against. Returns: True if the dictionaries have the same key/value pairs, False otherwise. Returns NotImplemented if other is not a Mapping. """ if self is other: return True if not isinstance(other, Mapping): return NotImplemented try: if type(self) is type(other) : if self.get_params() == other.get_params(): return True for self_key, other_key_value in zip_longest( self.keys(),other.items(), fillvalue=None): if self_key is None or other_key_value is None: return False (other_key, other_value) = other_key_value if self[other_key] != other_value: return False except (KeyError,TypeError, AttributeError, ValueError): return False return True def __ne__(self, other: Any) -> bool: """Return True if the dictionaries are not equal.""" if self is other: return False eq_result = self.__eq__(other) if eq_result is NotImplemented: return NotImplemented return not eq_result def __ior__(self, other: Mapping) -> Self: """Update this dict with items from other (self |= other).""" if not isinstance(other, Mapping): raise TypeError(f"Cannot update PersiDict with non-Mapping type: {type(other)}") self.update(other) return self def __getstate__(self): """Prevent pickling of PersiDict instances. Raises: TypeError: Always raised; PersiDict instances are not pickleable. """ raise TypeError( f"{self.__class__.__name__} instances cannot be pickled. " "To persist configuration, use get_params().") def __setstate__(self, state): """Prevent unpickling of PersiDict instances. Raises: TypeError: Always raised; PersiDict instances are not pickleable. """ raise TypeError( f"{self.__class__.__name__} instances cannot be unpickled. " "Recreate from parameters instead.")
[docs] def clear(self) -> None: """Remove all items from the dictionary. Raises: MutationPolicyError: If the dictionary is append-only. """ self._check_delete_policy() for k in self.keys(): self.discard(k)
[docs] def pop(self, key: NonEmptyPersiDictKey, *args: Any) -> Any: """Remove a key and return its value. Uses ``transform_item`` internally so the read-then-delete sequence is protected by ETag checks and automatic retries, avoiding the TOCTOU race that the inherited MutableMapping.pop would have. Args: key: Key (string or sequence of strings) or SafeStrTuple. *args: Optional default value (at most one). Returns: The value that was stored, or the default if the key was absent and a default was provided. Raises: MutationPolicyError: If the dictionary is append-only. TypeError: If more than one default argument is given. KeyError: If the key does not exist and no default was given. """ if len(args) > 1: raise TypeError( f"pop expected at most 2 arguments, got {1 + len(args)}") self._check_delete_policy() captured: list = [] def _grab_and_delete(current_value: Any) -> Any: captured.clear() captured.append(current_value) if current_value is ITEM_NOT_AVAILABLE: return KEEP_CURRENT return DELETE_CURRENT self.transform_item(key, transformer=_grab_and_delete) if captured and captured[0] is not ITEM_NOT_AVAILABLE: return captured[0] if args: return args[0] raise KeyError(NonEmptySafeStrTuple(key))
[docs] def popitem(self) -> tuple[NonEmptySafeStrTuple, ValueType]: """Remove and return an arbitrary (key, value) pair. Uses ``pop`` (which delegates to ``transform_item``) so the read-then-delete is protected by ETag checks and automatic retries. If the chosen key is deleted by another process before ``pop`` completes, the next key is tried until one succeeds or the dictionary is empty. Returns: A (key, value) tuple. Raises: MutationPolicyError: If the dictionary is append-only. KeyError: If the dictionary is empty. """ self._check_delete_policy() for key in self.keys(): try: value = self.pop(key) return (key, value) except KeyError: # Another process deleted this key; try the next one. continue raise KeyError("dictionary is empty")
[docs] def discard(self, key: NonEmptyPersiDictKey) -> bool: """Delete an item without raising an exception if it doesn't exist. This method is absent in the original dict API. Args: key: Key (string or sequence of strings) or SafeStrTuple. Returns: True if the item existed and was deleted; False otherwise. Raises: MutationPolicyError: If the dictionary is append-only. """ self._check_delete_policy() key = NonEmptySafeStrTuple(key) try: self._remove_item(key) return True except KeyError: return False
[docs] def delete_if_exists(self, key: NonEmptyPersiDictKey) -> bool: """Backward-compatible wrapper for discard(). This method is kept for backward compatibility; new code should use discard(). Behavior is identical to discard(). """ return self.discard(key)
[docs] def get_subdict(self, prefix_key:PersiDictKey) -> Self: """Get a sub-dictionary containing items with the given prefix key. Items whose keys start with the provided prefix are visible through the returned sub-dictionary. If the prefix does not exist, an empty sub-dictionary is returned. If the prefix is empty, the entire dictionary is returned. This method is absent in the original Python dict API. Args: prefix_key: Key prefix (string, sequence of strings, or SafeStrTuple) identifying the sub-dict to expose. Returns: A dictionary-like view restricted to keys under the provided prefix. Raises: NotImplementedError: Must be implemented by subclasses that support hierarchical key spaces. """ raise NotImplementedError( f"{type(self).__name__} does not support get_subdict()")
[docs] def subdicts(self) -> dict[str, Self]: """Return a mapping of first-level keys to sub-dictionaries. This method is absent in the original dict API. Returns: dict[str, Self]: A mapping from a top-level key segment to a sub-dictionary restricted to the corresponding keyspace. """ all_keys = {k[0] for k in self.keys()} result_subdicts = {k: self.get_subdict(k) for k in all_keys} return result_subdicts
[docs] def random_key(self) -> NonEmptySafeStrTuple | None: """Return a random key from the dictionary. This method is absent in the original Python dict API. Implementation uses reservoir sampling to select a uniformly random key in streaming time, without loading all keys into memory or using len(). Returns: NonEmptySafeStrTuple | None: A random key if the dictionary is not empty; None if the dictionary is empty. """ iterator = iter(self.keys()) try: # Get the first key result = next(iterator) except StopIteration: # Dictionary is empty return None # Reservoir sampling algorithm i = 2 for key in iterator: # Select the current key with probability 1/i if random.random() < 1/i: result = key i += 1 return result
[docs] @abstractmethod def timestamp(self, key:NonEmptyPersiDictKey) -> float: """Return the last modification time of a key. This method is absent in the original dict API. Args: key: Key (string or sequence of strings) or SafeStrTuple. Returns: POSIX timestamp (seconds since Unix epoch) of the last modification of the item. Raises: NotImplementedError: Must be implemented by subclasses. """ if type(self) is PersiDict: raise NotImplementedError("PersiDict is an abstract base class" " and cannot provide timestamps directly")
[docs] def etag(self, key: NonEmptyPersiDictKey) -> ETagValue: """Return the ETag of a key. By default, this returns a stringified timestamp of the last modification time. Subclasses may override to provide true backend-specific ETags (e.g., S3). This method is absent in the original Python dict API. Args: key: Key (string or sequence of strings) or SafeStrTuple. Returns: The ETag for the key. Raises: KeyError: If the key does not exist. """ return ETagValue(f"{self.timestamp(key):.6f}")
def _sorted_keys(self, *, max_n: int | None, newest_first: bool ) -> list[NonEmptySafeStrTuple]: """Return keys sorted by timestamp. Args: max_n: Maximum number of keys to return. If None, return all. Values <= 0 yield an empty list. newest_first: If True, sort newest first; otherwise oldest first. Returns: Keys sorted by timestamp in the requested order. """ if max_n is not None and max_n <= 0: return [] heap_fn = heapq.nlargest if newest_first else heapq.nsmallest if max_n is None: pairs = list(self.keys_and_timestamps()) pairs.sort(key=lambda x: x[1], reverse=newest_first) return [key for key, _ in pairs] return [key for key, _ in heap_fn( max_n, self.keys_and_timestamps(), key=lambda x: x[1])]
[docs] def oldest_keys(self, *, max_n: int | None = None ) -> list[NonEmptySafeStrTuple]: """Return up to max_n oldest keys in the dictionary. This method is absent in the original Python dict API. Args: max_n: Maximum number of keys to return. If None, return all keys sorted by age (oldest first). Values <= 0 yield an empty list. Defaults to None. Returns: The oldest keys, oldest first. """ return self._sorted_keys(max_n=max_n, newest_first=False)
[docs] def newest_keys(self, *, max_n: int | None = None ) -> list[NonEmptySafeStrTuple]: """Return up to max_n newest keys in the dictionary. This method is absent in the original Python dict API. Args: max_n: Maximum number of keys to return. If None, return all keys sorted by age (newest first). Values <= 0 yield an empty list. Defaults to None. Returns: The newest keys, newest first. """ return self._sorted_keys(max_n=max_n, newest_first=True)
[docs] def oldest_values(self, *, max_n: int | None = None) -> list[ValueType]: """Return up to max_n oldest values in the dictionary. This method is absent in the original Python dict API. Args: max_n: Maximum number of values to return. If None, return values for all keys sorted by age (oldest first). Values <= 0 yield an empty list. Returns: Values corresponding to the oldest keys. """ return [self[k] for k in self.oldest_keys(max_n=max_n)]
[docs] def newest_values(self, *, max_n: int | None = None) -> list[ValueType]: """Return up to max_n newest values in the dictionary. This method is absent in the original Python dict API. Args: max_n: Maximum number of values to return. If None, return values for all keys sorted by age (newest first). Values <= 0 yield an empty list. Returns: Values corresponding to the newest keys. """ return [self[k] for k in self.newest_keys(max_n=max_n)]