diff --git a/Cargo.toml b/Cargo.toml index bcaf1876b..18cce064b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,9 +37,9 @@ testing = ["dep:axum-test", "dep:scraper"] with-db = ["dep:sea-orm", "dep:sea-orm-migration", "loco-gen/with-db"] # Storage features all_storage = ["storage_aws_s3", "storage_azure", "storage_gcp"] -storage_aws_s3 = ["object_store/aws"] -storage_azure = ["object_store/azure"] -storage_gcp = ["object_store/gcp"] +storage_aws_s3 = ["opendal/services-s3"] +storage_azure = ["opendal/services-azblob"] +storage_gcp = ["opendal/services-gcs"] # Cache feature cache_inmem = ["dep:moka"] bg_redis = ["dep:rusty-sidekiq", "dep:bb8"] @@ -130,7 +130,7 @@ cfg-if = "1" uuid = { version = "1.10.0", features = ["v4", "fast-rng"] } # File Upload -object_store = { version = "0.11.0", default-features = false } +opendal = { version = "0.50.2", default-features = false, features = ["services-memory", "services-fs"] } # cache moka = { version = "0.12.7", features = ["sync"], optional = true } diff --git a/src/storage/drivers/aws.rs b/src/storage/drivers/aws.rs index 5cc2be526..142a10d51 100644 --- a/src/storage/drivers/aws.rs +++ b/src/storage/drivers/aws.rs @@ -1,16 +1,7 @@ -#[cfg(test)] -use core::time::Duration; -use std::sync::Arc; - -use object_store::{ - aws::{AmazonS3Builder, AwsCredential}, - StaticCredentialProvider, -}; -#[cfg(test)] -use object_store::{BackoffConfig, RetryConfig}; +use opendal::{services::S3, Operator}; -use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver}; -use crate::Result; +use super::{opendal_adapter::OpendalAdapter, StoreDriver}; +use crate::storage::StorageResult; /// A set of AWS security credentials #[derive(Debug)] @@ -34,14 +25,10 @@ pub struct Credential { /// # Errors /// /// When could not initialize the client instance -pub fn new(bucket_name: &str, region: &str) -> Result<Box<dyn StoreDriver>> { - let s3 = AmazonS3Builder::new() - .with_bucket_name(bucket_name) - .with_region(region) - .build() - .map_err(Box::from)?; +pub fn 
new(bucket_name: &str, region: &str) -> StorageResult<Box<dyn StoreDriver>> { + let s3 = S3::default().bucket(bucket_name).region(region); - Ok(Box::new(ObjectStoreAdapter::new(Box::new(s3)))) + Ok(Box::new(OpendalAdapter::new(Operator::new(s3)?.finish()))) } /// Create new AWS s3 storage with bucket, region and credentials. @@ -64,18 +51,16 @@ pub fn with_credentials( bucket_name: &str, region: &str, credentials: Credential, -) -> Result<Box<dyn StoreDriver>> { - let s3 = AmazonS3Builder::new() - .with_bucket_name(bucket_name) - .with_region(region) - .with_credentials(Arc::new(StaticCredentialProvider::new(AwsCredential { - key_id: credentials.key_id.to_string(), - secret_key: credentials.secret_key.to_string(), - token: credentials.token, - }))) - .build() - .map_err(Box::from)?; - Ok(Box::new(ObjectStoreAdapter::new(Box::new(s3)))) +) -> StorageResult<Box<dyn StoreDriver>> { + let mut s3 = S3::default() + .bucket(bucket_name) + .region(region) + .access_key_id(&credentials.key_id) + .secret_access_key(&credentials.secret_key); + if let Some(token) = credentials.token { + s3 = s3.session_token(&token); + } + Ok(Box::new(OpendalAdapter::new(Operator::new(s3)?.finish()))) } /// Build store with failure @@ -86,15 +71,11 @@ pub fn with_credentials( #[cfg(test)] #[must_use] pub fn with_failure() -> Box<dyn StoreDriver> { - let s3 = AmazonS3Builder::new() - .with_bucket_name("loco-test") - .with_retry(RetryConfig { - backoff: BackoffConfig::default(), - max_retries: 0, - retry_timeout: Duration::from_secs(0), - }) - .build() - .unwrap(); + let s3 = S3::default() + .bucket("loco-test") + .region("ap-south-1") + .allow_anonymous() + .disable_ec2_metadata(); - Box::new(ObjectStoreAdapter::new(Box::new(s3))) + Box::new(OpendalAdapter::new(Operator::new(s3).unwrap().finish())) } diff --git a/src/storage/drivers/azure.rs b/src/storage/drivers/azure.rs index d2831c6c9..52d75e85e 100644 --- a/src/storage/drivers/azure.rs +++ b/src/storage/drivers/azure.rs @@ -1,7 +1,7 @@ -use object_store::azure::MicrosoftAzureBuilder; +use opendal::{services::Azblob, 
Operator}; -use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver}; -use crate::Result; +use super::StoreDriver; +use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult}; /// Create new Azure storage. /// @@ -18,13 +18,13 @@ pub fn new( container_name: &str, account_name: &str, access_key: &str, -) -> Result<Box<dyn StoreDriver>> { - let azure = MicrosoftAzureBuilder::new() - .with_container_name(container_name) - .with_account(account_name) - .with_access_key(access_key) - .build() - .map_err(Box::from)?; +) -> StorageResult<Box<dyn StoreDriver>> { + let azure = Azblob::default() + .container(container_name) + .account_name(account_name) + .account_key(access_key); - Ok(Box::new(ObjectStoreAdapter::new(Box::new(azure)))) + Ok(Box::new(OpendalAdapter::new( + Operator::new(azure)?.finish(), + ))) } diff --git a/src/storage/drivers/gcp.rs b/src/storage/drivers/gcp.rs index 7c7426342..59f74a117 100644 --- a/src/storage/drivers/gcp.rs +++ b/src/storage/drivers/gcp.rs @@ -1,30 +1,23 @@ -use object_store::gcp::GoogleCloudStorageBuilder; +use opendal::{services::Gcs, Operator}; -use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver}; -use crate::Result; +use super::StoreDriver; +use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult}; /// Create new GCP storage. 
/// /// # Examples ///``` /// use loco_rs::storage::drivers::gcp; -/// let gcp_driver = gcp::new("key", "account_key", "service_account"); +/// let gcp_driver = gcp::new("bucket", "credential_path"); /// ``` /// /// # Errors /// /// When could not initialize the client instance -pub fn new( - bucket_name: &str, - service_account_key: &str, - service_account: &str, -) -> Result<Box<dyn StoreDriver>> { - let gcs = GoogleCloudStorageBuilder::new() - .with_bucket_name(bucket_name) - .with_service_account_key(service_account_key) - .with_service_account_path(service_account) - .build() - .map_err(Box::from)?; +pub fn new(bucket_name: &str, credential_path: &str) -> StorageResult<Box<dyn StoreDriver>> { + let gcs = Gcs::default() + .bucket(bucket_name) + .credential_path(credential_path); - Ok(Box::new(ObjectStoreAdapter::new(Box::new(gcs)))) + Ok(Box::new(OpendalAdapter::new(Operator::new(gcs)?.finish()))) } diff --git a/src/storage/drivers/local.rs b/src/storage/drivers/local.rs index f46f3a1fa..06d1a8c12 100644 --- a/src/storage/drivers/local.rs +++ b/src/storage/drivers/local.rs @@ -1,7 +1,7 @@ -use object_store::local::LocalFileSystem; +use opendal::{services::Fs, Operator}; -use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver}; -use crate::Result; +use super::StoreDriver; +use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult}; /// Create new filesystem storage with no prefix /// @@ -10,9 +10,18 @@ use crate::Result; /// use loco_rs::storage::drivers::local; /// let file_system_driver = local::new(); /// ``` +/// +/// # Panics +/// +/// Panics if the filesystem service fails to build. 
#[must_use] pub fn new() -> Box<dyn StoreDriver> { - Box::new(ObjectStoreAdapter::new(Box::new(LocalFileSystem::new()))) + let fs = Fs::default().root("/"); + Box::new(OpendalAdapter::new( + Operator::new(fs) + .expect("fs service should build with success") + .finish(), + )) } /// Create new filesystem storage with `prefix` applied to all paths /// @@ -26,8 +35,7 @@ pub fn new() -> Box<dyn StoreDriver> { /// # Errors /// /// Returns an error if the path does not exist -pub fn new_with_prefix(prefix: impl AsRef<Path>) -> Result<Box<dyn StoreDriver>> { - Ok(Box::new(ObjectStoreAdapter::new(Box::new( - LocalFileSystem::new_with_prefix(prefix).map_err(Box::from)?, - )))) +pub fn new_with_prefix(prefix: impl AsRef<Path>) -> StorageResult<Box<dyn StoreDriver>> { + let fs = Fs::default().root(&prefix.as_ref().display().to_string()); + Ok(Box::new(OpendalAdapter::new(Operator::new(fs)?.finish()))) } diff --git a/src/storage/drivers/mem.rs b/src/storage/drivers/mem.rs index 665aca4af..0d0f31410 100644 --- a/src/storage/drivers/mem.rs +++ b/src/storage/drivers/mem.rs @@ -1,6 +1,7 @@ -use object_store::memory::InMemory; +use opendal::{services::Memory, Operator}; -use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver}; +use super::StoreDriver; +use crate::storage::drivers::opendal_adapter::OpendalAdapter; /// Create new in-memory storage. /// @@ -9,7 +10,15 @@ use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver}; /// use loco_rs::storage::drivers::mem; /// let mem_storage = mem::new(); /// ``` +/// +/// # Panics +/// +/// Panics if the memory service fails to build. 
#[must_use] pub fn new() -> Box<dyn StoreDriver> { - Box::new(ObjectStoreAdapter::new(Box::new(InMemory::new()))) + Box::new(OpendalAdapter::new( + Operator::new(Memory::default()) + .expect("memory service must build with success") + .finish(), + )) } diff --git a/src/storage/drivers/mod.rs b/src/storage/drivers/mod.rs index 7d2ecfe3f..e1aa0ccb2 100644 --- a/src/storage/drivers/mod.rs +++ b/src/storage/drivers/mod.rs @@ -2,6 +2,8 @@ use std::path::Path; use async_trait::async_trait; use bytes::Bytes; +use opendal::Reader; + #[cfg(feature = "storage_aws_s3")] pub mod aws; #[cfg(feature = "storage_azure")] @@ -11,7 +13,7 @@ pub mod gcp; pub mod local; pub mod mem; pub mod null; -pub mod object_store_adapter; +pub mod opendal_adapter; use super::StorageResult; @@ -21,9 +23,28 @@ pub struct UploadResponse { pub version: Option<String>, } -// TODO: need to properly abstract the object_store type in order to not -// strongly depend on it -pub type GetResponse = object_store::GetResult; +/// TODO: Add more methods to `GetResponse` to read the content in different +/// ways +/// +/// For example, we can read a specific range of bytes from the stream. +pub struct GetResponse { + stream: Reader, +} + +impl GetResponse { + pub(crate) fn new(stream: Reader) -> Self { + Self { stream } + } + + /// Read all content from the stream and return as `Bytes`. + /// + /// # Errors + /// + /// Returns a `StorageError` with the reason for the failure. 
+ pub async fn bytes(&self) -> StorageResult<Bytes> { + Ok(self.stream.read(..).await?.to_bytes()) + } +} #[async_trait] pub trait StoreDriver: Sync + Send { diff --git a/src/storage/drivers/object_store_adapter.rs b/src/storage/drivers/object_store_adapter.rs deleted file mode 100644 index 1ad9f0a49..000000000 --- a/src/storage/drivers/object_store_adapter.rs +++ /dev/null @@ -1,97 +0,0 @@ -use std::path::Path; - -use async_trait::async_trait; -use bytes::Bytes; -use object_store::ObjectStore; - -use super::{GetResponse, StoreDriver, UploadResponse}; -use crate::storage::StorageResult; - -pub struct ObjectStoreAdapter { - object_store_impl: Box<dyn ObjectStore>, -} - -impl ObjectStoreAdapter { - /// Constructor for creating a new `Store` instance. - #[must_use] - pub fn new(object_store_impl: Box<dyn ObjectStore>) -> Self { - Self { object_store_impl } - } -} - -#[async_trait] -impl StoreDriver for ObjectStoreAdapter { - /// Uploads the content represented by `Bytes` to the specified path in the - /// object store. - /// - /// # Errors - /// - /// Returns a `StorageResult` with the result of the upload operation. - async fn upload(&self, path: &Path, content: &Bytes) -> StorageResult<UploadResponse> { - let path = object_store::path::Path::from(path.display().to_string()); - let res = self - .object_store_impl - .put(&path, content.clone().into()) - .await?; - Ok(UploadResponse { - e_tag: res.e_tag, - version: res.version, - }) - } - - /// Retrieves the content from the specified path in the object store. - /// - /// # Errors - /// - /// Returns a `StorageResult` with the result of the retrieval operation. - async fn get(&self, path: &Path) -> StorageResult<GetResponse> { - let path = object_store::path::Path::from(path.display().to_string()); - Ok(self.object_store_impl.get(&path).await?) - } - - /// Deletes the content at the specified path in the object store. - /// - /// # Errors - /// - /// Returns a `StorageResult` indicating the success of the deletion - /// operation. 
- async fn delete(&self, path: &Path) -> StorageResult<()> { - let path = object_store::path::Path::from(path.display().to_string()); - Ok(self.object_store_impl.delete(&path).await?) - } - - /// Renames or moves the content from one path to another in the object - /// store. - /// - /// # Errors - /// - /// Returns a `StorageResult` indicating the success of the rename/move - /// operation. - async fn rename(&self, from: &Path, to: &Path) -> StorageResult<()> { - let from = object_store::path::Path::from(from.display().to_string()); - let to = object_store::path::Path::from(to.display().to_string()); - Ok(self.object_store_impl.rename(&from, &to).await?) - } - - /// Copies the content from one path to another in the object store. - /// - /// # Errors - /// - /// Returns a `StorageResult` indicating the success of the copy operation. - async fn copy(&self, from: &Path, to: &Path) -> StorageResult<()> { - let from = object_store::path::Path::from(from.display().to_string()); - let to = object_store::path::Path::from(to.display().to_string()); - Ok(self.object_store_impl.copy(&from, &to).await?) - } - - /// Checks if the content exists at the specified path in the object store. - /// - /// # Errors - /// - /// Returns a `StorageResult` with a boolean indicating the existence of the - /// content. 
- async fn exists(&self, path: &Path) -> StorageResult<bool> { - let path = object_store::path::Path::from(path.display().to_string()); - Ok(self.object_store_impl.get(&path).await.is_ok()) - } -} diff --git a/src/storage/drivers/opendal_adapter.rs b/src/storage/drivers/opendal_adapter.rs new file mode 100644 index 000000000..4153c3c12 --- /dev/null +++ b/src/storage/drivers/opendal_adapter.rs @@ -0,0 +1,144 @@ +use std::path::Path; + +use async_trait::async_trait; +use bytes::Bytes; +use futures_util::SinkExt; +use opendal::{layers::RetryLayer, Operator}; + +use super::{GetResponse, StoreDriver, UploadResponse}; +use crate::storage::{StorageError, StorageResult}; + +pub struct OpendalAdapter { + opendal_impl: Operator, +} + +impl OpendalAdapter { + /// Constructor for creating a new `Store` instance. + #[must_use] + pub fn new(opendal_impl: Operator) -> Self { + let opendal_impl = opendal_impl + // Add retry layer with default settings + .layer(RetryLayer::default().with_jitter()); + Self { opendal_impl } + } +} + +#[async_trait] +impl StoreDriver for OpendalAdapter { + /// Uploads the content represented by `Bytes` to the specified path in the + /// object store. + /// + /// # Errors + /// + /// Returns a `StorageResult` with the result of the upload operation. + async fn upload(&self, path: &Path, content: &Bytes) -> StorageResult<UploadResponse> { + self.opendal_impl + .write(&path.display().to_string(), content.clone()) + .await?; + // TODO: opendal will return the e_tag and version in the future + Ok(UploadResponse { + e_tag: None, + version: None, + }) + } + + /// Retrieves the content from the specified path in the object store. + /// + /// # Errors + /// + /// Returns a `StorageResult` with the result of the retrieval operation. + async fn get(&self, path: &Path) -> StorageResult<GetResponse> { + let r = self + .opendal_impl + .reader(&path.display().to_string()) + .await?; + Ok(GetResponse::new(r)) + } + + /// Deletes the content at the specified path in the object store. 
+ /// + /// # Errors + /// + /// Returns a `StorageResult` indicating the success of the deletion + /// operation. + async fn delete(&self, path: &Path) -> StorageResult<()> { + Ok(self + .opendal_impl + .delete(&path.display().to_string()) + .await?) + } + + /// Renames or moves the content from one path to another in the object + /// store. + /// + /// # Behavior + /// + /// Fallback to copy and delete source if the storage does not support rename. + /// + /// # Errors + /// + /// Returns a `StorageResult` indicating the success of the rename/move + /// operation. + async fn rename(&self, from: &Path, to: &Path) -> StorageResult<()> { + if self.opendal_impl.info().full_capability().rename { + let from = from.display().to_string(); + let to = to.display().to_string(); + Ok(self.opendal_impl.rename(&from, &to).await?) + } else { + self.copy(from, to).await?; + self.delete(from).await?; + Ok(()) + } + } + + /// Copies the content from one path to another in the object store. + /// + /// # Behavior + /// + /// Fallback to read from source and write into dest if the storage does not support copy. + /// + /// # Errors + /// + /// Returns a `StorageResult` indicating the success of the copy operation. + async fn copy(&self, from: &Path, to: &Path) -> StorageResult<()> { + let from = from.display().to_string(); + let to = to.display().to_string(); + if self.opendal_impl.info().full_capability().copy { + Ok(self.opendal_impl.copy(&from, &to).await?) + } else { + let mut reader = self + .opendal_impl + .reader(&from) + .await? + .into_bytes_stream(..) + .await?; + let mut writer = self.opendal_impl.writer(&to).await?.into_bytes_sink(); + writer + .send_all(&mut reader) + .await + .map_err(|err| StorageError::Any(Box::new(err)))?; + writer + .close() + .await + .map_err(|err| StorageError::Any(Box::new(err)))?; + Ok(()) + } + } + + /// Checks if the content exists at the specified path in the object store. 
+ /// + /// # Errors + /// + /// Returns a `StorageResult` with a boolean indicating the existence of the + /// content. + /// + /// # TODO + /// + /// The `exists` function should return an error for issues such as permission denied. + /// However, these errors are not handled during the migration process and should be addressed + /// after the test suites are refactored. + async fn exists(&self, path: &Path) -> StorageResult<bool> { + let path = path.display().to_string(); + Ok(self.opendal_impl.exists(&path).await.unwrap_or(false)) + } +} diff --git a/src/storage/mod.rs b/src/storage/mod.rs index dc8db80d8..1bd2f8d02 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -29,7 +29,7 @@ pub enum StorageError { StoreNotFound(String), #[error(transparent)] - Store(#[from] object_store::Error), + Store(#[from] opendal::Error), #[error("Unable to read data from file {}", path.display().to_string())] UnableToReadBytes { path: PathBuf }, diff --git a/src/storage/strategies/backup.rs b/src/storage/strategies/backup.rs index d53e84c8a..5e5c4f0f1 100644 --- a/src/storage/strategies/backup.rs +++ b/src/storage/strategies/backup.rs @@ -90,12 +90,7 @@ impl StorageStrategy for BackupStrategy { /// Downloads content only from primary storage backend. async fn download(&self, storage: &Storage, path: &Path) -> StorageResult<Bytes> { let store = storage.as_store_err(&self.primary)?; - Ok(store - .get(path) - .await? - .bytes() - .await - .map_err(StorageError::Store)?) + Ok(store.get(path).await?.bytes().await?) } /// Deletes content from the primary and, if configured, secondary storage diff --git a/src/storage/strategies/mirror.rs b/src/storage/strategies/mirror.rs index ff30286ba..e26bfa028 100644 --- a/src/storage/strategies/mirror.rs +++ b/src/storage/strategies/mirror.rs @@ -231,12 +231,7 @@ impl MirrorStrategy { path: &Path, ) -> StorageResult<Bytes> { let store = storage.as_store_err(store_name)?; - store - .get(path) - .await?
- .bytes() - .await - .map_err(StorageError::Store) + store.get(path).await?.bytes().await } }