From 2b5c7e13945891c76816a9c210fa190d14dfb0a0 Mon Sep 17 00:00:00 2001 From: CK Tan Date: Sun, 17 Nov 2019 17:40:15 -0800 Subject: [PATCH] Rework the wait mechanism when multiple refresh requests on the same bucket arrive at the same time --- src/s3pool/cat/bucketmap.go | 6 +++++- src/s3pool/cat/cat.go | 21 ++++++++++++++------- src/s3pool/op/common.go | 29 ++++++++++++++++++++++------- src/s3pool/op/refresh.go | 15 ++++----------- 4 files changed, 45 insertions(+), 26 deletions(-) diff --git a/src/s3pool/cat/bucketmap.go b/src/s3pool/cat/bucketmap.go index 01b0ee0..731b3a8 100644 --- a/src/s3pool/cat/bucketmap.go +++ b/src/s3pool/cat/bucketmap.go @@ -18,6 +18,7 @@ import ( type KeyMap struct { sync.RWMutex + err error Map *map[string]string // key to etag } @@ -47,21 +48,24 @@ func (bm *BucketMap) Get(bucket string) (result *KeyMap, ok bool) { return } -func (bm *BucketMap) Put(bucket string, key2etag *map[string]string) { +func (bm *BucketMap) Put(bucket string, key2etag *map[string]string, err error) { bm.Lock() km := bm.Map[bucket] if km == nil { + // this is a new keymap km = &KeyMap{Map: key2etag} // even though we will assign to km.Map again later, // it is better to also do it here to ensure that // km.Map is never nil to avoid potential race km.Map = key2etag + km.err = err bm.Map[bucket] = km } bm.Unlock() km.Lock() km.Map = key2etag + km.err = err km.Unlock() } diff --git a/src/s3pool/cat/cat.go b/src/s3pool/cat/cat.go index e17b383..7b80574 100644 --- a/src/s3pool/cat/cat.go +++ b/src/s3pool/cat/cat.go @@ -23,11 +23,6 @@ func KnownBuckets() []string { return bm.Keys() } -func Exists(bucket string) bool { - _, ok := bm.Get(bucket) - return ok -} - func Find(bucket, key string) (etag string) { // returns etag == "" if not found if trace { @@ -88,7 +83,7 @@ func Scan(bucket string, filter func(string) bool) (key []string) { return } -func Store(bucket string, key, etag []string) { +func Store(bucket string, key, etag []string, err error) { if trace { log.Println("Catalog.Store", bucket) } @@ -96,5 +91,17 @@ func Store(bucket string, key, etag []string) { for i := range key { dict[key[i]] = etag[i] } - bm.Put(bucket, &dict) + bm.Put(bucket, &dict, err) +} + +func Exists(bucket string) (ok bool, err error) { + var km *KeyMap + km, ok = bm.Get(bucket) + if ok { + km.Lock() + err = km.err + km.Unlock() + } + + return } diff --git a/src/s3pool/op/common.go b/src/s3pool/op/common.go index 7d62574..2e5de4a 100644 --- a/src/s3pool/op/common.go +++ b/src/s3pool/op/common.go @@ -20,6 +20,7 @@ import ( "path/filepath" "s3pool/cat" "s3pool/conf" + "s3pool/strlock" "strings" "syscall" "time" @@ -85,17 +86,31 @@ func moveFile(src, dst string) error { // Check that we have a catalog on bucket. If not, create it. func checkCatalog(bucket string) error { - if cat.Exists(bucket) { - return nil + // serialize refresh on bucket + lockname, err := strlock.Lock("refresh " + bucket) + if err != nil { + return err } + defer strlock.Unlock(lockname) - // notify bucketmon; it will invoke refresh to create entry in catalog. - conf.NotifyBucketmon(bucket) + ok, err := cat.Exists(bucket) + if err != nil { + return err + } + if !ok { + // notify bucketmon; it will invoke refresh to create entry in catalog. + conf.NotifyBucketmon(bucket) - // wait and poll cat - for !cat.Exists(bucket) { - time.Sleep(time.Second) + // wait for it + for !ok { + time.Sleep(time.Second) + ok, err = cat.Exists(bucket) + if err != nil { + return err + } + } } + log.Println("Refreshed due to missing catalog") return nil } diff --git a/src/s3pool/op/refresh.go b/src/s3pool/op/refresh.go index de59df6..b10d9b4 100644 --- a/src/s3pool/op/refresh.go +++ b/src/s3pool/op/refresh.go @@ -15,7 +15,6 @@ package op import ( "errors" "s3pool/cat" - "s3pool/strlock" ) /* @@ -30,13 +29,6 @@ func Refresh(args []string) (string, error) { bucket := args[0] // DO NOT checkCatalog here. We will update it! - // serialize refresh on bucket - lockname, err := strlock.Lock("refresh " + bucket) - if err != nil { - return "", err - } - defer strlock.Unlock(lockname) - numItems := 0 /* log.Println("REFRESH start on", bucket) @@ -60,11 +52,12 @@ func Refresh(args []string) (string, error) { numItems++ } - if err := s3ListObjects(bucket, save); err != nil { + err := s3ListObjects(bucket, save) + cat.Store(bucket, key, etag, err) + + if err != nil { return "", err } - cat.Store(bucket, key, etag) - return "\n", nil }