# Source code for persidict.s3_dict_file_dir_cached

"""S3Dict_FileDirCached implementation that uses BasicS3Dict, FileDirDict, and cached classes."""

from __future__ import annotations

from typing import Any, Final

from mixinforge import sort_dict_by_keys

from .basic_s3_dict import BasicS3Dict
from .file_dir_dict import FileDirDict
from .cached_appendonly_dict import AppendOnlyDictCached
from .cached_mutable_dict import MutableDictCached
from .persi_dict import PersiDict, NonEmptyPersiDictKey, PersiDictKey, ValueType
from .jokers_and_status_flags import (ETagConditionFlag, ETagValue,
                                      ETagIfExists,
                                      Joker,
                                      RetrieveValueFlag, IF_ETAG_CHANGED,
                                      ConditionalOperationResult,
                                      OperationResult,
                                      TransformingFunction)
from .safe_str_tuple import NonEmptySafeStrTuple, SafeStrTuple
from .overlapping_multi_dict import OverlappingMultiDict


# Default value of the ``base_dir`` argument of S3Dict_FileDirCached: the
# local directory used for the on-disk cache when the caller does not pass
# one explicitly.
S3DICT_NEW_DEFAULT_BASE_DIR: Final[str] = "__s3_dict__"


class S3Dict_FileDirCached(PersiDict[ValueType]):
    """Persistent dictionary stored in S3 with a local on-disk cache.

    The public interface and behavior follow S3Dict_Legacy, but the
    implementation is a composition of three pieces:

    - a BasicS3Dict that performs the actual S3 operations;
    - FileDirDict instances (grouped in an OverlappingMultiDict) that hold
      the local cache;
    - a cached wrapper that ties the two together: AppendOnlyDictCached
      when ``append_only=True``, MutableDictCached otherwise.

    Layering the cache over BasicS3Dict is meant to provide fast local
    access to frequently used items, efficient batch operations,
    ETag-based change detection for mutable dictionaries, and an optimized
    path for append-only dictionaries.
    """

    def __init__(self, *,
                 bucket_name: str = "my_bucket",
                 region: str = None,
                 root_prefix: str = "",
                 base_dir: str = S3DICT_NEW_DEFAULT_BASE_DIR,
                 serialization_format: str = "pkl",
                 digest_len: int = 8,
                 append_only: bool = False,
                 base_class_for_values: type | None = None):
        """Build the S3 store, the local cache and the wrapper joining them.

        Args:
            bucket_name: Name of the S3 bucket to use.
            region: AWS region of the bucket.
            root_prefix: Common S3 key prefix under which all objects are
                stored.
            base_dir: Local directory path used for the cache.
            serialization_format: File extension/format of stored values.
            digest_len: Number of base32 MD5 hash characters used for
                collision prevention.
            append_only: If True, existing items can be neither modified
                nor deleted.
            base_class_for_values: Optional base class every stored value
                must inherit from.
        """
        super().__init__(
            append_only=append_only,
            base_class_for_values=base_class_for_values,
            serialization_format=serialization_format,
        )

        # The S3 side: this is the main (non-cache) storage.
        self._main_dict = BasicS3Dict(
            bucket_name=bucket_name,
            region=region,
            root_prefix=root_prefix,
            serialization_format=serialization_format,
            append_only=append_only,
            base_class_for_values=base_class_for_values,
        )

        # Per-format settings for the cache sub-dictionaries. The data
        # format is always present; mutable dictionaries additionally get
        # an "<format>_etag" sub-dictionary whose values must be str.
        per_format_params = {self.serialization_format: {}}
        if not append_only:
            self.etag_serialization_format = (
                f"{self.serialization_format}_etag")
            per_format_params[self.etag_serialization_format] = {
                "base_class_for_values": str}

        # Settings shared by every cache sub-dictionary.
        common_params = {
            "base_dir": base_dir,
            "append_only": append_only,
            "base_class_for_values": base_class_for_values,
            "digest_len": digest_len,
        }
        self.local_cache = OverlappingMultiDict(
            dict_type=FileDirDict,
            shared_subdicts_params=common_params,
            **per_format_params)

        # The sub-dictionary named after the data format caches the values.
        self._data_cache = getattr(
            self.local_cache, self.serialization_format)

        # Pick the wrapper that matches the mutability mode.
        if append_only:
            wrapper = AppendOnlyDictCached(
                main_dict=self._main_dict,
                data_cache=self._data_cache,
            )
        else:
            # Mutable mode also needs the ETag cache.
            self._etag_cache = getattr(
                self.local_cache, self.etag_serialization_format)
            wrapper = MutableDictCached(
                main_dict=self._main_dict,
                data_cache=self._data_cache,
                etag_cache=self._etag_cache,
            )
        self._cached_dict = wrapper

    @property
    def digest_len(self) -> int:
        """Digest length used for collision prevention (from the data cache)."""
        return self._data_cache.digest_len

    def get_params(self) -> dict[str, Any]:
        """Return the configuration parameters as a key-sorted dictionary.

        The result is the parameter dict of the S3 store, extended with
        the ``base_dir`` and ``digest_len`` of the local data cache.
        """
        merged = self._main_dict.get_params()
        cache_side = self._data_cache.get_params()
        for name in ("base_dir", "digest_len"):
            merged[name] = cache_side[name]
        return sort_dict_by_keys(merged)

    @property
    def base_url(self) -> str:
        """Base S3 URL, as reported by the S3 store."""
        return self._main_dict.base_url

    @property
    def root_prefix(self) -> str:
        """S3 root prefix of this dictionary, as reported by the S3 store."""
        return self._main_dict.root_prefix

    @property
    def base_dir(self) -> str:
        """Base directory of the local cache, as reported by the data cache."""
        return self._data_cache.base_dir

    def __contains__(self, key: NonEmptyPersiDictKey) -> bool:
        """Tell whether ``key`` is present; answered by the cached dict."""
        return key in self._cached_dict

    def __getitem__(self, key: NonEmptyPersiDictKey) -> ValueType:
        """Return the value stored under ``key``, via the cached dict."""
        return self._cached_dict[key]

    def __setitem__(self, key: NonEmptyPersiDictKey,
                    value: ValueType | Joker) -> None:
        """Store ``value`` under ``key``, via the cached dict."""
        self._cached_dict[key] = value

    def __delitem__(self, key: NonEmptyPersiDictKey) -> None:
        """Remove ``key``, via the cached dict."""
        del self._cached_dict[key]

    def __len__(self) -> int:
        """Return the number of items, as counted by the cached dict."""
        return len(self._cached_dict)

    def _generic_iter(self, result_type: set[str]):
        """Iterate over the dictionary; forwarded to the cached dict."""
        return self._cached_dict._generic_iter(result_type)

    def get_subdict(
            self,
            prefix_key: PersiDictKey
    ) -> 'S3Dict_FileDirCached[ValueType]':
        """Return a sub-dictionary restricted to a key prefix.

        Both the S3 storage and the local cache of the returned object are
        scoped to the prefix. Changes made through the sub-dictionary are
        visible in the parent and vice versa.

        Args:
            prefix_key: Prefix key (string or sequence of strings) that
                identifies the scope of the sub-dictionary.

        Returns:
            A new cached S3 dictionary rooted at the given prefix.
        """
        prefix = SafeStrTuple(prefix_key)

        # The cached wrapper knows how to scope itself and its parts.
        scoped = self._cached_dict.get_subdict(prefix)

        # Build the outer object without running __init__, which would
        # create a new S3 store and cache; only the PersiDict base state
        # is initialized.
        child = object.__new__(S3Dict_FileDirCached)
        PersiDict.__init__(
            child,
            append_only=self.append_only,
            base_class_for_values=self.base_class_for_values,
            serialization_format=self.serialization_format,
        )

        # Re-expose the parts of the scoped wrapper on the new object. The
        # S3 store is read from `_main` on the append-only wrapper and from
        # `_main_dict` on the mutable one.
        child._cached_dict = scoped
        if self.append_only:
            child._main_dict = scoped._main
            child._data_cache = scoped._data_cache
        else:
            child._main_dict = scoped._main_dict
            child._data_cache = scoped._data_cache
            child._etag_cache = scoped._etag_cache
            child.etag_serialization_format = self.etag_serialization_format

        # `local_cache` (the OverlappingMultiDict) is deliberately left
        # unset on sub-dictionaries: it is only needed while constructing
        # the top-level object.
        return child

    def timestamp(self, key: NonEmptyPersiDictKey) -> float:
        """Return the last-modification timestamp of the item under ``key``."""
        return self._cached_dict.timestamp(key)

    def etag(self, key: NonEmptyPersiDictKey) -> ETagValue:
        """Return the ETag of an item.

        The call is forwarded to the cached dict. For mutable dictionaries
        that means the cached native S3 ETag is returned when available,
        and otherwise it is fetched from S3 and cached. For append-only
        dictionaries the append-only wrapper's own ``etag`` is used.

        Args:
            key: Non-empty key to query.

        Returns:
            The ETag string for the key.

        Raises:
            KeyError: If the key does not exist.
        """
        return self._cached_dict.etag(key)

    def get_item_if(
            self,
            key: NonEmptyPersiDictKey,
            *,
            condition: ETagConditionFlag,
            expected_etag: ETagIfExists,
            retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED
    ) -> ConditionalOperationResult[ValueType]:
        """Conditionally read an item based on its ETag.

        All arguments are passed unchanged to the cached dict's
        ``get_item_if``, and its result is returned as is.
        """
        outcome = self._cached_dict.get_item_if(
            key,
            condition=condition,
            expected_etag=expected_etag,
            retrieve_value=retrieve_value,
        )
        return outcome

    def set_item_if(
            self,
            key: NonEmptyPersiDictKey,
            *,
            value: ValueType | Joker,
            condition: ETagConditionFlag,
            expected_etag: ETagIfExists,
            retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED
    ) -> ConditionalOperationResult[ValueType]:
        """Conditionally write an item based on its ETag.

        All arguments are passed unchanged to the cached dict's
        ``set_item_if``, and its result is returned as is.
        """
        outcome = self._cached_dict.set_item_if(
            key,
            value=value,
            condition=condition,
            expected_etag=expected_etag,
            retrieve_value=retrieve_value,
        )
        return outcome

    def setdefault_if(
            self,
            key: NonEmptyPersiDictKey,
            *,
            default_value: ValueType,
            condition: ETagConditionFlag,
            expected_etag: ETagIfExists,
            retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED
    ) -> ConditionalOperationResult[ValueType]:
        """Conditionally insert a default value for an absent key.

        All arguments are passed unchanged to the cached dict's
        ``setdefault_if``, and its result is returned as is.
        """
        outcome = self._cached_dict.setdefault_if(
            key,
            default_value=default_value,
            condition=condition,
            expected_etag=expected_etag,
            retrieve_value=retrieve_value,
        )
        return outcome

    def discard_if(
            self,
            key: NonEmptyPersiDictKey,
            *,
            condition: ETagConditionFlag,
            expected_etag: ETagIfExists
    ) -> ConditionalOperationResult[ValueType]:
        """Conditionally remove an item based on its ETag.

        All arguments are passed unchanged to the cached dict's
        ``discard_if``, and its result is returned as is.
        """
        outcome = self._cached_dict.discard_if(
            key,
            condition=condition,
            expected_etag=expected_etag,
        )
        return outcome

    def transform_item(
            self,
            key: NonEmptyPersiDictKey,
            *,
            transformer: TransformingFunction[ValueType],
            n_retries: int | None = 6
    ) -> OperationResult[ValueType]:
        """Apply ``transformer`` to an item; forwarded to the cached dict."""
        outcome = self._cached_dict.transform_item(
            key,
            transformer=transformer,
            n_retries=n_retries,
        )
        return outcome

    def discard(self, key: NonEmptyPersiDictKey) -> bool:
        """Remove an item if present, without raising when it is absent.

        Implemented through ``del self[key]`` so that exactly one boolean
        is reported per key, rather than the multiple success counts that
        cached dictionaries can return for a single deletion.

        Args:
            key: Key to delete.

        Returns:
            True if the item existed and was deleted; False otherwise.
        """
        safe_key = NonEmptySafeStrTuple(key)
        try:
            del self[safe_key]
        except KeyError:
            return False
        else:
            return True
# Alias for backward compatibility: `S3Dict` is bound to the same class
# object as `S3Dict_FileDirCached`.
S3Dict = S3Dict_FileDirCached