Source code for persidict.basic_s3_dict

"""Basic S3-backed persistent dictionary.

This module provides `BasicS3Dict`, a concrete implementation of the
`PersiDict` interface that stores each dictionary entry as a separate
object in Amazon S3. It is designed for simple, low-overhead persistence
without local caching, while exposing a familiar mapping-like API.

See individual method docstrings for details on semantics and exceptions.
"""

from __future__ import annotations

from typing import Any, Final
import io

import boto3
from botocore.exceptions import ClientError

from mixinforge import sort_dict_by_keys

from .safe_str_tuple import SafeStrTuple, NonEmptySafeStrTuple
from .safe_str_tuple_signing import sign_safe_str_tuple, unsign_safe_str_tuple
from .persi_dict import PersiDict, NonEmptyPersiDictKey, PersiDictKey, ValueType
from .exceptions import MutationPolicyError, ConcurrencyConflictError
from .jokers_and_status_flags import (EXECUTION_IS_COMPLETE,
                                      KEEP_CURRENT, DELETE_CURRENT,
                                      Joker, ETagValue,
                                      ETagConditionFlag,
                                      ANY_ETAG, ETAG_IS_THE_SAME, ETAG_HAS_CHANGED,
                                      RetrieveValueFlag, ALWAYS_RETRIEVE,
                                      NEVER_RETRIEVE, IF_ETAG_CHANGED,
                                      ITEM_NOT_AVAILABLE, ItemNotAvailableFlag,
                                      VALUE_NOT_RETRIEVED,
                                      ETagIfExists, ConditionalOperationResult)


_MAX_SETDEFAULT_RETRIES: Final[int] = 5


def _s3_error_status_code(e: ClientError) -> tuple[int | None, str | None]:
    """Return HTTP status and S3 error code if present on a ClientError."""
    response = getattr(e, "response", {}) or {}
    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
    code = response.get("Error", {}).get("Code")
    return status, code


def not_found_error(e: ClientError) -> bool:
    """Check if a ClientError indicates a missing S3 object.

    Args:
        e: The ClientError exception to check.

    Returns:
        True if the error indicates a missing object (404, NoSuchKey),
        False otherwise.
    """
    status, error_code = _s3_error_status_code(e)
    if status == 404:
        return True
    return error_code in ("NoSuchKey", "404", "NotFound")


def conditional_request_failed(e: ClientError) -> bool:
    """Check if a ClientError indicates a failed conditional request."""
    status, code = _s3_error_status_code(e)
    return status in (409, 412) or code in ("ConditionalRequestConflict", "PreconditionFailed")


def not_modified_error(e: ClientError) -> bool:
    """Check if a ClientError indicates an If-None-Match not modified response."""
    status, code = _s3_error_status_code(e)
    return status == 304 or code in ("304", "NotModified")


[docs] class BasicS3Dict(PersiDict[ValueType]): """A persistent dictionary that stores key-value pairs as S3 objects. Each key-value pair is stored as a separate S3 object in the specified bucket. A key can be either a string (object name without file extension) or a sequence of strings representing a hierarchical path (folder structure ending with an object name). Values can be instances of any Python type and are serialized to S3 objects. BasicS3Dict supports multiple serialization formats: - Binary storage using pickle ('pkl' format) - Human-readable text using jsonpickle ('json' format) - Plain text for string values (other formats) Note: Unlike native Python dictionaries, insertion order is not preserved. Operations may incur S3 API costs and network latency. All operations are performed directly against S3 without local caching. """ region: str bucket_name: str root_prefix: str _conditional_delete_probed: bool = False _conditional_delete_supported: bool = True _if_none_match_etag_probed: bool = False _if_none_match_etag_supported: bool = True _CONTENT_TYPE_MAP: dict[str, str] = { 'json': 'application/json', 'pkl': 'application/octet-stream', } def __init__(self, *, bucket_name: str = "my_bucket", region: str = None, root_prefix: str = "", serialization_format: str = "pkl", append_only: bool = False, base_class_for_values: type | None = None): """Initialize a basic S3-backed persistent dictionary. Args: bucket_name: Name of the S3 bucket to use. The bucket will be created automatically if it does not exist and permissions allow. region: AWS region for the bucket. If None, uses the default client region from AWS configuration. root_prefix: Common S3 key prefix under which all objects are stored. A trailing slash is automatically added if missing. serialization_format: File extension/format for stored values. Supported formats: 'pkl' (pickle), 'json' (jsonpickle), or custom text formats. append_only: If True, prevents modification of existing items after they are initially stored. base_class_for_values: Optional base class that all stored values must inherit from. When specified (and not str), serialization_format must be 'pkl' or 'json' for proper serialization. Note: The S3 bucket will be created if it doesn't exist and AWS permissions allow. Network connectivity and valid AWS credentials are required. """ super().__init__(append_only=append_only, base_class_for_values=base_class_for_values, serialization_format=serialization_format) self.region = region if region is None: self.s3_client = boto3.client('s3') else: self.s3_client = boto3.client('s3', region_name=region) try: self.s3_client.head_bucket(Bucket=bucket_name) except ClientError as e: error_code = e.response['Error']['Code'] if not_found_error(e): # Bucket does not exist, attempt to create it try: effective_region = self.s3_client.meta.region_name if effective_region and effective_region != 'us-east-1': self.s3_client.create_bucket( Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': effective_region}) else: self.s3_client.create_bucket(Bucket=bucket_name) except ClientError as create_e: create_error_code = create_e.response['Error']['Code'] # Handle race condition where the bucket was created by another # process or its name is already taken by another AWS account if ( create_error_code == 'BucketAlreadyOwnedByYou' or create_error_code == 'BucketAlreadyExists'): pass else: raise create_e # Re-raise other unexpected creation errors elif error_code == '403' or error_code == 'Forbidden': # Bucket exists but access is forbidden - likely a cross-account # bucket with policy granting limited access. Operations may still # work if the policy allows the required S3 permissions. pass else: raise e # Re-raise other unexpected head_bucket errors self.bucket_name = bucket_name self.root_prefix = root_prefix if len(self.root_prefix) and self.root_prefix[-1] != "/": self.root_prefix += "/"
[docs] def get_params(self) -> dict[str, Any]: """Return configuration parameters as a dictionary. This method supports the Parameterizable API and is not part of the standard Python dictionary interface. Returns: A mapping of parameter names to their configured values, including S3-specific parameters (region, bucket_name, root_prefix) sorted by key names. """ params = super().get_params() additional_params = dict( region=self.region, bucket_name=self.bucket_name, root_prefix=self.root_prefix) params = {**params, **additional_params} sorted_params = sort_dict_by_keys(params) return sorted_params
[docs] def etag(self, key: NonEmptyPersiDictKey) -> ETagValue: """Get an ETag for a key. Args: key: Dictionary key (string or sequence of strings or NonEmptySafeStrTuple). Returns: The ETag value for the S3 object. Raises: KeyError: If the key does not exist in S3. """ key = NonEmptySafeStrTuple(key) obj_name = self._build_full_objectname(key) try: response = self.s3_client.head_object(Bucket=self.bucket_name, Key=obj_name) return ETagValue(response["ETag"]) except ClientError as e: if not_found_error(e): raise KeyError(key) from e else: raise
@property def base_url(self) -> str|None: """Return the S3 URL prefix of this dictionary. This property is not part of the standard Python dictionary interface. Returns: The base S3 URL in the format "s3://<bucket>/<root_prefix>". """ return f"s3://{self.bucket_name}/{self.root_prefix}" def _build_full_objectname(self, key: NonEmptyPersiDictKey) -> str: """Convert a key into a full S3 object key. Args: key: Dictionary key (string or sequence of strings or NonEmptySafeStrTuple). Returns: The complete S3 object key including root_prefix and serialization_format extension, with digest-based collision prevention applied if enabled. """ key = NonEmptySafeStrTuple(key) key = sign_safe_str_tuple(key, 0) objectname = self.root_prefix + "/".join(key) + "." + self.serialization_format return objectname def __contains__(self, key: NonEmptyPersiDictKey) -> bool: """Check if the specified key exists in the dictionary. Performs a HEAD request to S3 to verify the object's existence. Args: key: Dictionary key (string or sequence of strings or NonEmptySafeStrTuple). Returns: True if the key exists in S3, False otherwise. """ key = NonEmptySafeStrTuple(key) try: obj_name = self._build_full_objectname(key) self.s3_client.head_object(Bucket=self.bucket_name, Key=obj_name) return True except ClientError as e: if not_found_error(e): return False else: raise def _deserialize_s3_body(self, body) -> Any: """Deserialize a value from an S3 response body. Args: body: The S3 response Body stream. Returns: The deserialized value. """ try: raw = body.read() if self.serialization_format == 'pkl': f = io.BytesIO(raw) else: f = io.StringIO(raw.decode('utf-8')) return self._deserialize_from_file(f) finally: body.close() def _get_value_and_etag(self, key: NonEmptySafeStrTuple) -> tuple[ValueType, ETagValue]: """Return a consistent value and ETag for a key in a single S3 request. Args: key: Normalized dictionary key. Returns: A matching (value, ETag) pair. Raises: KeyError: If the key does not exist in S3. TypeError: If base_class_for_values is set and the stored value does not match it. """ obj_name = self._build_full_objectname(key) try: response = self.s3_client.get_object( Bucket=self.bucket_name, Key=obj_name) s3_etag = ETagValue(response["ETag"]) value = self._deserialize_s3_body(response['Body']) self._validate_returned_value(value) return value, s3_etag except ClientError as e: if not_found_error(e): raise KeyError(key) from e raise def _result_for_missing_key( self, condition: ETagConditionFlag, expected_etag: ETagIfExists ) -> ConditionalOperationResult[ValueType]: """Build a ConditionalOperationResult for a missing key.""" satisfied = self._check_condition( condition, expected_etag, ITEM_NOT_AVAILABLE) return self._result_item_not_available(satisfied) def _conditional_failure_result( self, key: NonEmptySafeStrTuple, *, expected_etag: ETagIfExists = ITEM_NOT_AVAILABLE, retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED, return_existing_value: bool = True ) -> ConditionalOperationResult[ValueType]: """Build result for a failed conditional write/delete.""" if not return_existing_value or retrieve_value is NEVER_RETRIEVE: new_actual = self._actual_etag(key) if new_actual is ITEM_NOT_AVAILABLE: return self._result_item_not_available(False) return self._result_unchanged( False, new_actual, VALUE_NOT_RETRIEVED) if retrieve_value is ALWAYS_RETRIEVE: try: new_value, new_actual = self._get_value_and_etag(key) except KeyError: return self._result_item_not_available(False) return self._result_unchanged( False, new_actual, new_value) # IF_ETAG_CHANGED: fetch only if actual ETag differs from expected new_actual = self._actual_etag(key) if new_actual is ITEM_NOT_AVAILABLE: return self._result_item_not_available(False) if expected_etag != new_actual: try: new_value, new_actual = self._get_value_and_etag(key) except KeyError: return self._result_item_not_available(False) return self._result_unchanged( False, new_actual, new_value) return self._result_unchanged( False, new_actual, VALUE_NOT_RETRIEVED)
[docs] def get_item_if( self, key: NonEmptyPersiDictKey, *, condition: ETagConditionFlag, expected_etag: ETagIfExists, retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED ) -> ConditionalOperationResult[ValueType]: """Retrieve the value for a key only if an ETag condition is satisfied. Uses S3 conditional headers (IfMatch/IfNoneMatch) for server-side condition checking when possible. Args: key: Dictionary key. condition: ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED. expected_etag: The caller's expected ETag, or ITEM_NOT_AVAILABLE. retrieve_value: Controls value retrieval. IF_ETAG_CHANGED (default) uses S3 IfNoneMatch to skip the fetch when the ETag matches. ALWAYS_RETRIEVE always fetches the value. NEVER_RETRIEVE does a HEAD only and returns VALUE_NOT_RETRIEVED. Returns: ConditionalOperationResult with the outcome. Raises: TypeError: If base_class_for_values is set and the retrieved value does not match it. """ self._validate_retrieve_value(retrieve_value) key = NonEmptySafeStrTuple(key) if retrieve_value is ALWAYS_RETRIEVE: try: value, actual_etag = self._get_value_and_etag(key) except KeyError: return self._result_for_missing_key(condition, expected_etag) satisfied = self._check_condition(condition, expected_etag, actual_etag) return self._result_unchanged(satisfied, actual_etag, value) if retrieve_value is NEVER_RETRIEVE: actual_etag = self._actual_etag(key) if actual_etag is ITEM_NOT_AVAILABLE: return self._result_for_missing_key(condition, expected_etag) satisfied = self._check_condition(condition, expected_etag, actual_etag) return self._result_unchanged( satisfied, actual_etag, VALUE_NOT_RETRIEVED) obj_name = self._build_full_objectname(key) use_if_none_match = not isinstance(expected_etag, ItemNotAvailableFlag) get_kwargs = { "Bucket": self.bucket_name, "Key": obj_name } if use_if_none_match: get_kwargs["IfNoneMatch"] = expected_etag try: response = self.s3_client.get_object(**get_kwargs) actual_etag = ETagValue(response["ETag"]) value = self._deserialize_s3_body(response["Body"]) except ClientError as e: if not_found_error(e): return self._result_for_missing_key(condition, expected_etag) if use_if_none_match and not_modified_error(e): satisfied = self._check_condition( condition, expected_etag, expected_etag) return self._result_unchanged( satisfied, expected_etag, VALUE_NOT_RETRIEVED) raise self._validate_returned_value(value) satisfied = self._check_condition(condition, expected_etag, actual_etag) return self._result_unchanged(satisfied, actual_etag, value)
def __getitem__(self, key: NonEmptyPersiDictKey) -> ValueType: """Retrieve the value stored for a key directly from S3. Args: key: Dictionary key (string or sequence of strings or NonEmptySafeStrTuple). Returns: The deserialized value stored for the key. Raises: KeyError: If the key does not exist in S3. TypeError: If base_class_for_values is set and the stored value does not match it. """ key = NonEmptySafeStrTuple(key) obj_name = self._build_full_objectname(key) try: response = self.s3_client.get_object( Bucket=self.bucket_name, Key=obj_name) value = self._deserialize_s3_body(response['Body']) self._validate_returned_value(value) return value except ClientError as e: if not_found_error(e): raise KeyError(key) from e raise
[docs] def setdefault(self, key: NonEmptyPersiDictKey, default: ValueType | None = None) -> ValueType: """Insert key with default value if absent; return the current value. Uses an S3 conditional put (If-None-Match: ``*``) to avoid overwriting existing values under concurrent writers. On conditional failure, returns the current value without modifying it. Args: key: Key (string, sequence of strings, or SafeStrTuple). default: Value to insert if the key is not present. Defaults to None. Returns: Existing value if key is present; otherwise the provided default value. Raises: TypeError: If default is a Joker command (KEEP_CURRENT/DELETE_CURRENT), or if the key is missing and default violates value type constraints. ConcurrencyConflictError: If retries are exhausted due to concurrent modifications. """ key = NonEmptySafeStrTuple(key) if isinstance(default, Joker): raise TypeError("default must be a regular value, not a Joker command") invalid_default = isinstance(default, PersiDict) if not invalid_default and self.base_class_for_values is not None: invalid_default = not isinstance(default, self.base_class_for_values) if invalid_default: try: return self[key] except KeyError: if isinstance(default, PersiDict): raise TypeError("Cannot store a PersiDict instance directly") raise TypeError(f"Value must be an instance of" f" {self.base_class_for_values.__name__}") try: serialized_data, content_type = self._serialize_value_for_s3(default) except Exception as exc: try: return self[key] except KeyError: raise exc from None obj_name = self._build_full_objectname(key) for _ in range(_MAX_SETDEFAULT_RETRIES): try: self.s3_client.put_object( Bucket=self.bucket_name, Key=obj_name, Body=serialized_data, ContentType=content_type, IfNoneMatch="*" ) return default except ClientError as e: if conditional_request_failed(e): try: return self[key] except KeyError: # Key was deleted between our failed put and # our read — retry the whole operation. continue raise raise ConcurrencyConflictError(key, _MAX_SETDEFAULT_RETRIES)
def _serialize_value_for_s3(self, value: Any) -> tuple[bytes, str]: """Serialize a value for S3 storage and return (bytes, content_type).""" content_type = self._CONTENT_TYPE_MAP.get( self.serialization_format, 'text/plain') if self.serialization_format == 'pkl': with io.BytesIO() as buffer: self._serialize_to_file(value, buffer) serialized_data = buffer.getvalue() else: with io.StringIO() as buffer: self._serialize_to_file(value, buffer) serialized_data = buffer.getvalue().encode('utf-8') return serialized_data, content_type @staticmethod def _compute_conditional_headers( condition: ETagConditionFlag, expected_etag: ETagIfExists, actual_etag: ETagIfExists = ITEM_NOT_AVAILABLE ) -> tuple[str | None, str | None]: """Map a condition and expected ETag to S3 conditional headers. Args: condition: The ETag condition to enforce. expected_etag: The caller's expected ETag, or ITEM_NOT_AVAILABLE. actual_etag: The actual ETag (needed for ETAG_HAS_CHANGED with ITEM_NOT_AVAILABLE expected_etag). Returns: (if_match, if_none_match) for use in S3 put/delete calls. """ if condition is ETAG_IS_THE_SAME: if isinstance(expected_etag, ItemNotAvailableFlag): return None, "*" return expected_etag, None if condition is ETAG_HAS_CHANGED: if isinstance(expected_etag, ItemNotAvailableFlag): return actual_etag, None return None, expected_etag return None, None def _put_object_with_conditions( self, key: NonEmptySafeStrTuple, value: Any, *, if_match: str | None = None, if_none_match: str | None = None ) -> ETagValue: """Serialize and upload a value to S3, returning the new ETag.""" obj_name = self._build_full_objectname(key) serialized_data, content_type = self._serialize_value_for_s3(value) put_kwargs = { "Bucket": self.bucket_name, "Key": obj_name, "Body": serialized_data, "ContentType": content_type, } if if_match is not None: put_kwargs["IfMatch"] = if_match if if_none_match is not None: put_kwargs["IfNoneMatch"] = if_none_match response = self.s3_client.put_object(**put_kwargs) return ETagValue(response["ETag"]) def _put_object_get_etag(self, key: NonEmptySafeStrTuple, value: Any) -> ETagValue: """Serialize and upload a value to S3, returning the new ETag. Args: key: Normalized dictionary key. value: Value to store (not a joker). Returns: The ETag of the newly stored object. """ return self._put_object_with_conditions(key, value)
[docs] def set_item_if( self, key: NonEmptyPersiDictKey, *, value: ValueType | Joker, condition: ETagConditionFlag, expected_etag: ETagIfExists, retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED ) -> ConditionalOperationResult[ValueType]: """Store a value only if an ETag condition is satisfied. Uses S3 conditional headers (IfMatch / IfNoneMatch) for a single-roundtrip put when the condition can be fully expressed without a prior HEAD. This covers ETAG_IS_THE_SAME (any expected_etag) and ETAG_HAS_CHANGED with a real expected_etag. Other combinations fall back to check-then-write. Args: key: Dictionary key. value: Value to store. condition: ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED. expected_etag: The caller's expected ETag, or ITEM_NOT_AVAILABLE. retrieve_value: Controls whether the existing value is fetched. Applies both when the condition is not satisfied and when KEEP_CURRENT is used with a satisfied condition. IF_ETAG_CHANGED (default) fetches only if expected_etag != actual_etag. ALWAYS_RETRIEVE fetches the existing value. NEVER_RETRIEVE returns VALUE_NOT_RETRIEVED. Returns: ConditionalOperationResult with the outcome. """ self._validate_retrieve_value(retrieve_value) key = NonEmptySafeStrTuple(key) if self.append_only and value is DELETE_CURRENT: raise MutationPolicyError("append-only") self._validate_value(value) # Fast path: single-roundtrip S3 conditional put using # IfMatch / IfNoneMatch headers. Eligible when we can fully # express the condition in S3 headers without a prior HEAD: # - ETAG_IS_THE_SAME (any expected_etag), unless append_only # (except insert-only: ETAG_IS_THE_SAME + ITEM_NOT_AVAILABLE, # which maps to IfNoneMatch:* and is safe in append_only) # - ETAG_HAS_CHANGED with a real expected_etag, provided # the backend enforces IfNoneMatch with specific ETags # (ITEM_NOT_AVAILABLE needs actual_etag for IfMatch, # so it must go through the fallback HEAD path) _fast_eligible = ( (not self.append_only or (condition is ETAG_IS_THE_SAME and isinstance(expected_etag, ItemNotAvailableFlag))) and value is not KEEP_CURRENT and value is not DELETE_CURRENT and condition is ETAG_IS_THE_SAME ) if not _fast_eligible and ( not self.append_only and value is not KEEP_CURRENT and value is not DELETE_CURRENT and condition is ETAG_HAS_CHANGED and not isinstance(expected_etag, ItemNotAvailableFlag) ): if not type(self)._if_none_match_etag_probed: self._probe_if_none_match_etag() if type(self)._if_none_match_etag_supported: _fast_eligible = True if _fast_eligible: return self._set_item_if_fast_path( key, value, condition, expected_etag, retrieve_value) return self._set_item_if_fallback( key, value, condition, expected_etag, retrieve_value)
def _set_item_if_fast_path( self, key: NonEmptySafeStrTuple, value: ValueType, condition: ETagConditionFlag, expected_etag: ETagIfExists, retrieve_value: RetrieveValueFlag ) -> ConditionalOperationResult[ValueType]: """Optimistic S3 conditional write in a single round-trip. Supports ETAG_IS_THE_SAME (any expected_etag) and ETAG_HAS_CHANGED (with a real expected_etag). Attempts a single S3 put with IfMatch/IfNoneMatch headers, avoiding a separate ETag check round-trip. For ETAG_IS_THE_SAME the reported actual_etag equals expected_etag (the PUT confirms it matched). For ETAG_HAS_CHANGED the true pre-write ETag is unknown (S3 does not return it); actual_etag is set to expected_etag as the caller's last-known reference. """ if_match, if_none_match = self._compute_conditional_headers( condition, expected_etag) # Best available pre-write ETag: for ETAG_IS_THE_SAME a # successful PUT confirms the object had this ETag; for # ETAG_HAS_CHANGED the true value is unknown, so we report # the caller's last-known ETag as a stable reference. actual_etag = (ITEM_NOT_AVAILABLE if isinstance(expected_etag, ItemNotAvailableFlag) else expected_etag) try: resulting_etag = self._put_object_with_conditions( key, value, if_match=if_match, if_none_match=if_none_match) return self._result_write_success( actual_etag, resulting_etag, value) except ClientError as e: if not_found_error(e): return self._result_for_missing_key(condition, expected_etag) if not conditional_request_failed(e): raise return self._conditional_failure_result( key, expected_etag=expected_etag, retrieve_value=retrieve_value, return_existing_value=True) def _set_item_if_fallback( self, key: NonEmptySafeStrTuple, value: ValueType, condition: ETagConditionFlag, expected_etag: ETagIfExists, retrieve_value: RetrieveValueFlag ) -> ConditionalOperationResult[ValueType]: """Check-then-act path for conditions other than the fast path. Handles jokers (KEEP_CURRENT, DELETE_CURRENT), append_only checks, ETAG_HAS_CHANGED, and ANY_ETAG conditions. """ actual_etag = self._actual_etag(key) if self.append_only and value is not KEEP_CURRENT: if actual_etag is not ITEM_NOT_AVAILABLE: raise MutationPolicyError("append-only") satisfied = self._check_condition(condition, expected_etag, actual_etag) if not satisfied: if actual_etag is ITEM_NOT_AVAILABLE: return self._result_item_not_available(False) if retrieve_value is NEVER_RETRIEVE or ( retrieve_value is IF_ETAG_CHANGED and expected_etag == actual_etag): existing_value = VALUE_NOT_RETRIEVED else: existing_value = self[key] return self._result_unchanged( False, actual_etag, existing_value) # Handle joker values before attempting S3 write if value is KEEP_CURRENT: if actual_etag is ITEM_NOT_AVAILABLE: return self._result_item_not_available(True) if retrieve_value is NEVER_RETRIEVE or ( retrieve_value is IF_ETAG_CHANGED and expected_etag == actual_etag): existing_value = VALUE_NOT_RETRIEVED else: existing_value = self[key] return self._result_unchanged( True, actual_etag, existing_value) if value is DELETE_CURRENT: if actual_etag is ITEM_NOT_AVAILABLE: return self._result_item_not_available(satisfied) if not type(self)._conditional_delete_probed: self._probe_conditional_delete() if type(self)._conditional_delete_supported: obj_name = self._build_full_objectname(key) try: self.s3_client.delete_object( Bucket=self.bucket_name, Key=obj_name, IfMatch=actual_etag) return self._result_delete_success(actual_etag) except ClientError as e: if not_found_error(e) or conditional_request_failed(e): return self._conditional_failure_result( key, retrieve_value=NEVER_RETRIEVE, return_existing_value=False) raise else: if not self.discard(key): return self._result_item_not_available(satisfied) return self._result_delete_success(actual_etag) # Attempt conditional write when possible if condition in (ETAG_IS_THE_SAME, ETAG_HAS_CHANGED): if_match, if_none_match = self._compute_conditional_headers( condition, expected_etag, actual_etag) try: resulting_etag = self._put_object_with_conditions( key, value, if_match=if_match, if_none_match=if_none_match) return self._result_write_success( actual_etag, resulting_etag, value) except ClientError as e: if conditional_request_failed(e): return self._conditional_failure_result( key, expected_etag=expected_etag, retrieve_value=retrieve_value, return_existing_value=True) raise # For ANY_ETAG: unconditional write resulting_etag = self._put_object_get_etag(key, value) return self._result_write_success( actual_etag, resulting_etag, value) def _probe_conditional_delete(self) -> None: """Test whether the S3 backend enforces IfMatch on delete_object. Some S3-compatible backends (e.g. moto) silently ignore conditional headers on deletes. This probe runs once per class and caches the result so the fast path is only used when the backend supports it. """ cls = type(self) if cls._conditional_delete_probed: return probe_key = self.root_prefix + "__persidict_probe__" try: self.s3_client.put_object( Bucket=self.bucket_name, Key=probe_key, Body=b"probe") try: self.s3_client.delete_object( Bucket=self.bucket_name, Key=probe_key, IfMatch='"__wrong_etag__"') # Delete succeeded with wrong IfMatch — backend doesn't enforce it cls._conditional_delete_supported = False except ClientError: # Backend correctly rejected the mismatched IfMatch cls._conditional_delete_supported = True self.s3_client.delete_object( Bucket=self.bucket_name, Key=probe_key) except ClientError: cls._conditional_delete_supported = False cls._conditional_delete_probed = True def _probe_if_none_match_etag(self) -> None: """Test whether the S3 backend enforces IfNoneMatch with a specific ETag. Some S3-compatible backends (e.g. moto) only enforce IfNoneMatch: * (wildcard) but silently ignore IfNoneMatch with a concrete ETag value. This probe runs once per class so the ETAG_HAS_CHANGED fast path is only used when the backend rejects a put whose current ETag matches the IfNoneMatch value. """ cls = type(self) if cls._if_none_match_etag_probed: return probe_key = self.root_prefix + "__persidict_probe_inm__" try: resp = self.s3_client.put_object( Bucket=self.bucket_name, Key=probe_key, Body=b"probe") probe_etag = resp["ETag"] try: # IfNoneMatch with the object's own ETag — should be rejected self.s3_client.put_object( Bucket=self.bucket_name, Key=probe_key, Body=b"probe2", IfNoneMatch=probe_etag) # Write succeeded despite matching ETag — backend is broken cls._if_none_match_etag_supported = False except ClientError: # Backend correctly rejected the matching IfNoneMatch cls._if_none_match_etag_supported = True # Clean up probe object try: self.s3_client.delete_object( Bucket=self.bucket_name, Key=probe_key) except ClientError: pass except ClientError: cls._if_none_match_etag_supported = False cls._if_none_match_etag_probed = True
[docs] def discard_if( self, key: NonEmptyPersiDictKey, *, condition: ETagConditionFlag, expected_etag: ETagIfExists ) -> ConditionalOperationResult[ValueType]: """Discard a key only if an ETag condition is satisfied. Uses S3 conditional delete with IfMatch to guard against concurrent changes for all condition types. Args: key: Dictionary key. condition: ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED. expected_etag: The caller's expected ETag, or ITEM_NOT_AVAILABLE. Returns: ConditionalOperationResult with the outcome. """ key = NonEmptySafeStrTuple(key) # Fast path: ETAG_IS_THE_SAME with a real ETag — skip the HEAD. # Requires S3 backend to enforce IfMatch on delete_object. if (not self.append_only and condition is ETAG_IS_THE_SAME and not isinstance(expected_etag, ItemNotAvailableFlag)): if not type(self)._conditional_delete_probed: self._probe_conditional_delete() if type(self)._conditional_delete_supported: return self._discard_if_fast_path( key, condition, expected_etag) return self._discard_if_fallback( key, condition, expected_etag)
def _discard_if_fast_path( self, key: NonEmptySafeStrTuple, condition: ETagConditionFlag, expected_etag: ETagIfExists ) -> ConditionalOperationResult[ValueType]: """Optimistic S3 conditional delete for ETAG_IS_THE_SAME. Attempts a single S3 delete with IfMatch header, avoiding a separate ETag check round-trip. """ obj_name = self._build_full_objectname(key) try: self.s3_client.delete_object( Bucket=self.bucket_name, Key=obj_name, IfMatch=expected_etag) return self._result_delete_success(expected_etag) except ClientError as e: if not_found_error(e): return self._result_for_missing_key(condition, expected_etag) if not conditional_request_failed(e): raise return self._conditional_failure_result( key, retrieve_value=NEVER_RETRIEVE, return_existing_value=False) def _discard_if_fallback( self, key: NonEmptySafeStrTuple, condition: ETagConditionFlag, expected_etag: ETagIfExists ) -> ConditionalOperationResult[ValueType]: """Check-then-delete path for conditions other than the fast path. Handles ETAG_HAS_CHANGED, ANY_ETAG, ETAG_IS_THE_SAME with ITEM_NOT_AVAILABLE, and append_only mode. """ actual_etag = self._actual_etag(key) satisfied = self._check_condition(condition, expected_etag, actual_etag) if actual_etag is ITEM_NOT_AVAILABLE: return self._result_item_not_available(satisfied) if not satisfied: return self._result_unchanged( False, actual_etag, VALUE_NOT_RETRIEVED) if self.append_only: raise MutationPolicyError("append-only") # Atomic delete: guard against concurrent changes since the HEAD obj_name = self._build_full_objectname(key) try: self.s3_client.delete_object( Bucket=self.bucket_name, Key=obj_name, IfMatch=actual_etag) return self._result_delete_success(actual_etag) except ClientError as e: if not_found_error(e) or conditional_request_failed(e): return self._conditional_failure_result( key, retrieve_value=NEVER_RETRIEVE, return_existing_value=False) raise
[docs] def setdefault_if( self, key: NonEmptyPersiDictKey, *, default_value: ValueType, condition: ETagConditionFlag, expected_etag: ETagIfExists, retrieve_value: RetrieveValueFlag = IF_ETAG_CHANGED ) -> ConditionalOperationResult[ValueType]: """Insert default_value if key is absent; conditioned on ETag check. Uses S3 conditional put (IfNoneMatch: ``*``) for atomic insert-if-absent when the key is absent and the condition is satisfied, avoiding the TOCTOU race in the base class. Args: key: Dictionary key. default_value: Value to insert if the key is absent and the condition is satisfied. condition: ANY_ETAG, ETAG_IS_THE_SAME, or ETAG_HAS_CHANGED. expected_etag: The caller's expected ETag, or ITEM_NOT_AVAILABLE if the caller believes the key is absent. retrieve_value: Controls value retrieval when the key exists. IF_ETAG_CHANGED (default) fetches only if expected_etag != actual_etag. ALWAYS_RETRIEVE fetches the existing value. NEVER_RETRIEVE returns VALUE_NOT_RETRIEVED. Returns: ConditionalOperationResult with the outcome of the operation. """ if isinstance(default_value, Joker): raise TypeError("default_value must be a regular value, not a Joker command") self._validate_retrieve_value(retrieve_value) key = NonEmptySafeStrTuple(key) self._validate_value(default_value) actual_etag = self._actual_etag(key) satisfied = self._check_condition(condition, expected_etag, actual_etag) if actual_etag is not ITEM_NOT_AVAILABLE: if retrieve_value is NEVER_RETRIEVE or ( retrieve_value is IF_ETAG_CHANGED and expected_etag == actual_etag): existing_value = VALUE_NOT_RETRIEVED else: existing_value = self[key] return self._result_unchanged( satisfied, actual_etag, existing_value) if not satisfied: return self._result_item_not_available(False) # Key is absent and condition is satisfied — atomic insert try: resulting_etag = self._put_object_with_conditions( key, default_value, if_none_match="*") return self._result_write_success( ITEM_NOT_AVAILABLE, resulting_etag, default_value) except ClientError as e: if conditional_request_failed(e): # Concurrent writer inserted the key between our check # and our put — treat as "key already exists" actual_etag = self._actual_etag(key) if actual_etag is ITEM_NOT_AVAILABLE: return self._result_item_not_available(satisfied) if retrieve_value is NEVER_RETRIEVE: existing_value = VALUE_NOT_RETRIEVED else: # ALWAYS_RETRIEVE and IF_ETAG_CHANGED both fetch here: # the key went from absent to present, so the ETag # definitionally changed. try: existing_value = self[key] except KeyError: # Key deleted between etag check and read — # report as absent. return self._result_item_not_available( satisfied) return self._result_unchanged( satisfied, actual_etag, existing_value) raise
def __setitem__(self, key: NonEmptyPersiDictKey, value: ValueType | Joker) -> None: """Store a value for a key directly in S3. Handles special joker values (KEEP_CURRENT, DELETE_CURRENT) for conditional operations. Validates value types against base_class_for_values if specified, then serializes and uploads directly to S3. When append_only is True, uses ``setdefault_if`` with S3 conditional headers (IfNoneMatch: ``*``) for atomic insert-if-absent, avoiding the TOCTOU race of a separate existence check followed by an unconditional write. Args: key: Dictionary key (string or sequence of strings or NonEmptyPersiDictKey). value: Value to store, or a joker command (KEEP_CURRENT or DELETE_CURRENT). Raises: MutationPolicyError: If attempting to modify an existing item when append_only is True. TypeError: If value is a PersiDict instance or does not match the required base_class_for_values when specified. """ key = NonEmptySafeStrTuple(key) if self._process_setitem_args(key, value) is EXECUTION_IS_COMPLETE: return if self.append_only: result = self.setdefault_if( key, default_value=value, condition=ANY_ETAG, expected_etag=ITEM_NOT_AVAILABLE, retrieve_value=NEVER_RETRIEVE, ) if not result.value_was_mutated: raise MutationPolicyError("append-only") return self._put_object_get_etag(key, value) def __delitem__(self, key: NonEmptyPersiDictKey): """Delete the stored value for a key from S3. Args: key: Dictionary key (string or sequence of strings or NonEmptyPersiDictKey). Raises: MutationPolicyError: If append_only is True. KeyError: If the key does not exist. """ key = NonEmptySafeStrTuple(key) self._process_delitem_args(key) # _process_delitem_args already verified existence via __contains__ # (HEAD request), so skip the duplicate HEAD in _remove_item and # call delete_object directly. self._delete_object_ignoring_not_found(key) def _delete_object_ignoring_not_found( self, key: NonEmptySafeStrTuple) -> None: """Issue an S3 DeleteObject, silently ignoring 404/NoSuchKey.""" obj_name = self._build_full_objectname(key) try: self.s3_client.delete_object(Bucket=self.bucket_name, Key=obj_name) except ClientError as e: if not_found_error(e): pass else: raise def _remove_item(self, key: NonEmptySafeStrTuple) -> None: """Remove the S3 object for *key*. S3 ``delete_object`` is idempotent (returns 204 even for missing keys), so a HEAD check is performed first to raise ``KeyError`` when the key is absent — matching the contract that ``discard`` relies on. Raises: KeyError: If the object does not exist. """ obj_name = self._build_full_objectname(key) try: self.s3_client.head_object(Bucket=self.bucket_name, Key=obj_name) except ClientError as e: if not_found_error(e): raise KeyError(key) from e raise self._delete_object_ignoring_not_found(key) def __len__(self) -> int: """Return the number of key-value pairs in the dictionary. Warning: This operation can be very slow and expensive on large S3 buckets as it must paginate through all objects under the dictionary's prefix. Avoid using in performance-critical code. Returns: Number of stored items under this dictionary's root_prefix. """ num_files = 0 suffix = "." + self.serialization_format paginator = self.s3_client.get_paginator("list_objects_v2") page_iterator = paginator.paginate( Bucket=self.bucket_name, Prefix=self.root_prefix) for page in page_iterator: contents = page.get("Contents") if not contents: continue for key in contents: obj_name = key["Key"] if obj_name.endswith(suffix): num_files += 1 return num_files def _generic_iter(self, result_type: set[str]): """Underlying implementation for items(), keys(), and values() iterators. Paginates through S3 objects under the configured root_prefix and yields keys, values, and/or timestamps according to the requested result_type. S3 object keys are converted to SafeStrTuple instances by removing the file extension and reversing digest-based signing if enabled. Args: result_type: Non-empty subset of {"keys", "values", "timestamps"} specifying which fields to yield from each dictionary entry. Returns: A generator that yields: - SafeStrTuple if result_type == {"keys"} - Any if result_type == {"values"} - tuple[SafeStrTuple, Any] if result_type == {"keys", "values"} - tuple including float timestamp if "timestamps" requested Raises: TypeError: If base_class_for_values is set and a yielded value does not match it. ValueError: If result_type is invalid (empty, not a set, or contains unsupported field names). """ self._process_generic_iter_args(result_type) suffix = "." + self.serialization_format ext_len = len(self.serialization_format) + 1 prefix_len = len(self.root_prefix) def splitter(full_name: str) -> SafeStrTuple: """Convert an S3 object key into a SafeStrTuple without the file extension. Args: full_name: Complete S3 object key including root_prefix and extension. Returns: The parsed key components with digest signatures intact. Raises: ValueError: If the object key does not start with this dictionary's root_prefix (indicating it's outside the dictionary's scope). """ if not full_name.startswith(self.root_prefix): raise ValueError( f"S3 object key '{full_name}' is outside of root_prefix '{self.root_prefix}'" ) result = full_name[prefix_len:-ext_len].split(sep="/") return SafeStrTuple(result) def step(): """Generator that paginates through S3 objects and yields requested data. Yields dictionary entries (keys, values, timestamps) according to the result_type specification from the parent _generic_iter method. """ paginator = self.s3_client.get_paginator("list_objects_v2") page_iterator = paginator.paginate( Bucket=self.bucket_name, Prefix=self.root_prefix) for page in page_iterator: contents = page.get("Contents") if not contents: continue for key in contents: obj_name = key["Key"] if not obj_name.endswith(suffix): continue obj_key = splitter(obj_name) unsigned_key = unsign_safe_str_tuple( obj_key, 0) value_to_return = None if "values" in result_type: # The object can be deleted between listing and fetching. # Skip such races instead of raising to make iteration robust. try: value_to_return = self[unsigned_key] except KeyError: continue timestamp_to_return = None if "timestamps" in result_type: timestamp_to_return = key["LastModified"].timestamp() yield self._assemble_iter_result( result_type , key=unsigned_key , value=value_to_return , timestamp=timestamp_to_return) return step()
[docs] def get_subdict(self, prefix_key:PersiDictKey) -> 'BasicS3Dict[ValueType]': """Create a subdictionary scoped to items with the specified prefix. Returns an empty subdictionary if no items exist under the prefix. If the prefix is empty, the entire dictionary is returned. This method is not part of the standard Python dictionary interface. Args: prefix_key: A common prefix (string or sequence of strings or SafeStrTuple) used to scope items stored under this dictionary. Returns: A new BasicS3Dict instance with root_prefix extended by the given prefix_key, sharing the parent's bucket, region, serialization_format, and other configuration settings. """ prefix_key = SafeStrTuple(prefix_key) if len(prefix_key): prefix_key = sign_safe_str_tuple(prefix_key, 0) full_root_prefix = self.root_prefix + "/".join(prefix_key) else: full_root_prefix = self.root_prefix new_dict = BasicS3Dict( bucket_name=self.bucket_name, region=self.region, root_prefix=full_root_prefix, serialization_format=self.serialization_format, append_only=self.append_only, base_class_for_values=self.base_class_for_values) return new_dict
[docs] def timestamp(self, key: NonEmptyPersiDictKey) -> float: """Get the last modification timestamp for a key. This method is not part of the standard Python dictionary interface. Args: key: Dictionary key (string or sequence of strings or NonEmptySafeStrTuple). Returns: POSIX timestamp (seconds since Unix epoch) of the last modification time as reported by S3. The timestamp is timezone-aware and converted to UTC. Raises: KeyError: If the key does not exist in S3. """ key = NonEmptySafeStrTuple(key) obj_name = self._build_full_objectname(key) try: response = self.s3_client.head_object(Bucket=self.bucket_name, Key=obj_name) return response["LastModified"].timestamp() except ClientError as e: if not_found_error(e): raise KeyError(key) from e else: raise