Mirror of https://github.com/All-Hands-AI/OpenHands.git, synced 2026-01-10 23:38:08 -05:00

AWS necessary changes only (#6375)

Co-authored-by: Engel Nyst <enyst@users.noreply.github.com>
@@ -1,6 +1,7 @@
 from urllib.parse import parse_qs

 import jwt
+from pydantic import SecretStr
 from socketio.exceptions import ConnectionRefusedError

 from openhands.core.logger import openhands_logger as logger
@@ -39,9 +40,13 @@ async def connect(connection_id: str, environ, auth):
         raise ConnectionRefusedError('No github_auth cookie')
     if not config.jwt_secret:
         raise RuntimeError('JWT secret not found')
-    decoded = jwt.decode(
-        signed_token, config.jwt_secret.get_secret_value(), algorithms=['HS256']
-    )
+    jwt_secret = (
+        config.jwt_secret.get_secret_value()
+        if isinstance(config.jwt_secret, SecretStr)
+        else config.jwt_secret
+    )
+    decoded = jwt.decode(signed_token, jwt_secret, algorithms=['HS256'])
     user_id = decoded['github_user_id']

     logger.info(f'User {user_id} is connecting to conversation {conversation_id}')
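The added branch simply unwraps a pydantic SecretStr before handing the secret to jwt.decode, while still accepting a plain string. A minimal sketch of the same unwrapping, using a hypothetical secret value rather than the server config:

from pydantic import SecretStr

# Hypothetical secret; in the server this value comes from config.jwt_secret.
jwt_secret_setting: SecretStr | str = SecretStr('example-secret')

# Same pattern as the diff: call get_secret_value() only when it is a SecretStr.
jwt_secret = (
    jwt_secret_setting.get_secret_value()
    if isinstance(jwt_secret_setting, SecretStr)
    else jwt_secret_setting
)
assert jwt_secret == 'example-secret'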
@@ -11,7 +11,7 @@ def get_file_store(file_store: str, file_store_path: str | None = None) -> FileStore:
             raise ValueError('file_store_path is required for local file store')
         return LocalFileStore(file_store_path)
     elif file_store == 's3':
-        return S3FileStore()
+        return S3FileStore(file_store_path)
     elif file_store == 'google_cloud':
         return GoogleCloudFileStore(file_store_path)
     return InMemoryFileStore()
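A short usage sketch of the changed factory branch, assuming get_file_store is imported from the module this hunk belongs to and using a hypothetical bucket name: with file_store_path set, S3FileStore is now bound to that bucket; with None it falls back to the AWS_S3_BUCKET environment variable inside the constructor.

# Hypothetical values for illustration; get_file_store import path assumed.
store = get_file_store('s3', 'my-openhands-bucket')  # S3FileStore bound to this bucket
default_store = get_file_store('s3', None)           # bucket taken from AWS_S3_BUCKET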
@@ -1,50 +1,130 @@
 import io
 import os

-from minio import Minio
+import boto3
+import botocore

 from openhands.storage.files import FileStore


 class S3FileStore(FileStore):
-    def __init__(self) -> None:
+    def __init__(self, bucket_name: str | None) -> None:
         access_key = os.getenv('AWS_ACCESS_KEY_ID')
         secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')
-        endpoint = os.getenv('AWS_S3_ENDPOINT', 's3.amazonaws.com')
         secure = os.getenv('AWS_S3_SECURE', 'true').lower() == 'true'
-        self.bucket = os.getenv('AWS_S3_BUCKET')
-        self.client = Minio(endpoint, access_key, secret_key, secure=secure)
+        endpoint = self._ensure_url_scheme(secure, os.getenv('AWS_S3_ENDPOINT'))
+        if bucket_name is None:
+            bucket_name = os.environ['AWS_S3_BUCKET']
+        self.bucket = bucket_name
+        self.client = boto3.client(
+            's3',
+            aws_access_key_id=access_key,
+            aws_secret_access_key=secret_key,
+            endpoint_url=endpoint,
+            use_ssl=secure,
+        )

     def write(self, path: str, contents: str | bytes) -> None:
-        as_bytes = contents.encode('utf-8') if isinstance(contents, str) else contents
-        stream = io.BytesIO(as_bytes)
         try:
-            self.client.put_object(self.bucket, path, stream, len(as_bytes))
-        except Exception as e:
-            raise FileNotFoundError(f'Failed to write to S3 at path {path}: {e}')
+            as_bytes = (
+                contents.encode('utf-8') if isinstance(contents, str) else contents
+            )
+            self.client.put_object(Bucket=self.bucket, Key=path, Body=as_bytes)
+        except botocore.exceptions.ClientError as e:
+            if e.response['Error']['Code'] == 'AccessDenied':
+                raise FileNotFoundError(
+                    f"Error: Access denied to bucket '{self.bucket}'."
+                )
+            elif e.response['Error']['Code'] == 'NoSuchBucket':
+                raise FileNotFoundError(
+                    f"Error: The bucket '{self.bucket}' does not exist."
+                )
+            raise FileNotFoundError(
+                f"Error: Failed to write to bucket '{self.bucket}' at path {path}: {e}"
+            )

     def read(self, path: str) -> str:
         try:
-            return self.client.get_object(self.bucket, path).data.decode('utf-8')
+            response = self.client.get_object(Bucket=self.bucket, Key=path)
+            return response['Body'].read().decode('utf-8')
+        except botocore.exceptions.ClientError as e:
+            # Catch all S3-related errors
+            if e.response['Error']['Code'] == 'NoSuchBucket':
+                raise FileNotFoundError(
+                    f"Error: The bucket '{self.bucket}' does not exist."
+                )
+            elif e.response['Error']['Code'] == 'NoSuchKey':
+                raise FileNotFoundError(
+                    f"Error: The object key '{path}' does not exist in bucket '{self.bucket}'."
+                )
+            else:
+                raise FileNotFoundError(
+                    f"Error: Failed to read from bucket '{self.bucket}' at path {path}: {e}"
+                )
         except Exception as e:
-            raise FileNotFoundError(f'Failed to read from S3 at path {path}: {e}')
+            raise FileNotFoundError(
+                f"Error: Failed to read from bucket '{self.bucket}' at path {path}: {e}"
+            )

     def list(self, path: str) -> list[str]:
         if path and path != '/' and not path.endswith('/'):
             path += '/'
         try:
-            return [
-                obj.object_name for obj in self.client.list_objects(self.bucket, path)
-            ]
+            response = self.client.list_objects_v2(Bucket=self.bucket, Prefix=path)
+            # Check if 'Contents' exists in the response
+            if 'Contents' in response:
+                objects = [obj['Key'] for obj in response['Contents']]
+                return objects
+            else:
+                return list()
+        except botocore.exceptions.ClientError as e:
+            # Catch all S3-related errors
+            if e.response['Error']['Code'] == 'NoSuchBucket':
+                raise FileNotFoundError(
+                    f"Error: The bucket '{self.bucket}' does not exist."
+                )
+            elif e.response['Error']['Code'] == 'AccessDenied':
+                raise FileNotFoundError(
+                    f"Error: Access denied to bucket '{self.bucket}'."
+                )
+            else:
+                raise FileNotFoundError(f"Error: {e.response['Error']['Message']}")
         except Exception as e:
-            raise FileNotFoundError(f'Failed to list S3 objects at path {path}: {e}')
+            raise FileNotFoundError(
+                f"Error: Failed to read from bucket '{self.bucket}' at path {path}: {e}"
+            )

     def delete(self, path: str) -> None:
         try:
-            client = self.client
-            bucket = self.bucket
-            objects_to_delete = client.list_objects(bucket, prefix=path, recursive=True)
-            for obj in objects_to_delete:
-                client.remove_object(bucket, obj.object_name)
+            self.client.delete_object(Bucket=self.bucket, Key=path)
+        except botocore.exceptions.ClientError as e:
+            if e.response['Error']['Code'] == 'NoSuchBucket':
+                raise FileNotFoundError(
+                    f"Error: The bucket '{self.bucket}' does not exist."
+                )
+            elif e.response['Error']['Code'] == 'AccessDenied':
+                raise FileNotFoundError(
+                    f"Error: Access denied to bucket '{self.bucket}'."
+                )
+            elif e.response['Error']['Code'] == 'NoSuchKey':
+                raise FileNotFoundError(
+                    f"Error: The object key '{path}' does not exist in bucket '{self.bucket}'."
+                )
+            else:
+                raise FileNotFoundError(
+                    f"Error: Failed to delete key '{path}' from bucket '{self.bucket}': {e}"
+                )
         except Exception as e:
-            raise FileNotFoundError(f'Failed to delete S3 object at path {path}: {e}')
+            raise FileNotFoundError(
+                f"Error: Failed to delete key '{path}' from bucket '{self.bucket}': {e}"
+            )
+
+    def _ensure_url_scheme(self, secure: bool, url: str | None) -> str | None:
+        if not url:
+            return None
+        if secure:
+            if not url.startswith('https://'):
+                url = 'https://' + url.removeprefix('http://')
+        else:
+            if not url.startswith('http://'):
+                url = 'http://' + url.removeprefix('https://')
+        return url
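Taken together, a hedged end-to-end sketch of exercising the boto3-backed store; the module path, credentials, bucket, and endpoint below are placeholder assumptions, and AWS_S3_ENDPOINT is optional (boto3 falls back to the default S3 endpoint when endpoint_url is None):

import os

# Placeholder configuration for illustration only.
os.environ['AWS_ACCESS_KEY_ID'] = 'example-key'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'example-secret'
os.environ['AWS_S3_BUCKET'] = 'my-openhands-bucket'
# Optional MinIO-style endpoint without a scheme; _ensure_url_scheme prepends
# https:// (or http:// when AWS_S3_SECURE=false) before it reaches boto3.
os.environ['AWS_S3_ENDPOINT'] = 'minio.internal:9000'

from openhands.storage.s3 import S3FileStore  # assumed module path

store = S3FileStore(None)  # bucket name falls back to AWS_S3_BUCKET
store.write('sessions/abc/events.json', '{"events": []}')
print(store.read('sessions/abc/events.json'))
print(store.list('sessions/abc'))  # keys under the 'sessions/abc/' prefix
store.delete('sessions/abc/events.json')

One behavioral difference worth noting from the diff itself: the old Minio-based delete removed every object under the given prefix, while the boto3 version deletes a single key.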