"""S3Dict_FileDirCached implementation that uses BasicS3Dict, FileDirDict, and cached classes."""
from __future__ import annotations
from typing import Any, Final
from mixinforge import sort_dict_by_keys
from .basic_s3_dict import BasicS3Dict
from .file_dir_dict import FileDirDict
from .cached_appendonly_dict import AppendOnlyDictCached
from .cached_mutable_dict import MutableDictCached
from .persi_dict import PersiDict, NonEmptyPersiDictKey, PersiDictKey, ValueType
from .jokers_and_status_flags import (ETagConditionFlag, ETagValue,
ETagIfExists,
Joker,
RetrieveValueFlag, IF_ETAG_CHANGED,
ConditionalOperationResult,
OperationResult,
TransformingFunction)
from .safe_str_tuple import NonEmptySafeStrTuple, SafeStrTuple
from .overlapping_multi_dict import OverlappingMultiDict
# Default base directory for S3Dict_FileDirCached local cache
S3DICT_NEW_DEFAULT_BASE_DIR: Final[str] = "__s3_dict__"
[docs]
class S3Dict_FileDirCached(PersiDict[ValueType]):
"""S3-backed persistent dictionary using BasicS3Dict with local caching.
This class mimics the interface and behavior of S3Dict_Legacy but internally uses
BasicS3Dict for S3 operations combined with FileDirDict-based local caching
via the cached wrapper classes (AppendOnlyDictCached/MutableDictCached).
The architecture layers caching on top of BasicS3Dict to provide:
- Fast local access for frequently accessed items
- Efficient batch operations
- ETag-based change detection for mutable dictionaries
- Optimized append-only performance when append_only=True
"""
def __init__(self, *,
bucket_name: str = "my_bucket",
region: str = None,
root_prefix: str = "",
base_dir: str = S3DICT_NEW_DEFAULT_BASE_DIR,
serialization_format: str = "pkl",
digest_len: int = 8,
append_only: bool = False,
base_class_for_values: type | None = None):
"""Initialize an S3-backed persistent dictionary with local caching.
Args:
bucket_name: Name of the S3 bucket to use.
region: AWS region for the bucket.
root_prefix: Common S3 key prefix under which all objects are stored.
base_dir: Local directory path for caching.
serialization_format: File extension/format for stored values.
digest_len: Number of base32 MD5 hash characters for collision prevention.
append_only: If True, prevents modification/deletion of existing items.
base_class_for_values: Optional base class that all stored values must inherit from.
"""
super().__init__(append_only=append_only,
base_class_for_values=base_class_for_values,
serialization_format=serialization_format)
# Create the main S3 storage using BasicS3Dict
self._main_dict = BasicS3Dict(
bucket_name=bucket_name,
region=region,
root_prefix=root_prefix,
serialization_format=serialization_format,
append_only=append_only,
base_class_for_values=base_class_for_values
)
# Set up local cache parameters for FileDirDict
individual_subdicts_params = {self.serialization_format: {}}
if not append_only:
self.etag_serialization_format = f"{self.serialization_format}_etag"
individual_subdicts_params[self.etag_serialization_format] = {
"base_class_for_values": str}
# Create local cache using OverlappingMultiDict with FileDirDict
self.local_cache = OverlappingMultiDict(
dict_type=FileDirDict,
shared_subdicts_params={
"base_dir": base_dir,
"append_only": append_only,
"base_class_for_values": base_class_for_values,
"digest_len": digest_len
},
**individual_subdicts_params)
# Get the data cache
self._data_cache = getattr(self.local_cache, self.serialization_format)
# Create the appropriate cached wrapper
if append_only:
# Use AppendOnlyDictCached for append-only mode
self._cached_dict = AppendOnlyDictCached(
main_dict=self._main_dict,
data_cache=self._data_cache
)
else:
# Use MutableDictCached for mutable mode with ETag cache
self._etag_cache = getattr(self.local_cache, self.etag_serialization_format)
self._cached_dict = MutableDictCached(
main_dict=self._main_dict,
data_cache=self._data_cache,
etag_cache=self._etag_cache
)
@property
def digest_len(self) -> int:
"""Get the digest length used for collision prevention."""
return self._data_cache.digest_len
[docs]
def get_params(self) -> dict[str, Any]:
"""Return configuration parameters as a dictionary."""
# Get params from the main dict and local cache
params = self._main_dict.get_params()
cache_params = self._data_cache.get_params()
# Add cache-specific params
params["base_dir"] = cache_params["base_dir"]
params["digest_len"] = cache_params["digest_len"]
params = sort_dict_by_keys(params)
return params
@property
def base_url(self) -> str:
"""Get the base S3 URL."""
return self._main_dict.base_url
@property
def root_prefix(self) -> str:
"""Get the S3 root prefix for this dictionary."""
return self._main_dict.root_prefix
@property
def base_dir(self) -> str:
"""Get the base directory for local cache."""
return self._data_cache.base_dir
def __contains__(self, key: NonEmptyPersiDictKey) -> bool:
"""Check if key exists in the dictionary."""
return self._cached_dict.__contains__(key)
def __getitem__(self, key: NonEmptyPersiDictKey) -> ValueType:
"""Get item from dictionary."""
return self._cached_dict.__getitem__(key)
def __setitem__(self, key: NonEmptyPersiDictKey, value: ValueType | Joker) -> None:
"""Set item in dictionary."""
self._cached_dict.__setitem__(key, value)
def __delitem__(self, key: NonEmptyPersiDictKey) -> None:
"""Delete item from dictionary."""
self._cached_dict.__delitem__(key)
def __len__(self) -> int:
"""Get number of items in dictionary."""
return self._cached_dict.__len__()
def _generic_iter(self, result_type: set[str]):
"""Generic iteration over dictionary items."""
return self._cached_dict._generic_iter(result_type)
[docs]
def get_subdict(self, prefix_key: PersiDictKey) -> 'S3Dict_FileDirCached[ValueType]':
"""Get a subdictionary for the given key prefix.
Returns a new S3Dict_FileDirCached with both the S3 storage and local
cache scoped to the given prefix. Modifications to the subdictionary
will be visible in the parent and vice versa.
Args:
prefix_key: Prefix key (string or sequence of strings) identifying the
subdictionary scope.
Returns:
A new cached S3 dictionary rooted at the
specified prefix.
"""
prefix_key = SafeStrTuple(prefix_key)
# Delegate to _cached_dict which handles subdict creation properly
cached_subdict = self._cached_dict.get_subdict(prefix_key)
# Create a new S3Dict_FileDirCached wrapper around the cached subdict
result = object.__new__(S3Dict_FileDirCached)
# Initialize base PersiDict attributes
PersiDict.__init__(
result,
append_only=self.append_only,
base_class_for_values=self.base_class_for_values,
serialization_format=self.serialization_format
)
# Extract internal components from cached_subdict
result._cached_dict = cached_subdict
if self.append_only:
result._main_dict = cached_subdict._main
result._data_cache = cached_subdict._data_cache
else:
result._main_dict = cached_subdict._main_dict
result._data_cache = cached_subdict._data_cache
result._etag_cache = cached_subdict._etag_cache
result.etag_serialization_format = self.etag_serialization_format
# Note: local_cache (OverlappingMultiDict) is not set on subdicts
# as it's only used during initial construction
return result
[docs]
def timestamp(self, key: NonEmptyPersiDictKey) -> float:
"""Get the timestamp of when the item was last modified."""
return self._cached_dict.timestamp(key)
[docs]
def etag(self, key: NonEmptyPersiDictKey) -> ETagValue:
"""Get the ETag for an item.
For mutable dicts, returns the cached S3 native ETag if available,
otherwise fetches from S3 and caches it. For append-only dicts,
delegates to the cached dict's etag method.
Args:
key: Non-empty key to query.
Returns:
The ETag string for the key.
Raises:
KeyError: If the key does not exist.
"""
return self._cached_dict.etag(key)
[docs]
def get_item_if(
self,
key: NonEmptyPersiDictKey,
*,
condition: ETagConditionFlag,
expected_etag: ETagIfExists,
retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED
) -> ConditionalOperationResult[ValueType]:
"""Get item only if ETag satisfies a condition; delegate to cached dict."""
return self._cached_dict.get_item_if(
key, condition=condition, expected_etag=expected_etag,
retrieve_value=retrieve_value)
[docs]
def set_item_if(
self,
key: NonEmptyPersiDictKey,
*,
value: ValueType | Joker,
condition: ETagConditionFlag,
expected_etag: ETagIfExists,
retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED
) -> ConditionalOperationResult[ValueType]:
"""Set item only if ETag satisfies a condition; delegate to cached dict."""
return self._cached_dict.set_item_if(
key, value=value, condition=condition, expected_etag=expected_etag,
retrieve_value=retrieve_value)
[docs]
def setdefault_if(
self,
key: NonEmptyPersiDictKey,
*,
default_value: ValueType,
condition: ETagConditionFlag,
expected_etag: ETagIfExists,
retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED
) -> ConditionalOperationResult[ValueType]:
"""Insert default if absent and condition satisfied; delegate to cached dict."""
return self._cached_dict.setdefault_if(
key, default_value=default_value, condition=condition,
expected_etag=expected_etag, retrieve_value=retrieve_value)
[docs]
def discard_if(
self,
key: NonEmptyPersiDictKey,
*,
condition: ETagConditionFlag,
expected_etag: ETagIfExists
) -> ConditionalOperationResult[ValueType]:
"""Discard item only if ETag satisfies a condition; delegate to cached dict."""
return self._cached_dict.discard_if(
key, condition=condition, expected_etag=expected_etag)
[docs]
def discard(self, key: NonEmptyPersiDictKey) -> bool:
"""Delete an item without raising an exception if it doesn't exist.
This method fixes the issue where cached dictionaries return multiple
success counts for a single key deletion.
Args:
key: Key to delete.
Returns:
True if the item existed and was deleted; False otherwise.
"""
key = NonEmptySafeStrTuple(key)
try:
del self[key]
return True
except KeyError:
return False
S3Dict = S3Dict_FileDirCached # Alias for backward compatibility