Skip to content

Commit

Permalink
Rework the wait mechanism when multiple refresh requests on the same …
Browse files Browse the repository at this point in the history
…bucket arrive at the same time
  • Loading branch information
cktan committed Nov 18, 2019
1 parent a2aa8cf commit 2b5c7e1
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 26 deletions.
6 changes: 5 additions & 1 deletion src/s3pool/cat/bucketmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

type KeyMap struct {
sync.RWMutex
err error
Map *map[string]string // key to etag
}

Expand Down Expand Up @@ -47,21 +48,24 @@ func (bm *BucketMap) Get(bucket string) (result *KeyMap, ok bool) {
return
}

func (bm *BucketMap) Put(bucket string, key2etag *map[string]string) {
func (bm *BucketMap) Put(bucket string, key2etag *map[string]string, err error) {
bm.Lock()
km := bm.Map[bucket]
if km == nil {
// this is a new keymap
km = &KeyMap{Map: key2etag}
// even though we will assign to km.Map again later,
// it is better to also do it here to ensure that
// km.Map is never nil to avoid potential race
km.Map = key2etag
km.err = err
bm.Map[bucket] = km
}
bm.Unlock()

km.Lock()
km.Map = key2etag
km.err = err
km.Unlock()
}

Expand Down
21 changes: 14 additions & 7 deletions src/s3pool/cat/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ func KnownBuckets() []string {
return bm.Keys()
}

func Exists(bucket string) bool {
_, ok := bm.Get(bucket)
return ok
}

func Find(bucket, key string) (etag string) {
// returns etag == "" if not found
if trace {
Expand Down Expand Up @@ -88,13 +83,25 @@ func Scan(bucket string, filter func(string) bool) (key []string) {
return
}

func Store(bucket string, key, etag []string) {
func Store(bucket string, key, etag []string, err error) {
if trace {
log.Println("Catalog.Store", bucket)
}
dict := make(map[string]string)
for i := range key {
dict[key[i]] = etag[i]
}
bm.Put(bucket, &dict)
bm.Put(bucket, &dict, err)
}

func Exists(bucket string) (ok bool, err error) {
var km *KeyMap
km, ok = bm.Get(bucket)
if ok {
km.Lock()
err = km.err
km.Unlock()
}

return
}
29 changes: 22 additions & 7 deletions src/s3pool/op/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"path/filepath"
"s3pool/cat"
"s3pool/conf"
"s3pool/strlock"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -85,17 +86,31 @@ func moveFile(src, dst string) error {
// Check that we have a catalog on bucket. If not, create it.
func checkCatalog(bucket string) error {

if cat.Exists(bucket) {
return nil
// serialize refresh on bucket
lockname, err := strlock.Lock("refresh " + bucket)
if err != nil {
return err
}
defer strlock.Unlock(lockname)

// notify bucketmon; it will invoke refresh to create entry in catalog.
conf.NotifyBucketmon(bucket)
ok, err := cat.Exists(bucket)
if err != nil {
return err
}
if !ok {
// notify bucketmon; it will invoke refresh to create entry in catalog.
conf.NotifyBucketmon(bucket)

// wait and poll cat
for !cat.Exists(bucket) {
time.Sleep(time.Second)
// wait for it
for !ok {
time.Sleep(time.Second)
ok, err = cat.Exists(bucket)
if err != nil {
return err
}
}
}

log.Println("Refreshed due to missing catalog")
return nil
}
15 changes: 4 additions & 11 deletions src/s3pool/op/refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package op
import (
"errors"
"s3pool/cat"
"s3pool/strlock"
)

/*
Expand All @@ -30,13 +29,6 @@ func Refresh(args []string) (string, error) {
bucket := args[0]
// DO NOT checkCatalog here. We will update it!

// serialize refresh on bucket
lockname, err := strlock.Lock("refresh " + bucket)
if err != nil {
return "", err
}
defer strlock.Unlock(lockname)

numItems := 0
/*
log.Println("REFRESH start on", bucket)
Expand All @@ -60,11 +52,12 @@ func Refresh(args []string) (string, error) {
numItems++
}

if err := s3ListObjects(bucket, save); err != nil {
err := s3ListObjects(bucket, save)
cat.Store(bucket, key, etag, err)

if err != nil {
return "", err
}

cat.Store(bucket, key, etag)

return "\n", nil
}

0 comments on commit 2b5c7e1

Please sign in to comment.