Skip to content

Commit

Permalink
PULL multiple obj at the same time
Browse files Browse the repository at this point in the history
  • Loading branch information
cktan committed Oct 30, 2019
1 parent b7c0022 commit 643c07c
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 52 deletions.
61 changes: 33 additions & 28 deletions client/c/s3cat.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

void usage(const char* pname, const char* msg)
{
fprintf(stderr, "Usage: %s [-h] -p port bucket:key ...\n", pname);
fprintf(stderr, "Usage: %s [-h] -p port bucket key [key...]\n", pname);
fprintf(stderr, "Copy s3 files to stdout.\n\n");
fprintf(stderr, " -p port : specify the port number of s3pool process\n");
fprintf(stderr, " -h : print this help message\n");
Expand All @@ -27,29 +27,8 @@ void fatal(const char* msg)
}


void doit(int port, char* bktkey_)
void catfile(const char* fname)
{
char errmsg[200];
char* bktkey = strdup(bktkey_);
if (!bktkey) {
fatal("out of memory");
}

char* colon = strchr(bktkey, ':');
if (!colon) {
fatal("missing colon char in bucket:key");
}

*colon = 0;
char* bucket = bktkey;
char* key = colon+1;

char* fname = s3pool_pull(port, bucket, key,
errmsg, sizeof(errmsg));
if (!fname) {
fatal(errmsg);
}

FILE* fp = fopen(fname, "r");
if (!fp) {
perror("fopen");
Expand All @@ -72,8 +51,29 @@ void doit(int port, char* bktkey_)


fclose(fp);
free(bktkey);

}


void doit(int port, char* bucket, const char* key[], int nkey)
{
char errmsg[200];
char* reply = s3pool_pull_ex(port, bucket, key, nkey,
errmsg, sizeof(errmsg));
if (!reply) {
fatal(errmsg);
}

char* p = reply;
while (1) {
char* q = strchr(p, '\n');
if (!q || p == q) break;

*q = 0;
catfile(p);
p = q+1;
}

free(reply);
}


Expand Down Expand Up @@ -104,10 +104,15 @@ int main(int argc, char* argv[])
if (optind >= argc) {
usage(argv[0], "Need bucket and key");
}

for (int i = optind; i < argc; i++) {
doit(port, argv[i]);

char* bucket = argv[optind++];
if (optind >= argc) {
usage(argv[0], "Need key");
}

const char** key = (const char**) &argv[optind];
int nkey = argc - optind;
doit(port, bucket, key, nkey);

return 0;
}
22 changes: 14 additions & 8 deletions client/c/s3pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ static char* mkrequest(int argc, const char** argv, char* errmsg, int errmsgsz)
*p++ = '\n';
*p = 0; /* NUL */

assert(strlen(p) + 1 <= len);
assert((int)strlen(p) + 1 <= len);
return request;

bailout:
Expand Down Expand Up @@ -195,16 +195,21 @@ static char* chat(int port, const char* request,
On failure, return a NULL ptr.
*/
char* s3pool_pull_ex(int port, const char* bucket, const char* key,
const char* nextkey,
char* s3pool_pull_ex(int port, const char* bucket,
const char* key[], int nkey,
char* errmsg, int errmsgsz)
{
char* request = 0;
char* reply = 0;
int fd = -1;
const char* argv[4] = {"PULL", bucket, key, nextkey};
const char* argv[2+nkey];

request = mkrequest(nextkey ? 4 : 3, argv, errmsg, errmsgsz);
argv[0] = "PULL";
argv[1] = bucket;
for (int i = 0; i < nkey; i++)
argv[i+2] = key[i];

request = mkrequest(2+nkey, argv, errmsg, errmsgsz);
if (!request) {
goto bailout;
}
Expand All @@ -214,8 +219,6 @@ char* s3pool_pull_ex(int port, const char* bucket, const char* key,
goto bailout;
}

char* term = strchr(reply, '\n');
if (term) *term = 0;

free(request);
return reply;
Expand All @@ -230,7 +233,10 @@ char* s3pool_pull_ex(int port, const char* bucket, const char* key,
char* s3pool_pull(int port, const char* bucket, const char* key,
char* errmsg, int errmsgsz)
{
return s3pool_pull_ex(port, bucket, key, 0, errmsg, errmsgsz);
char* reply = s3pool_pull_ex(port, bucket, &key, 1, errmsg, errmsgsz);
char* term = strchr(reply, '\n');
if (term) *term = 0;
return reply;
}


Expand Down
17 changes: 15 additions & 2 deletions client/c/s3pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,21 @@
*/
EXTERN char* s3pool_pull(int port, const char* bucket, const char* key,
char* errmsg, int errmsgsz);
EXTERN char* s3pool_pull_ex(int port, const char* bucket, const char* key,
const char* nextkey, /* hint for prefetching next obj */


/**
PULL multiple file from S3 to local disk.
On success, return the paths to the file pulled down from S3 in a
list of strings terminated by NEWLINE. Caller must free() the
buffer returned.
On failure, return a NULL ptr.
*/
EXTERN char* s3pool_pull_ex(int port, const char* bucket,
const char* key[], int nkey,
char* errmsg, int errmsgsz);


Expand Down
58 changes: 46 additions & 12 deletions src/s3pool/op/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,59 @@ package op

import (
"errors"
"strings"
)


func pmap(processitem func(idx int), maxidx int, maxworker int) {
fin := make(chan int)
gate := make(chan int, maxworker);
defer close(fin)
defer close(gate)

for i := 0; i < maxworker; i++ {
gate <- 1
}
for i := 0; i < maxidx; i++ {
<- gate
go func(idx int) {
processitem(idx)
gate <- 1
fin <- idx
}(i)
}

for i := 0; i < maxidx; i++ {
<- fin
}
}


func Pull(args []string) (string, error) {
if len(args) != 2 && len(args) != 3 {
return "", errors.New("Expected 2 or 3 arguments for PULL")
if len(args) < 2 {
return "", errors.New("Expected at least 2 arguments for PULL")
}
bucket, key := args[0], args[1]
bucket := args[0]
keys := args[1:]
nkeys := len(keys)
path := make([]string, nkeys)
patherr := make([]error, nkeys)

// retrieve the object
path, err := s3GetObject(bucket, key)
if err != nil {
return "", err
dowork := func(i int) {
path[i], patherr[i] = s3GetObject(bucket, keys[i])
}

if len(args) == 3 {
nextKey := args[2]
// prefetch ... fire and forget
go s3GetObject(bucket, nextKey)
pmap(dowork, nkeys, 50)

var reply strings.Builder
for i := 0; i < nkeys; i++ {
if patherr[i] != nil {
return "", patherr[i]
}
reply.WriteString(path[i])
reply.WriteString("\n")
}

return path + "\n", nil

return reply.String(), nil
}
4 changes: 2 additions & 2 deletions src/s3pool/op/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"time"
)

var trace_s3api bool = false
var trace_s3api bool = true
var use_goapi bool = false

func s3ListObjects(bucket string, wr io.Writer) error {
Expand Down Expand Up @@ -117,7 +117,7 @@ func s3GetObject(bucket string, key string) (string, error) {

// If this file was recently modified, don't go fetch it
since, _ := fileMtimeSince(path)
if since > 0 && since.Minutes() < 2 {
if since > 0 && since.Minutes() < 30 {
return path, nil
}

Expand Down

0 comments on commit 643c07c

Please sign in to comment.