Skip to content

Commit

Permalink
Support to initiate transfers through CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
DImuthuUpe committed Dec 25, 2022
1 parent 8124fa2 commit 319c424
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void submitHttpDownload(HttpDownloadApiRequest request, StreamObserver<Ht
.withMethod("submitHttpDownload")
.withParameter("resourcePath", request.getResourcePath())
.withParameter("sourceStorageId", request.getSourceStorageId())
.withParameter("sourceToken", request.getSourceToken())
.withParameter("sourceToken", request.getSourceSecretId())
.withParameter("mftAuthorizationToken", JsonFormat.printer().print(request.getMftAuthorizationToken()));

SyncRPCResponse rpcResponse = agentRPCClient.sendSyncRequest(requestBuilder.build());
Expand Down
8 changes: 4 additions & 4 deletions api/stub/src/main/proto/MFTTransferApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ message CallbackEndpoint {
message TransferApiRequest {
string sourcePath = 1;
string sourceStorageId = 2;
string sourceToken = 3;
string sourceSecretId = 3;
string destinationPath = 4;
string destinationStorageId = 5;
string destinationToken = 6;
string destinationSecretId = 6;
bool affinityTransfer = 7;
map<string, int32> targetAgents = 8;
org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 9;
Expand All @@ -43,7 +43,7 @@ message BatchTransferApiResponse {
message HttpUploadApiRequest {
string destinationStorageId = 1;
string resourcePath = 2;
string destinationToken = 3;
string destinationSecretId = 3;
string targetAgent = 5;
org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 6;
}
Expand All @@ -56,7 +56,7 @@ message HttpUploadApiResponse {
message HttpDownloadApiRequest {
string resourcePath = 1;
string sourceStorageId = 2;
string sourceToken = 3;
string sourceSecretId = 3;
string targetAgent = 5;
org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 6;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public Integer call() throws Exception {
}

TransferApiResponse transferResp = mftApiClient.getTransferClient().submitTransfer(TransferApiRequest.newBuilder()
.setSourceToken(sourceSecretForStorage.getSecretId())
.setDestinationToken(destSecretForStorage.getSecretId())
.setSourceSecretId(sourceSecretForStorage.getSecretId())
.setDestinationSecretId(destSecretForStorage.getSecretId())
.setDestinationStorageId(destinationStorageId)
.setDestinationPath(destinationPath)
.setSourceStorageId(sourceStorageId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ public AgentTransferRequest createAgentTransferRequest(TransferApiRequest transf
agentTransferBuilder.setSourcePath(transferRequest.getSourcePath());
agentTransferBuilder.setDestinationPath(transferRequest.getDestinationPath());
Pair<StorageWrapper, SecretWrapper> sourceCred = createCredentials(transferRequest.getSourceStorageId(),
transferRequest.getSourceToken());
transferRequest.getSourceSecretId());

agentTransferBuilder.setSourceStorage(sourceCred.getLeft());
agentTransferBuilder.setSourceSecret(sourceCred.getRight());

Pair<StorageWrapper, SecretWrapper> destCred = createCredentials(transferRequest.getDestinationStorageId(),
transferRequest.getDestinationToken());
transferRequest.getDestinationSecretId());

agentTransferBuilder.setDestinationStorage(destCred.getLeft());
agentTransferBuilder.setDestinationSecret(destCred.getRight());
Expand Down
2 changes: 1 addition & 1 deletion python-cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Install dependencies
```
pip install grpcio==1.46.3
pip install grpcio-tools==1.46.3
pip install airavata_mft_sdk==0.0.1-alpha18
pip install airavata_mft_sdk==0.0.1-alpha19
```

Build the binary
Expand Down
133 changes: 123 additions & 10 deletions python-cli/mft_cli/mft_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
from airavata_mft_sdk import MFTTransferApi_pb2
from rich.console import Console
from rich.table import Table
from rich.progress import track
import time

app = typer.Typer()

app.add_typer(mft_cli.storage.app, name="storage")

@app.command("ls")
def list(storage_path):
storage_name = storage_path.split("/")[0]
resource_path = storage_path[len(storage_name) +1 :]
def fetch_storage_and_secret_ids(storage_name):
client = mft_client.MFTClient()
search_req = StorageCommon_pb2.StorageSearchRequest(storageName=storage_name)
storages = client.common_api.searchStorages(search_req)
Expand All @@ -24,24 +23,38 @@ def list(storage_path):

if len(storages.storageList) == 0:
print("No storage with name or id " + storage_name + " was found. Please register the storage with command mft-cli storage add")
exit()
raise typer.Abort()

if len(storages.storageList) > 1:
print("More than one storage with nam " + storage_name + " was found. Please use the storage id. You can fetch it from mft-cli storage list")
exit()
raise typer.Abort()

storage = storages.storageList[0]
sec_req = StorageCommon_pb2.SecretForStorageGetRequest(storageId = storage.storageId)
sec_resp = client.common_api.getSecretForStorage(sec_req)
if sec_resp.error != 0:
print("Could not fetch the secret for storage " + storage.storageId)

id_req = MFTTransferApi_pb2.GetResourceMetadataFromIDsRequest(storageId = sec_resp.storageId,
secretId = sec_resp.secretId,
resourcePath = resource_path)
return sec_resp.storageId, sec_resp.secretId
def get_resource_metadata(storage_path, recursive_search = False):
storage_name = storage_path.split("/")[0]
resource_path = storage_path[len(storage_name) +1 :]

storage_id, secret_id = fetch_storage_and_secret_ids(storage_name)

id_req = MFTTransferApi_pb2.GetResourceMetadataFromIDsRequest(storageId = storage_id,
secretId = secret_id,
resourcePath = resource_path)
resource_medata_req = MFTTransferApi_pb2.FetchResourceMetadataRequest(idRequest = id_req)

client = mft_client.MFTClient()

metadata_resp = client.transfer_api.resourceMetadata(resource_medata_req)
return metadata_resp
@app.command("ls")
def list(storage_path):

metadata_resp = get_resource_metadata(storage_path)

console = Console()
table = Table()
Expand All @@ -65,10 +78,110 @@ def list(storage_path):

console.print(table)

def flatten_directories(directory, parent_path, file_list):
for dir in directory.directories:
flatten_directories(dir, parent_path + dir.friendlyName + "/", file_list)

for file in directory.files:
file_list.append((file, parent_path + file.friendlyName))

@app.command("cp")
def copy(source, destination):
print("Moving data from " + source + " to " + destination)

source_storage_id, source_secret_id = fetch_storage_and_secret_ids(source.split("/")[0])
dest_storage_id, dest_secret_id = fetch_storage_and_secret_ids(destination.split("/")[0])

## TODO : Check agent availability and deploy cloud agents if required

file_list = []
source_metadata = get_resource_metadata(source)
transfer_requests = []
total_volume = 0

if (source_metadata.WhichOneof('metadata') == 'directory') :
if (destination[-1] != "/"):
print("Source is a directory path so destination path should end with /")
raise typer.Abort()

flatten_directories(source_metadata.directory, "", file_list)
for file_entry in file_list:
file = file_entry[0]
relative_path = file_entry[1]
transfer_requests.append(MFTTransferApi_pb2.TransferApiRequest(
sourcePath = file.resourcePath,
sourceStorageId = source_storage_id,
sourceSecretId = source_secret_id,
destinationPath = destination[len(destination.split("/")[0]) +1 :] + relative_path,
destinationStorageId = dest_storage_id,
destinationSecretId = dest_secret_id))
total_volume += file.resourceSize

elif (source_metadata.WhichOneof('metadata') == 'file'):
file_list.append((source_metadata.file, source_metadata.file.friendlyName))

if destination[-1] == "/":
destination = destination + source_metadata.file.friendlyName

transfer_requests.append(MFTTransferApi_pb2.TransferApiRequest(
sourcePath = source_metadata.file.resourcePath,
sourceStorageId = source_storage_id,
sourceSecretId = source_secret_id,
destinationPath = destination[len(destination.split("/")[0]) +1 :],
destinationStorageId = dest_storage_id,
destinationSecretId = dest_secret_id))

total_volume += source_metadata.file.resourceSize

elif (source_metadata.WhichOneof('metadata') == 'error'):
print("Failed while fetching source details")
print(metadata_resp.error)
raise typer.Abort()

batch_transfer_request = MFTTransferApi_pb2.BatchTransferApiRequest()
batch_transfer_request.transferRequests.extend(transfer_requests)

confirm = typer.confirm("Total number of " + str(len(transfer_requests)) +
" files to be transferred. Total volume is " + str(total_volume)
+ " bytes. Do you want to start the transfer? ", True)

client = mft_client.MFTClient()
batch_transfer_resp = client.transfer_api.submitBatchTransfer(batch_transfer_request)

if not confirm:
raise typer.Abort()

transfer_ids = batch_transfer_resp.transferIds

state_requests = []
for transfer_id in transfer_ids:
state_requests.append(MFTTransferApi_pb2.TransferStateApiRequest(transferId=transfer_id))

## TODO: This has to be optimized and avoid frequent polling of all transfer ids in each iteration
## Possible fix is to introduce a parent batch transfer id at the API level and fetch child trnasfer id
# summaries in a single API call

completed = 0
failed = 0

with typer.progressbar(length=len(transfer_ids)) as progress:

while 1:
completed = 0
failed = 0
for state_request in state_requests:
state_resp = client.transfer_api.getTransferState(state_request)
if state_resp.state == "COMPLETED":
completed += 1
elif state_resp.state == "FAILED":
failed += 1

total = completed + failed
progress.update(total)
if (total == len(transfer_ids)):
break
time.sleep(1)

print(f"Processed {completed + failed} files. Completed {completed}, Failed {failed}.")

if __name__ == "__main__":
app()
2 changes: 1 addition & 1 deletion python-sdk/setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = airavata_mft_sdk
version = 0.0.1-alpha18
version = 0.0.1-alpha19
author = Airavata MFT Developers
author_email = [email protected]
description = Python SDK for Apache Airavata Managed File Transfers (MFT)
Expand Down
Loading

0 comments on commit 319c424

Please sign in to comment.