Skip to content

Commit

Permalink
Merge pull request #165 from liormizr/fix_for_issue_164
Browse files Browse the repository at this point in the history
fix issue #164
  • Loading branch information
liormizr authored Apr 11, 2024
2 parents b3c1c75 + ead44a9 commit bc86432
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 127 deletions.
176 changes: 56 additions & 120 deletions s3path/accessor.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
import sys
import importlib.util
from os import stat_result
from threading import Lock
from itertools import chain
from functools import lru_cache
from contextlib import suppress
from collections import namedtuple
from io import UnsupportedOperation

import boto3
from boto3.s3.transfer import TransferManager
from botocore.exceptions import ClientError
from botocore.docs.docstring import LazyLoadedDocstring
import smart_open

def _lazy_import_resources(name):
spec = importlib.util.find_spec(name)
loader = importlib.util.LazyLoader(spec.loader)
spec.loader = loader
module = importlib.util.module_from_spec(spec)
sys.modules[name] = module
loader.exec_module(module)
return module


boto3 = _lazy_import_resources('boto3')
smart_open = _lazy_import_resources('smart_open')
# For CLI development, or any application that requires fast startup:
# boto3 resources are lazy-loaded here,
# because importing boto3 increases startup time roughly tenfold!


class StatResult(namedtuple('BaseStatResult', 'size, last_modified, version_id', defaults=(None,))):
Expand Down Expand Up @@ -83,6 +97,7 @@ def rename(path, target):
target_key_name = target.key

resource, config = configuration_map.get_configuration(path)
allowed_copy_args = boto3.s3.transfer.TransferManager.ALLOWED_COPY_ARGS

if not is_dir(path):
target_bucket = resource.Bucket(target_bucket_name)
Expand All @@ -92,10 +107,10 @@ def rename(path, target):
target_bucket.copy,
config=config,
args=(old_source, target_key_name),
allowed_extra_args=TransferManager.ALLOWED_COPY_ARGS,
)
allowed_extra_args=allowed_copy_args)
_boto3_method_with_parameters(object_summary.delete)
return

bucket = resource.Bucket(source_bucket_name)
target_bucket = resource.Bucket(target_bucket_name)
for object_summary in bucket.objects.filter(Prefix=source_key_name):
Expand All @@ -106,8 +121,7 @@ def rename(path, target):
target_bucket.copy,
config=config,
args=(old_source, new_key),
allowed_extra_args=TransferManager.ALLOWED_COPY_ARGS,
)
allowed_extra_args=allowed_copy_args)
_boto3_method_with_parameters(object_summary.delete)


Expand Down Expand Up @@ -159,12 +173,13 @@ def exists(path):
kwargs={'Bucket': bucket_name},
config=config)
return True
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == '404':
# Not found
return False
raise e
except Exception as client_error:
with suppress(AttributeError, KeyError):
error_code = client_error.response['Error']['Code']
if error_code == '404':
# Not found
return False
raise client_error

bucket = resource.Bucket(bucket_name)
key_name = str(path.key)
Expand Down Expand Up @@ -253,38 +268,34 @@ def listdir(path):
def open(path, *, mode='r', buffering=-1, encoding=None, errors=None, newline=None):
resource, config = configuration_map.get_configuration(path)

if smart_open.__version__ < '4.0.0' and mode.startswith('b'):
mode = ''.join(reversed(mode))
smart_open_kwargs = {
'uri': "s3:/" + str(path),
'mode': mode,
'buffering': buffering,
'encoding': encoding,
'errors': errors,
'newline': newline,
}
dummy_object = resource.Object('bucket', 'key')
get_object_kwargs = _update_kwargs_with_config(
dummy_object.meta.client.get_object, config=config)
create_multipart_upload_kwargs = _update_kwargs_with_config(
dummy_object.meta.client.create_multipart_upload, config=config)


transport_params = {'defer_seek': True}
if _is_versioned_path(path):
transport_params['version_id'] = path.version_id
dummy_object = resource.Object('bucket', 'key')

if smart_open.__version__ >= '5.1.0':
_smart_open_new_version_kwargs(
dummy_object,
resource,
config,
transport_params,
smart_open_kwargs)
else:
_smart_open_old_version_kwargs(
dummy_object,
resource,
config,
transport_params,
smart_open_kwargs)

file_object = smart_open.open(**smart_open_kwargs)
return file_object
transport_params.update(
client=resource.meta.client,
client_kwargs={
'S3.Client.get_object': get_object_kwargs,
'S3.Client.create_multipart_upload': create_multipart_upload_kwargs,
},
)

return smart_open.open(
uri="s3:/" + str(path),
mode=mode,
buffering=buffering,
encoding=encoding,
errors=errors,
newline=newline,
compression='disable',
transport_params=transport_params)


def get_presigned_url(path, expire_in: int) -> str:
Expand Down Expand Up @@ -329,80 +340,6 @@ def _is_versioned_path(path):
return hasattr(path, 'version_id') and bool(path.version_id)


def _smart_open_new_version_kwargs(
        dummy_object,
        resource,
        config,
        transport_params,
        smart_open_kwargs):
    """
    Prepare the arguments for the new Smart-Open api

    Both ``transport_params`` and ``smart_open_kwargs`` are updated in place:
    the boto3 client and the per-method client kwargs go into ``transport_params``,
    which is then stored, together with ``compression='disable'``, in ``smart_open_kwargs``.

    Doc: https://github.com/RaRe-Technologies/smart_open/blob/develop/MIGRATING_FROM_OLDER_VERSIONS.rst
    """
    s3_client = dummy_object.meta.client
    read_kwargs = _update_kwargs_with_config(s3_client.get_object, config=config)
    write_kwargs = _update_kwargs_with_config(s3_client.create_multipart_upload, config=config)

    transport_params['client'] = resource.meta.client
    transport_params['client_kwargs'] = {
        'S3.Client.create_multipart_upload': write_kwargs,
        'S3.Client.get_object': read_kwargs,
    }

    smart_open_kwargs['compression'] = 'disable'
    smart_open_kwargs['transport_params'] = transport_params


def _smart_open_old_version_kwargs(
        dummy_object,
        resource,
        config,
        transport_params,
        smart_open_kwargs):
    """
    Prepare the arguments for the old Smart-Open api
    (the original note on this function says: <5.0.0)

    Both ``transport_params`` and ``smart_open_kwargs`` are updated in place:
    the multipart upload kwargs, the object kwargs, the resource kwargs and the default boto3 session
    go into ``transport_params``, which is then stored, together with ``ignore_ext=True``,
    in ``smart_open_kwargs``.
    """
    multipart_upload_kwargs = _update_kwargs_with_config(
        dummy_object.initiate_multipart_upload, config=config)
    object_kwargs = _update_kwargs_with_config(dummy_object.get, config=config)

    # A good example of the complexity of boto3 and botocore:
    # the resource arguments are read back from private attributes of the resource's client
    client = resource.meta.client
    try:
        credentials = client._request_signer._credentials
        access_key = credentials.access_key
        secret_key = credentials.secret_key
        token = credentials.token
    except AttributeError:
        # No usable credentials object on the client, pass no explicit credentials
        access_key = secret_key = token = None

    client_config = client._client_config
    endpoint = client._endpoint
    resource_kwargs = {
        'endpoint_url': client.meta._endpoint_url,
        'config': client_config,
        'region_name': client_config.region_name,
        'use_ssl': endpoint.host.startswith('https'),
        'verify': endpoint.http_session._verify,
        'aws_access_key_id': access_key,
        'aws_secret_access_key': secret_key,
        'aws_session_token': token,
    }

    transport_params['multipart_upload_kwargs'] = multipart_upload_kwargs
    transport_params['object_kwargs'] = object_kwargs
    transport_params['resource_kwargs'] = resource_kwargs
    transport_params['session'] = boto3.DEFAULT_SESSION

    smart_open_kwargs['ignore_ext'] = True
    smart_open_kwargs['transport_params'] = transport_params


def _update_kwargs_with_config(boto3_method, config, kwargs=None):
kwargs = kwargs or {}
if config is not None:
Expand Down Expand Up @@ -440,10 +377,9 @@ def _boto3_method_with_extraargs(

@lru_cache()
def _get_action_arguments(action):
if isinstance(action.__doc__, LazyLoadedDocstring):
docs = action.__doc__
with suppress(AttributeError):
docs = action.__doc__._generate()
else:
docs = action.__doc__
return set(
line.replace(':param ', '').strip().strip(':')
for line in docs.splitlines()
Expand Down
15 changes: 8 additions & 7 deletions s3path/current_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import re
import sys
import typing
import fnmatch
import posixpath
from datetime import timedelta
Expand All @@ -11,9 +12,12 @@
from typing import Union, Literal, Optional
from io import DEFAULT_BUFFER_SIZE, TextIOWrapper

import smart_open
from botocore.exceptions import ClientError
from boto3.resources.factory import ServiceResource

if typing.TYPE_CHECKING:
import smart_open
from boto3.resources.factory import ServiceResource
KeyFileObjectType = Union[TextIOWrapper, smart_open.s3.Reader, smart_open.s3.MultipartWriter]

from . import accessor

Expand Down Expand Up @@ -431,18 +435,15 @@ def iterdir(self): # todo: -> Generator[S3Path, None, None]:

def open(
self,
mode: Literal["r", "w", "rb", "wb"] = 'r',
mode: Literal['r', 'w', 'rb', 'wb'] = 'r',
buffering: int = DEFAULT_BUFFER_SIZE,
encoding: Optional[str] = None,
errors: Optional[str] = None,
newline: Optional[str] = None
) -> Union[TextIOWrapper, smart_open.s3.Reader, smart_open.s3.MultipartWriter]:
newline: Optional[str] = None) -> KeyFileObjectType:
"""
Opens the Bucket key pointed to by the path, returns a Key file object that you can read/write with
"""
self._absolute_path_validation()
if smart_open.__version__ < '4.0.0' and mode.startswith('b'):
mode = ''.join(reversed(mode))
return accessor.open(
self,
mode=mode,
Expand Down

0 comments on commit bc86432

Please sign in to comment.