From 334e04213e5d849f8ba19a024084322c9b0456bd Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Wed, 23 Oct 2024 19:04:14 +0100 Subject: [PATCH 1/5] Device can now be set to optionally sync to S3. Added stats to S3Storage. Signed-off-by: Jimmy Moore --- cmd/sync.go | 2 +- pkg/storage/config/silo.go | 18 ++ pkg/storage/device/device.go | 147 ++++++++++++ pkg/storage/device/device_sync_test.go | 118 ++++++++++ pkg/storage/migrator/migrator_s3_test.go | 2 +- pkg/storage/migrator/sync_test.go | 2 +- .../protocol/packets/alternate_sources.go | 64 ++++++ pkg/storage/protocol/packets/packet.go | 2 + pkg/storage/protocol/packets/packet_test.go | 40 ++++ pkg/storage/sources/s3_storage.go | 215 +++++++++++++----- pkg/storage/sources/s3_storage_test.go | 4 +- 11 files changed, 555 insertions(+), 59 deletions(-) create mode 100644 pkg/storage/device/device_sync_test.go create mode 100644 pkg/storage/protocol/packets/alternate_sources.go diff --git a/cmd/sync.go b/cmd/sync.go index 816f48cd..e366d08c 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -179,7 +179,7 @@ func sync_setup_device(conf *config.DeviceSchema) (*syncStorageInfo, error) { if sync_dummy { dest = modules.NewNothing(sourceStorage.Size()) } else { - dest, err = sources.NewS3StorageCreate(sync_endpoint, + dest, err = sources.NewS3StorageCreate(false, sync_endpoint, sync_access, sync_secret, sync_bucket, diff --git a/pkg/storage/config/silo.go b/pkg/storage/config/silo.go index 778aa86c..5c12ce11 100644 --- a/pkg/storage/config/silo.go +++ b/pkg/storage/config/silo.go @@ -26,6 +26,24 @@ type DeviceSchema struct { ROSource *DeviceSchema `hcl:"source,block"` Binlog string `hcl:"binlog,optional"` PageServerPID int `hcl:"pid,optional"` + Sync *SyncS3Schema `hcl:"sync,block"` +} + +type SyncConfigSchema struct { + BlockShift int `hcl:"blockshift,attr"` + MaxAge string `hcl:"maxage,attr"` + MinChanged int `hcl:"minchanged,attr"` + CheckPeriod string `hcl:"checkperiod,attr"` + Limit int `hcl:"limit,attr"` +} + +type SyncS3Schema struct { + Secure bool `hcl:"secure,attr"` + AccessKey string `hcl:"accesskey,attr"` + SecretKey string `hcl:"secretkey,attr"` + Endpoint string `hcl:"endpoint,attr"` + Bucket string `hcl:"bucket,attr"` + Config *SyncConfigSchema `hcl:"config,block"` } func parseByteValue(val string) int64 { diff --git a/pkg/storage/device/device.go b/pkg/storage/device/device.go index 36cd118f..f0b60633 100644 --- a/pkg/storage/device/device.go +++ b/pkg/storage/device/device.go @@ -1,17 +1,25 @@ package device import ( + "context" "encoding/binary" "errors" "fmt" "os" "path" + "sync" + "time" "github.com/loopholelabs/silo/pkg/storage" + "github.com/loopholelabs/silo/pkg/storage/blocks" "github.com/loopholelabs/silo/pkg/storage/config" + "github.com/loopholelabs/silo/pkg/storage/dirtytracker" "github.com/loopholelabs/silo/pkg/storage/expose" + "github.com/loopholelabs/silo/pkg/storage/migrator" "github.com/loopholelabs/silo/pkg/storage/modules" + "github.com/loopholelabs/silo/pkg/storage/protocol/packets" "github.com/loopholelabs/silo/pkg/storage/sources" + "github.com/rs/zerolog/log" ) const ( @@ -50,6 +58,9 @@ func NewDevices(ds []*config.DeviceSchema) (map[string]*Device, error) { } func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.ExposedStorage, error) { + device_schema := string(ds.Encode()) + log.Info().Str("schema", device_schema).Msg("Setting up NewDevice from schema") + var prov storage.StorageProvider var err error @@ -163,6 +174,8 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, 
storage.Expose // Optionally use a copy on write RO source... if ds.ROSource != nil { + log.Info().Str("schema", device_schema).Msg("Setting up CopyOnWrite") + // Create the ROSource... rodev, _, err := NewDevice(ds.ROSource) if err != nil { @@ -205,6 +218,7 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose // Optionally binlog this dev to a file if ds.Binlog != "" { + log.Info().Str("schema", string(ds.Encode())).Msg("Setting up BinLog") prov, err = modules.NewBinLog(prov, ds.Binlog) if err != nil { return nil, nil, err @@ -215,6 +229,8 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose // NB You may well need to call ex.SetProvider if you wish to insert other things in the chain. var ex storage.ExposedStorage if ds.Expose { + log.Info().Str("schema", device_schema).Msg("Setting up Expose device") + ex = expose.NewExposedStorageNBDNL(prov, 8, 0, prov.Size(), expose.NBD_DEFAULT_BLOCK_SIZE, true) err := ex.Init() @@ -224,5 +240,136 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose } } + // Optionally sync the device to S3 + if ds.Sync != nil { + log.Info().Str("schema", device_schema).Msg("Setting up Sync") + + s3dest, err := sources.NewS3StorageCreate(ds.Sync.Secure, + ds.Sync.Endpoint, + ds.Sync.AccessKey, + ds.Sync.SecretKey, + ds.Sync.Bucket, + ds.Name, + prov.Size(), + bs) + + if err != nil { + prov.Close() + return nil, nil, err + } + + dirty_block_size := bs >> ds.Sync.Config.BlockShift + + num_blocks := (int(prov.Size()) + bs - 1) / bs + + sourceDirtyLocal, sourceDirtyRemote := dirtytracker.NewDirtyTracker(prov, dirty_block_size) + sourceStorage := modules.NewLockable(sourceDirtyLocal) + + // Setup a block order + orderer := blocks.NewAnyBlockOrder(num_blocks, nil) + orderer.AddAll() + + check_period, err := time.ParseDuration(ds.Sync.Config.CheckPeriod) + if err != nil { + prov.Close() + return nil, nil, err + } + + max_age, err := time.ParseDuration(ds.Sync.Config.MaxAge) + if err != nil { + prov.Close() + return nil, nil, err + } + + ctx, cancelfn := context.WithCancel(context.TODO()) + + // Start doing the sync... + syncer := migrator.NewSyncer(ctx, &migrator.Sync_config{ + Name: ds.Name, + Integrity: false, + Cancel_writes: true, + Dedupe_writes: true, + Tracker: sourceDirtyRemote, + Lockable: sourceStorage, + Destination: s3dest, + Orderer: orderer, + Dirty_check_period: check_period, + Dirty_block_getter: func() []uint { + return sourceDirtyRemote.GetDirtyBlocks( + max_age, ds.Sync.Config.Limit, ds.Sync.Config.BlockShift, ds.Sync.Config.MinChanged) + }, + Block_size: bs, + Progress_handler: func(p *migrator.MigrationProgress) {}, + Error_handler: func(b *storage.BlockInfo, err error) {}, + }) + + // The provider we return should feed into our sync here. + prov = sourceStorage + + var sync_lock sync.Mutex + var sync_running bool + var wg sync.WaitGroup + + // If the storage gets a "sync.start", we should start syncing to S3. + storage.AddEventNotification(prov, "sync.start", func(event_type storage.EventType, data storage.EventData) storage.EventReturnData { + sync_lock.Lock() + if sync_running { + sync_lock.Unlock() + return false + } + log.Info().Str("schema", device_schema).Msg("Starting sync") + sync_running = true + wg.Add(1) + sync_lock.Unlock() + + // Sync happens here... 
+		go func() {
+			// Do this in a goroutine, but make sure it gets cancelled and waited on
+			status, err := syncer.Sync(false, true)
+			log.Info().Str("schema", device_schema).Err(err).Any("status", status).Msg("Sync finished")
+			wg.Done()
+		}()
+		return true
+	})
+
+	// If the storage gets a "sync.status", return some statistics from the S3Storage
+	storage.AddEventNotification(prov, "sync.status", func(event_type storage.EventType, data storage.EventData) storage.EventReturnData {
+		return s3dest.Metrics()
+	})
+
+	// If the storage gets a "sync.stop", we should cancel the sync, and return the safe blocks
+	storage.AddEventNotification(prov, "sync.stop", func(event_type storage.EventType, data storage.EventData) storage.EventReturnData {
+		sync_lock.Lock()
+		if !sync_running {
+			sync_lock.Unlock()
+			return nil
+		}
+		log.Info().Str("schema", device_schema).Msg("Stopping sync")
+		cancelfn()
+		// WAIT HERE for the sync to finish
+		wg.Wait()
+		sync_running = false
+		sync_lock.Unlock()
+
+		// Get the list of safe blocks we can use.
+		blocks := syncer.GetSafeBlockMap()
+		// Translate these to locations so they can be sent to a destination...
+		alt_sources := make([]packets.AlternateSource, 0)
+		for block, hash := range blocks {
+			as := packets.AlternateSource{
+				Offset:   int64(block * uint(bs)),
+				Length:   int64(bs),
+				Hash:     hash,
+				Location: fmt.Sprintf("%s %s %s", ds.Sync.Endpoint, ds.Sync.Bucket, ds.Name),
+			}
+			alt_sources = append(alt_sources, as)
+		}
+
+		log.Info().Str("schema", device_schema).Int("sources", len(alt_sources)).Msg("Sync stopped with sources")
+		return alt_sources
+	})
+
+	}
+
 	return prov, ex, nil
 }
diff --git a/pkg/storage/device/device_sync_test.go b/pkg/storage/device/device_sync_test.go
new file mode 100644
index 00000000..c1ae696c
--- /dev/null
+++ b/pkg/storage/device/device_sync_test.go
@@ -0,0 +1,118 @@
+package device
+
+import (
+	"crypto/rand"
+	"crypto/sha256"
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/loopholelabs/silo/pkg/storage"
+	"github.com/loopholelabs/silo/pkg/storage/config"
+	"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
+	"github.com/loopholelabs/silo/pkg/storage/sources"
+	"github.com/loopholelabs/silo/pkg/testutils"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestDeviceSync(t *testing.T) {
+	PORT_9000 := testutils.SetupMinio(t.Cleanup)
+
+	block_size := 64 * 1024
+
+	testSyncSchema := fmt.Sprintf(`
+	device TestSync {
+		system = "file"
+		size = "1m"
+		blocksize = "64k"
+		location = "./testdata/testfile_sync"
+		sync {
+			secure = false
+			accesskey = "silosilo"
+			secretkey = "silosilo"
+			endpoint = "%s"
+			bucket = "silosilo"
+			config {
+				blockshift = 2
+				maxage = "100ms"
+				minchanged = 4
+				limit = 256
+				checkperiod = "100ms"
+			}
+		}
+	}
+	`, fmt.Sprintf("localhost:%s", PORT_9000))
+
+	s := new(config.SiloSchema)
+	err := s.Decode([]byte(testSyncSchema))
+	assert.NoError(t, err)
+	devs, err := NewDevices(s.Device)
+	assert.NoError(t, err)
+	t.Cleanup(func() {
+		os.Remove("./testdata/testfile_sync")
+	})
+
+	assert.Equal(t, 1, len(devs))
+
+	prov := devs["TestSync"].Provider
+
+	num_blocks := (int(prov.Size()) + block_size - 1) / block_size
+
+	buffer := make([]byte, 1024*1024)
+	_, err = rand.Read(buffer)
+	assert.NoError(t, err)
+	n, err := prov.WriteAt(buffer, 0)
+	assert.NoError(t, err)
+	assert.Equal(t, 1024*1024, n)
+
+	// Tell the sync to start.
+	storage.SendEvent(prov, "sync.start", nil)
+
+	// Do a few writes here, and wait a little bit for the sync to happen...
+ for i := 0; i < num_blocks; i++ { + wbuffer := make([]byte, block_size) + rand.Read(wbuffer) + n, err = prov.WriteAt(wbuffer, int64(i*block_size)) + assert.NoError(t, err) + assert.Equal(t, 64*1024, n) + } + + // Should be enough time here to migrate the changed data blocks, since we have set the config. + time.Sleep(500 * time.Millisecond) + + // Tell the sync to stop, and return the AlternateSource details. + asources := storage.SendEvent(prov, "sync.stop", nil) + + locs := make([]string, 0) + + for _, r := range asources { + alt := r.([]packets.AlternateSource) + for _, as := range alt { + // Check the data matches what we have locally... + buff := make([]byte, as.Length) + n, err := prov.ReadAt(buff, as.Offset) + assert.NoError(t, err) + assert.Equal(t, n, int(as.Length)) + + hash := sha256.Sum256(buff) + assert.Equal(t, hash, as.Hash) + + locs = append(locs, as.Location) + } + } + + // If everything worked, all blocks should be present on S3. + assert.Equal(t, num_blocks, len(locs)) + + // Get some statistics + stats := storage.SendEvent(prov, "sync.status", nil) + + assert.Equal(t, 1, len(stats)) + metrics := stats[0].(*sources.S3Metrics) + + // Do some asserts on the S3Metrics... It should have written each block at least once by now. + assert.GreaterOrEqual(t, num_blocks, int(metrics.Blocks_w_count)) + + prov.Close() +} diff --git a/pkg/storage/migrator/migrator_s3_test.go b/pkg/storage/migrator/migrator_s3_test.go index 7d347518..49bd69ce 100644 --- a/pkg/storage/migrator/migrator_s3_test.go +++ b/pkg/storage/migrator/migrator_s3_test.go @@ -58,7 +58,7 @@ func TestMigratorToS3(t *testing.T) { orderer.AddAll() // START moving data from sourceStorage to destStorage - destStorage, err := sources.NewS3StorageCreate(fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize) + destStorage, err := sources.NewS3StorageCreate(false, fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize) assert.NoError(t, err) diff --git a/pkg/storage/migrator/sync_test.go b/pkg/storage/migrator/sync_test.go index 71221768..39cb15a2 100644 --- a/pkg/storage/migrator/sync_test.go +++ b/pkg/storage/migrator/sync_test.go @@ -66,7 +66,7 @@ func TestSyncToS3(t *testing.T) { orderer.AddAll() // START moving data from sourceStorage to destStorage - destStorage, err := sources.NewS3StorageCreate(fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize) + destStorage, err := sources.NewS3StorageCreate(false, fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize) assert.NoError(t, err) diff --git a/pkg/storage/protocol/packets/alternate_sources.go b/pkg/storage/protocol/packets/alternate_sources.go new file mode 100644 index 00000000..2d6d401e --- /dev/null +++ b/pkg/storage/protocol/packets/alternate_sources.go @@ -0,0 +1,64 @@ +package packets + +import ( + "bytes" + "crypto/sha256" + "encoding/binary" + "errors" +) + +type AlternateSource struct { + Offset int64 + Length int64 + Hash [sha256.Size]byte + Location string +} + +func EncodeAlternateSources(sources []AlternateSource) []byte { + var buff bytes.Buffer + + head := make([]byte, 1+4) + head[0] = COMMAND_ALTERNATE_SOURCES + binary.LittleEndian.PutUint32(head[1:], uint32(len(sources))) + + buff.Write(head) + + for _, src := range sources { + v := make([]byte, 8+8+sha256.Size+4+len(src.Location)) + binary.LittleEndian.PutUint64(v, uint64(src.Offset)) + 
binary.LittleEndian.PutUint64(v[8:], uint64(src.Length)) + copy(v[16:], src.Hash[:]) + binary.LittleEndian.PutUint32(v[16+sha256.Size:], uint32(len(src.Location))) + copy(v[16+sha256.Size+4:], src.Location) + buff.Write(v) + } + + return buff.Bytes() +} + +func DecodeAlternateSources(buff []byte) ([]AlternateSource, error) { + if buff == nil || len(buff) < 1+4 || buff[0] != COMMAND_ALTERNATE_SOURCES { + return nil, errors.New("invalid packet") + } + + l := int(binary.LittleEndian.Uint32(buff[1:])) + + sources := make([]AlternateSource, 0) + ptr := 5 + for a := 0; a < l; a++ { + offset := binary.LittleEndian.Uint64(buff[ptr:]) + length := binary.LittleEndian.Uint64(buff[ptr+8:]) + hash := buff[ptr+16 : ptr+16+sha256.Size] + loc_len := binary.LittleEndian.Uint32(buff[ptr+16+sha256.Size:]) + loc := buff[ptr+16+sha256.Size+4 : ptr+16+sha256.Size+4+int(loc_len)] + ptr = ptr + 16 + sha256.Size + 4 + int(loc_len) + sources = append(sources, AlternateSource{ + Offset: int64(offset), + Length: int64(length), + Hash: [32]byte(hash), + Location: string(loc), + }) + } + + return sources, nil +} diff --git a/pkg/storage/protocol/packets/packet.go b/pkg/storage/protocol/packets/packet.go index 90c8def0..6ca67aff 100644 --- a/pkg/storage/protocol/packets/packet.go +++ b/pkg/storage/protocol/packets/packet.go @@ -16,6 +16,8 @@ const ( COMMAND_WRITE_AT_WITH_MAP = COMMAND_REQUEST | byte(10) COMMAND_REMOVE_DEV = COMMAND_REQUEST | byte(11) COMMAND_REMOVE_FROM_MAP = COMMAND_REQUEST | byte(12) + COMMAND_WRITE_AT_HASH = COMMAND_REQUEST | byte(13) + COMMAND_ALTERNATE_SOURCES = COMMAND_REQUEST | byte(14) ) const ( diff --git a/pkg/storage/protocol/packets/packet_test.go b/pkg/storage/protocol/packets/packet_test.go index 37b1f92d..bc0774c4 100644 --- a/pkg/storage/protocol/packets/packet_test.go +++ b/pkg/storage/protocol/packets/packet_test.go @@ -1,6 +1,7 @@ package packets import ( + "crypto/sha256" "errors" "testing" @@ -331,3 +332,42 @@ func TestRemoveDev(t *testing.T) { assert.Error(t, err) } + +func TestHashes(t *testing.T) { + + hashes := map[uint][32]byte{ + 1: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}, + 2: {2, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}, + } + b := EncodeHashes(hashes) + + hashes2, err := DecodeHashes(b) + assert.NoError(t, err) + assert.Equal(t, len(hashes), len(hashes2)) + assert.Equal(t, hashes[1], hashes2[1]) + assert.Equal(t, hashes[2], hashes2[2]) + +} + +func TestAlternateSources(t *testing.T) { + + sources := []AlternateSource{ + { + Offset: 0, + Length: 100, + Hash: [sha256.Size]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}, + Location: "somewhere", + }, + } + b := EncodeAlternateSources(sources) + + sources2, err := DecodeAlternateSources(b) + assert.NoError(t, err) + assert.Equal(t, len(sources), len(sources2)) + + assert.Equal(t, sources[0].Offset, sources2[0].Offset) + assert.Equal(t, sources[0].Length, sources2[0].Length) + assert.Equal(t, sources[0].Hash, sources2[0].Hash) + assert.Equal(t, sources[0].Location, sources2[0].Location) + +} diff --git a/pkg/storage/sources/s3_storage.go b/pkg/storage/sources/s3_storage.go index b0355c23..3e40c363 100644 --- a/pkg/storage/sources/s3_storage.go +++ b/pkg/storage/sources/s3_storage.go @@ -7,31 +7,43 @@ import ( "fmt" "io" "sync" + "sync/atomic" + "time" "github.com/loopholelabs/silo/pkg/storage" 
"github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" ) -/* var ( errNoSuchKey = errors.New("The specified key does not exist.") // Minio doesn't export errors ) -*/ type S3Storage struct { storage.StorageProviderWithEvents - client *minio.Client - bucket string - prefix string - size uint64 - block_size int - lockers []sync.RWMutex - contexts []context.CancelFunc - contexts_lock sync.Mutex + dummy bool + client *minio.Client + bucket string + prefix string + size uint64 + block_size int + lockers []sync.RWMutex + contexts []context.CancelFunc + contexts_lock sync.Mutex + metrics_blocks_w_count uint64 + metrics_blocks_w_data_bytes uint64 + metrics_blocks_w_bytes uint64 + metrics_blocks_w_time_ns uint64 + metrics_blocks_w_pre_r_count uint64 + metrics_blocks_w_pre_r_bytes uint64 + metrics_blocks_w_pre_r_time_ns uint64 + metrics_blocks_r_count uint64 + metrics_blocks_r_data_bytes uint64 + metrics_blocks_r_bytes uint64 + metrics_blocks_r_time_ns uint64 } -func NewS3Storage(endpoint string, +func NewS3Storage(secure bool, endpoint string, access string, secretAccess string, bucket string, @@ -41,7 +53,7 @@ func NewS3Storage(endpoint string, client, err := minio.New(endpoint, &minio.Options{ Creds: credentials.NewStaticV4(access, secretAccess, ""), - Secure: false, + Secure: secure, }) if err != nil { @@ -61,7 +73,21 @@ func NewS3Storage(endpoint string, }, nil } -func NewS3StorageCreate(endpoint string, +func NewS3StorageDummy(size uint64, + blockSize int) (*S3Storage, error) { + + numBlocks := (int(size) + blockSize - 1) / blockSize + + return &S3Storage{ + size: size, + block_size: blockSize, + dummy: true, + lockers: make([]sync.RWMutex, numBlocks), + contexts: make([]context.CancelFunc, numBlocks), + }, nil +} + +func NewS3StorageCreate(secure bool, endpoint string, access string, secretAccess string, bucket string, @@ -71,7 +97,7 @@ func NewS3StorageCreate(endpoint string, client, err := minio.New(endpoint, &minio.Options{ Creds: credentials.NewStaticV4(access, secretAccess, ""), - Secure: false, + Secure: secure, }) if err != nil { return nil, err @@ -89,18 +115,6 @@ func NewS3StorageCreate(endpoint string, } } - b_end := (int(size) + blockSize - 1) / blockSize - buffer := make([]byte, blockSize) - - for b := 0; b < b_end; b++ { - offset := b * blockSize - - _, err := client.PutObject(context.TODO(), bucket, fmt.Sprintf("%s-%d", prefix, offset), bytes.NewReader(buffer), int64(blockSize), minio.PutObjectOptions{}) - if err != nil { - return nil, err - } - } - numBlocks := (int(size) + blockSize - 1) / blockSize return &S3Storage{ @@ -138,13 +152,34 @@ func (i *S3Storage) ReadAt(buffer []byte, offset int64) (int, error) { errs := make(chan error, blocks) getData := func(buff []byte, off int64) (int, error) { - i.lockers[off/int64(i.block_size)].RLock() - obj, err := i.client.GetObject(context.TODO(), i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), minio.GetObjectOptions{}) - i.lockers[off/int64(i.block_size)].RUnlock() - if err != nil { - return 0, err + if i.dummy { + atomic.AddUint64(&i.metrics_blocks_r_count, 1) + atomic.AddUint64(&i.metrics_blocks_r_bytes, uint64(len(buff))) + //atomic.AddUint64(&i.metrics_blocks_r_time_ns, uint64(dtime.Nanoseconds())) + return len(buff), nil + } else { + i.lockers[off/int64(i.block_size)].RLock() + ctime := time.Now() + obj, err := i.client.GetObject(context.TODO(), i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), minio.GetObjectOptions{}) + i.lockers[off/int64(i.block_size)].RUnlock() + if err != nil { + if err.Error() == 
errNoSuchKey.Error() { + return len(buff), nil + } + return 0, err + } + n, err := obj.Read(buff) + dtime := time.Since(ctime) + if err == nil { + atomic.AddUint64(&i.metrics_blocks_r_count, 1) + atomic.AddUint64(&i.metrics_blocks_r_bytes, uint64(n)) + atomic.AddUint64(&i.metrics_blocks_r_time_ns, uint64(dtime.Nanoseconds())) + } else if err.Error() == errNoSuchKey.Error() { + return len(buff), nil + } + + return n, err } - return obj.Read(buff) } for b := b_start; b < b_end; b++ { @@ -184,6 +219,8 @@ func (i *S3Storage) ReadAt(buffer []byte, offset int64) (int, error) { } } + atomic.AddUint64(&i.metrics_blocks_r_data_bytes, uint64(len(buffer))) + return len(buffer), nil } @@ -201,40 +238,78 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) { errs := make(chan error, blocks) getData := func(buff []byte, off int64) (int, error) { - ctx := context.TODO() - i.lockers[off/int64(i.block_size)].RLock() - obj, err := i.client.GetObject(ctx, i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), minio.GetObjectOptions{}) - i.lockers[off/int64(i.block_size)].RUnlock() - if err != nil { - return 0, err + if i.dummy { + atomic.AddUint64(&i.metrics_blocks_w_pre_r_count, 1) + atomic.AddUint64(&i.metrics_blocks_w_pre_r_bytes, uint64(len(buff))) + //atomic.AddUint64(&i.metrics_blocks_w_pre_r_time_ns, uint64(dtime.Nanoseconds())) + return len(buff), nil + } else { + ctx := context.TODO() + i.lockers[off/int64(i.block_size)].RLock() + ctime := time.Now() + obj, err := i.client.GetObject(ctx, i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), minio.GetObjectOptions{}) + i.lockers[off/int64(i.block_size)].RUnlock() + if err != nil { + if err.Error() == errNoSuchKey.Error() { + return len(buff), nil + } + return 0, err + } + n, err := obj.Read(buff) + dtime := time.Since(ctime) + if err == nil { + atomic.AddUint64(&i.metrics_blocks_w_pre_r_count, 1) + atomic.AddUint64(&i.metrics_blocks_w_pre_r_bytes, uint64(n)) + atomic.AddUint64(&i.metrics_blocks_w_pre_r_time_ns, uint64(dtime.Nanoseconds())) + } else if err.Error() == errNoSuchKey.Error() { + return len(buff), nil + } + return n, err } - return obj.Read(buff) } putData := func(buff []byte, off int64) (int, error) { - block := off / int64(i.block_size) - ctx, cancelFn := context.WithCancel(context.TODO()) - i.lockers[block].Lock() - - i.setContext(int(block), cancelFn) - - obj, err := i.client.PutObject(ctx, i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), - bytes.NewReader(buff), int64(i.block_size), - minio.PutObjectOptions{}) + if i.dummy { + atomic.AddUint64(&i.metrics_blocks_w_count, 1) + atomic.AddUint64(&i.metrics_blocks_w_bytes, uint64(len(buff))) + //atomic.AddUint64(&i.metrics_blocks_w_time_ns, uint64(dtime.Nanoseconds())) + return len(buff), nil + } else { + block := off / int64(i.block_size) + ctx, cancelFn := context.WithCancel(context.TODO()) + i.lockers[block].Lock() + + i.setContext(int(block), cancelFn) + + ctime := time.Now() + obj, err := i.client.PutObject(ctx, i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), + bytes.NewReader(buff), int64(i.block_size), + minio.PutObjectOptions{}) + dtime := time.Since(ctime) + + i.setContext(int(block), nil) + i.lockers[block].Unlock() + + if err == nil { + atomic.AddUint64(&i.metrics_blocks_w_count, 1) + atomic.AddUint64(&i.metrics_blocks_w_bytes, uint64(obj.Size)) + atomic.AddUint64(&i.metrics_blocks_w_time_ns, uint64(dtime.Nanoseconds())) + } else { + // Currently, if the context was canceled, we ignore it. 
+				if !errors.Is(err, context.Canceled) {
+					return 0, err
+				}
+			}

-		i.setContext(int(block), nil)
-		i.lockers[block].Unlock()
-		if err != nil {
-			return 0, err
+			return int(obj.Size), nil
 		}
-		return int(obj.Size), nil
 	}

 	for b := b_start; b < b_end; b++ {
 		go func(block_no uint) {
 			block_offset := int64(block_no) * int64(i.block_size)
 			var err error
-			if block_offset > offset {
+			if block_offset >= offset {
 				// Partial write at the end
 				if len(buffer[block_offset-offset:]) < i.block_size {
 					block_buffer := make([]byte, i.block_size)
@@ -276,6 +351,8 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) {
 		}
 	}

+	atomic.AddUint64(&i.metrics_blocks_w_data_bytes, uint64(len(buffer)))
+
 	return len(buffer), nil
 }

@@ -305,3 +382,33 @@ func (i *S3Storage) CancelWrites(offset int64, length int64) {
 		i.setContext(int(b), nil)
 	}
 }
+
+type S3Metrics struct {
+	Blocks_w_count       uint64
+	Blocks_w_bytes       uint64
+	Blocks_w_data_bytes  uint64
+	Blocks_w_time        time.Duration
+	Blocks_w_pre_r_count uint64
+	Blocks_w_pre_r_bytes uint64
+	Blocks_w_pre_r_time  time.Duration
+	Blocks_r_count       uint64
+	Blocks_r_bytes       uint64
+	Blocks_r_data_bytes  uint64
+	Blocks_r_time        time.Duration
+}
+
+func (i *S3Storage) Metrics() *S3Metrics {
+	return &S3Metrics{
+		Blocks_w_count:       atomic.LoadUint64(&i.metrics_blocks_w_count),
+		Blocks_w_bytes:       atomic.LoadUint64(&i.metrics_blocks_w_bytes),
+		Blocks_w_data_bytes:  atomic.LoadUint64(&i.metrics_blocks_w_data_bytes),
+		Blocks_w_time:        time.Duration(atomic.LoadUint64(&i.metrics_blocks_w_time_ns)),
+		Blocks_w_pre_r_count: atomic.LoadUint64(&i.metrics_blocks_w_pre_r_count),
+		Blocks_w_pre_r_bytes: atomic.LoadUint64(&i.metrics_blocks_w_pre_r_bytes),
+		Blocks_w_pre_r_time:  time.Duration(atomic.LoadUint64(&i.metrics_blocks_w_pre_r_time_ns)),
+		Blocks_r_count:       atomic.LoadUint64(&i.metrics_blocks_r_count),
+		Blocks_r_bytes:       atomic.LoadUint64(&i.metrics_blocks_r_bytes),
+		Blocks_r_data_bytes:  atomic.LoadUint64(&i.metrics_blocks_r_data_bytes),
+		Blocks_r_time:        time.Duration(atomic.LoadUint64(&i.metrics_blocks_r_time_ns)),
+	}
+}
diff --git a/pkg/storage/sources/s3_storage_test.go b/pkg/storage/sources/s3_storage_test.go
index 233fea32..2f1473bf 100644
--- a/pkg/storage/sources/s3_storage_test.go
+++ b/pkg/storage/sources/s3_storage_test.go
@@ -18,7 +18,7 @@ func TestS3Storage(t *testing.T) {
 	size := 64 * 1024
 	blockSize := 1024

-	s3store, err := sources.NewS3StorageCreate(fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize)
+	s3store, err := sources.NewS3StorageCreate(false, fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize)
 	assert.NoError(t, err)

 	buffer := make([]byte, 32*1024)
@@ -44,7 +44,7 @@ func TestS3StorageCancelWrites(t *testing.T) {
 	size := 64 * 1024
 	blockSize := 1024

-	s3store, err := sources.NewS3StorageCreate(fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize)
+	s3store, err := sources.NewS3StorageCreate(false, fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize)
 	assert.NoError(t, err)

 	buffer := make([]byte, 32*1024)

From 5d3c6c0503dd8feca08befdf6928d51f0c1103f4 Mon Sep 17 00:00:00 2001
From: Jimmy Moore
Date: Wed, 23 Oct 2024 19:20:02 +0100
Subject: [PATCH 2/5] Changed S3Storage to report context.Canceled if
 CancelWrites was called

Signed-off-by: Jimmy Moore
---
 pkg/storage/sources/s3_storage.go | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)
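Note: with this change, a write whose context is cancelled via CancelWrites surfaces the
cancellation from WriteAt instead of silently reporting success. The helper below is a
hypothetical caller-side sketch, not part of this series; it assumes minio propagates the
context error up through WriteAt.

	package example

	import (
		"context"
		"errors"

		"github.com/loopholelabs/silo/pkg/storage/sources"
	)

	// writeBlock treats a write cancelled by a concurrent CancelWrites as
	// "superseded" rather than fatal, while still returning real S3 failures.
	func writeBlock(s3 *sources.S3Storage, buff []byte, offset int64) error {
		_, err := s3.WriteAt(buff, offset)
		if errors.Is(err, context.Canceled) {
			return nil // a newer write for this block cancelled ours
		}
		return err
	}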
diff --git a/pkg/storage/sources/s3_storage.go b/pkg/storage/sources/s3_storage.go
index 3e40c363..3606e332 100644
--- a/pkg/storage/sources/s3_storage.go
+++ b/pkg/storage/sources/s3_storage.go
@@ -295,10 +295,7 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) {
 				atomic.AddUint64(&i.metrics_blocks_w_bytes, uint64(obj.Size))
 				atomic.AddUint64(&i.metrics_blocks_w_time_ns, uint64(dtime.Nanoseconds()))
 			} else {
-				// Currently, if the context was canceled, we ignore it.
-				if !errors.Is(err, context.Canceled) {
-					return 0, err
-				}
+				return 0, err
 			}

 			return int(obj.Size), nil

From 4a634d94f8ccd1385b7f2edba93eac4903b1899f Mon Sep 17 00:00:00 2001
From: Jimmy Moore
Date: Wed, 23 Oct 2024 19:22:04 +0100
Subject: [PATCH 3/5] Lint fix

Signed-off-by: Jimmy Moore
---
 pkg/storage/device/device_sync_test.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pkg/storage/device/device_sync_test.go b/pkg/storage/device/device_sync_test.go
index c1ae696c..e6a1c64a 100644
--- a/pkg/storage/device/device_sync_test.go
+++ b/pkg/storage/device/device_sync_test.go
@@ -72,7 +72,8 @@ func TestDeviceSync(t *testing.T) {
 	// Do a few writes here, and wait a little bit for the sync to happen...
 	for i := 0; i < num_blocks; i++ {
 		wbuffer := make([]byte, block_size)
-		rand.Read(wbuffer)
+		_, err = rand.Read(wbuffer)
+		assert.NoError(t, err)
 		n, err = prov.WriteAt(wbuffer, int64(i*block_size))
 		assert.NoError(t, err)
 		assert.Equal(t, 64*1024, n)

From 85a59c307665043ea4bdcd1631b3789050e1b630 Mon Sep 17 00:00:00 2001
From: Jimmy Moore
Date: Thu, 24 Oct 2024 11:14:48 +0100
Subject: [PATCH 4/5] Fixed up device.go

Signed-off-by: Jimmy Moore
---
 pkg/storage/device/device.go | 41 +++++++++++++-----------------------
 1 file changed, 15 insertions(+), 26 deletions(-)

diff --git a/pkg/storage/device/device.go b/pkg/storage/device/device.go
index f0b60633..98db5758 100644
--- a/pkg/storage/device/device.go
+++ b/pkg/storage/device/device.go
@@ -19,7 +19,6 @@ import (
 	"github.com/loopholelabs/silo/pkg/storage/modules"
 	"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
 	"github.com/loopholelabs/silo/pkg/storage/sources"
-	"github.com/rs/zerolog/log"
 )

 const (
@@ -58,8 +57,6 @@ func NewDevices(ds []*config.DeviceSchema) (map[string]*Device, error) {
 }

 func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.ExposedStorage, error) {
-	device_schema := string(ds.Encode())
-	log.Info().Str("schema", device_schema).Msg("Setting up NewDevice from schema")

 	var prov storage.StorageProvider
 	var err error
@@ -174,7 +171,6 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose

 	// Optionally use a copy on write RO source...
 	if ds.ROSource != nil {
-		log.Info().Str("schema", device_schema).Msg("Setting up CopyOnWrite")

 		// Create the ROSource...
 		rodev, _, err := NewDevice(ds.ROSource)
@@ -218,7 +214,6 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose

 	// Optionally binlog this dev to a file
 	if ds.Binlog != "" {
-		log.Info().Str("schema", string(ds.Encode())).Msg("Setting up BinLog")
 		prov, err = modules.NewBinLog(prov, ds.Binlog)
 		if err != nil {
 			return nil, nil, err
@@ -229,7 +224,6 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose
 	// NB You may well need to call ex.SetProvider if you wish to insert other things in the chain.
var ex storage.ExposedStorage if ds.Expose { - log.Info().Str("schema", device_schema).Msg("Setting up Expose device") ex = expose.NewExposedStorageNBDNL(prov, 8, 0, prov.Size(), expose.NBD_DEFAULT_BLOCK_SIZE, true) @@ -242,7 +236,6 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose // Optionally sync the device to S3 if ds.Sync != nil { - log.Info().Str("schema", device_schema).Msg("Setting up Sync") s3dest, err := sources.NewS3StorageCreate(ds.Sync.Secure, ds.Sync.Endpoint, @@ -284,23 +277,23 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose ctx, cancelfn := context.WithCancel(context.TODO()) // Start doing the sync... - syncer := migrator.NewSyncer(ctx, &migrator.Sync_config{ - Name: ds.Name, - Integrity: false, - Cancel_writes: true, - Dedupe_writes: true, - Tracker: sourceDirtyRemote, - Lockable: sourceStorage, - Destination: s3dest, - Orderer: orderer, - Dirty_check_period: check_period, - Dirty_block_getter: func() []uint { + syncer := migrator.NewSyncer(ctx, &migrator.SyncConfig{ + Name: ds.Name, + Integrity: false, + CancelWrites: true, + DedupeWrites: true, + Tracker: sourceDirtyRemote, + Lockable: sourceStorage, + Destination: s3dest, + Orderer: orderer, + DirtyCheckPeriod: check_period, + DirtyBlockGetter: func() []uint { return sourceDirtyRemote.GetDirtyBlocks( max_age, ds.Sync.Config.Limit, ds.Sync.Config.BlockShift, ds.Sync.Config.MinChanged) }, - Block_size: bs, - Progress_handler: func(p *migrator.MigrationProgress) {}, - Error_handler: func(b *storage.BlockInfo, err error) {}, + BlockSize: bs, + ProgressHandler: func(p *migrator.MigrationProgress) {}, + ErrorHandler: func(b *storage.BlockInfo, err error) {}, }) // The provider we return should feed into our sync here. @@ -317,7 +310,6 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose sync_lock.Unlock() return false } - log.Info().Str("schema", device_schema).Msg("Starting sync") sync_running = true wg.Add(1) sync_lock.Unlock() @@ -325,8 +317,7 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose // Sync happens here... 
 			go func() {
 				// Do this in a goroutine, but make sure it gets cancelled and waited on
-				status, err := syncer.Sync(false, true)
-				log.Info().Str("schema", device_schema).Err(err).Any("status", status).Msg("Sync finished")
+				_, _ = syncer.Sync(false, true)
 				wg.Done()
 			}()
 			return true
 		})
@@ -344,7 +335,6 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose
 			sync_lock.Unlock()
 			return nil
 		}
-		log.Info().Str("schema", device_schema).Msg("Stopping sync")
 		cancelfn()
 		// WAIT HERE for the sync to finish
 		wg.Wait()
@@ -365,7 +355,6 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose
 			alt_sources = append(alt_sources, as)
 		}

-		log.Info().Str("schema", device_schema).Int("sources", len(alt_sources)).Msg("Sync stopped with sources")
 		return alt_sources
 	})


From 00bf562937ee19e728ecb1cf3d9d01050111a7ee Mon Sep 17 00:00:00 2001
From: Jimmy Moore
Date: Thu, 24 Oct 2024 11:29:31 +0100
Subject: [PATCH 5/5] Lint fixes in S3Storage

Signed-off-by: Jimmy Moore
---
 pkg/storage/device/device_sync_test.go |   2 +-
 pkg/storage/sources/s3_storage.go      | 254 ++++++++++++------------
 2 files changed, 127 insertions(+), 129 deletions(-)

diff --git a/pkg/storage/device/device_sync_test.go b/pkg/storage/device/device_sync_test.go
index e6a1c64a..5e085b96 100644
--- a/pkg/storage/device/device_sync_test.go
+++ b/pkg/storage/device/device_sync_test.go
@@ -113,7 +113,7 @@ func TestDeviceSync(t *testing.T) {
 	metrics := stats[0].(*sources.S3Metrics)

 	// Do some asserts on the S3Metrics... It should have written each block at least once by now.
-	assert.GreaterOrEqual(t, num_blocks, int(metrics.Blocks_w_count))
+	assert.GreaterOrEqual(t, num_blocks, int(metrics.BlocksWCount))

 	prov.Close()
 }
diff --git a/pkg/storage/sources/s3_storage.go b/pkg/storage/sources/s3_storage.go
index 3606e332..47205e60 100644
--- a/pkg/storage/sources/s3_storage.go
+++ b/pkg/storage/sources/s3_storage.go
@@ -21,26 +21,26 @@ var (

 type S3Storage struct {
 	storage.StorageProviderWithEvents
-	dummy                          bool
-	client                         *minio.Client
-	bucket                         string
-	prefix                         string
-	size                           uint64
-	block_size                     int
-	lockers                        []sync.RWMutex
-	contexts                       []context.CancelFunc
-	contexts_lock                  sync.Mutex
-	metrics_blocks_w_count         uint64
-	metrics_blocks_w_data_bytes    uint64
-	metrics_blocks_w_bytes         uint64
-	metrics_blocks_w_time_ns       uint64
-	metrics_blocks_w_pre_r_count   uint64
-	metrics_blocks_w_pre_r_bytes   uint64
-	metrics_blocks_w_pre_r_time_ns uint64
-	metrics_blocks_r_count         uint64
-	metrics_blocks_r_data_bytes    uint64
-	metrics_blocks_r_bytes         uint64
-	metrics_blocks_r_time_ns       uint64
+	dummy                    bool
+	client                   *minio.Client
+	bucket                   string
+	prefix                   string
+	size                     uint64
+	blockSize                int
+	lockers                  []sync.RWMutex
+	contexts                 []context.CancelFunc
+	contextsLock             sync.Mutex
+	metricsBlocksWCount      uint64
+	metricsBlocksWDataBytes  uint64
+	metricsBlocksWBytes      uint64
+	metricsBlocksWTimeNS     uint64
+	metricsBlocksWPreRCount  uint64
+	metricsBlocksWPreRBytes  uint64
+	metricsBlocksWPreRTimeNS uint64
+	metricsBlocksRCount      uint64
+	metricsBlocksRDataBytes  uint64
+	metricsBlocksRBytes      uint64
+	metricsBlocksRTimeNS     uint64
 }

 func NewS3Storage(secure bool, endpoint string,
@@ -63,13 +63,13 @@ func NewS3Storage(secure bool, endpoint string,
 	numBlocks := (int(size) + blockSize - 1) / blockSize

 	return &S3Storage{
-		size:       size,
-		block_size: blockSize,
-		client:     client,
-		bucket:     bucket,
-		prefix:     prefix,
-		lockers:    make([]sync.RWMutex, numBlocks),
-		contexts:   make([]context.CancelFunc, numBlocks),
+		size:      size,
+		blockSize: blockSize,
+		client:    client,
+		bucket:    bucket,
+		prefix:
prefix, + lockers: make([]sync.RWMutex, numBlocks), + contexts: make([]context.CancelFunc, numBlocks), }, nil } @@ -79,11 +79,11 @@ func NewS3StorageDummy(size uint64, numBlocks := (int(size) + blockSize - 1) / blockSize return &S3Storage{ - size: size, - block_size: blockSize, - dummy: true, - lockers: make([]sync.RWMutex, numBlocks), - contexts: make([]context.CancelFunc, numBlocks), + size: size, + blockSize: blockSize, + dummy: true, + lockers: make([]sync.RWMutex, numBlocks), + contexts: make([]context.CancelFunc, numBlocks), }, nil } @@ -118,24 +118,24 @@ func NewS3StorageCreate(secure bool, endpoint string, numBlocks := (int(size) + blockSize - 1) / blockSize return &S3Storage{ - size: size, - block_size: blockSize, - client: client, - bucket: bucket, - prefix: prefix, - lockers: make([]sync.RWMutex, numBlocks), - contexts: make([]context.CancelFunc, numBlocks), + size: size, + blockSize: blockSize, + client: client, + bucket: bucket, + prefix: prefix, + lockers: make([]sync.RWMutex, numBlocks), + contexts: make([]context.CancelFunc, numBlocks), }, nil } func (i *S3Storage) setContext(block int, cancel context.CancelFunc) { - i.contexts_lock.Lock() + i.contextsLock.Lock() ex := i.contexts[block] if ex != nil { ex() // Cancel any existing context for this block } i.contexts[block] = cancel // Set to the new context - i.contexts_lock.Unlock() + i.contextsLock.Unlock() } func (i *S3Storage) ReadAt(buffer []byte, offset int64) (int, error) { @@ -145,23 +145,23 @@ func (i *S3Storage) ReadAt(buffer []byte, offset int64) (int, error) { end = i.size } - b_start := uint(offset / int64(i.block_size)) - b_end := uint((end-1)/uint64(i.block_size)) + 1 + bStart := uint(offset / int64(i.blockSize)) + bEnd := uint((end-1)/uint64(i.blockSize)) + 1 - blocks := b_end - b_start + blocks := bEnd - bStart errs := make(chan error, blocks) getData := func(buff []byte, off int64) (int, error) { if i.dummy { - atomic.AddUint64(&i.metrics_blocks_r_count, 1) - atomic.AddUint64(&i.metrics_blocks_r_bytes, uint64(len(buff))) + atomic.AddUint64(&i.metricsBlocksRCount, 1) + atomic.AddUint64(&i.metricsBlocksRBytes, uint64(len(buff))) //atomic.AddUint64(&i.metrics_blocks_r_time_ns, uint64(dtime.Nanoseconds())) return len(buff), nil } else { - i.lockers[off/int64(i.block_size)].RLock() + i.lockers[off/int64(i.blockSize)].RLock() ctime := time.Now() obj, err := i.client.GetObject(context.TODO(), i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), minio.GetObjectOptions{}) - i.lockers[off/int64(i.block_size)].RUnlock() + i.lockers[off/int64(i.blockSize)].RUnlock() if err != nil { if err.Error() == errNoSuchKey.Error() { return len(buff), nil @@ -171,9 +171,9 @@ func (i *S3Storage) ReadAt(buffer []byte, offset int64) (int, error) { n, err := obj.Read(buff) dtime := time.Since(ctime) if err == nil { - atomic.AddUint64(&i.metrics_blocks_r_count, 1) - atomic.AddUint64(&i.metrics_blocks_r_bytes, uint64(n)) - atomic.AddUint64(&i.metrics_blocks_r_time_ns, uint64(dtime.Nanoseconds())) + atomic.AddUint64(&i.metricsBlocksRCount, 1) + atomic.AddUint64(&i.metricsBlocksRBytes, uint64(n)) + atomic.AddUint64(&i.metricsBlocksRTimeNS, uint64(dtime.Nanoseconds())) } else if err.Error() == errNoSuchKey.Error() { return len(buff), nil } @@ -182,44 +182,44 @@ func (i *S3Storage) ReadAt(buffer []byte, offset int64) (int, error) { } } - for b := b_start; b < b_end; b++ { - go func(block_no uint) { - block_offset := int64(block_no) * int64(i.block_size) + for b := bStart; b < bEnd; b++ { + go func(blockNo uint) { + blockOffset := int64(blockNo) * 
int64(i.blockSize) var err error - if block_offset > offset { + if blockOffset > offset { // Partial read at the end - if len(buffer[block_offset-offset:]) < i.block_size { - block_buffer := make([]byte, i.block_size) - _, err = getData(block_buffer, block_offset) - copy(buffer[block_offset-offset:], block_buffer) + if len(buffer[blockOffset-offset:]) < i.blockSize { + blockBuffer := make([]byte, i.blockSize) + _, err = getData(blockBuffer, blockOffset) + copy(buffer[blockOffset-offset:], blockBuffer) } else { // Complete read in the middle - s := block_offset - offset - e := s + int64(i.block_size) + s := blockOffset - offset + e := s + int64(i.blockSize) if e > int64(len(buffer)) { e = int64(len(buffer)) } - _, err = getData(buffer[s:e], block_offset) + _, err = getData(buffer[s:e], blockOffset) } } else { // Partial read at the start - block_buffer := make([]byte, i.block_size) - _, err = getData(block_buffer, block_offset) - copy(buffer, block_buffer[offset-block_offset:]) + blockBuffer := make([]byte, i.blockSize) + _, err = getData(blockBuffer, blockOffset) + copy(buffer, blockBuffer[offset-blockOffset:]) } errs <- err }(b) } // Wait for completion, Check for errors and return... - for b := b_start; b < b_end; b++ { + for b := bStart; b < bEnd; b++ { e := <-errs if e != nil && !errors.Is(e, io.EOF) { return 0, e } } - atomic.AddUint64(&i.metrics_blocks_r_data_bytes, uint64(len(buffer))) + atomic.AddUint64(&i.metricsBlocksRDataBytes, uint64(len(buffer))) return len(buffer), nil } @@ -231,24 +231,23 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) { end = i.size } - b_start := uint(offset / int64(i.block_size)) - b_end := uint((end-1)/uint64(i.block_size)) + 1 + bStart := uint(offset / int64(i.blockSize)) + bEnd := uint((end-1)/uint64(i.blockSize)) + 1 - blocks := b_end - b_start + blocks := bEnd - bStart errs := make(chan error, blocks) getData := func(buff []byte, off int64) (int, error) { if i.dummy { - atomic.AddUint64(&i.metrics_blocks_w_pre_r_count, 1) - atomic.AddUint64(&i.metrics_blocks_w_pre_r_bytes, uint64(len(buff))) - //atomic.AddUint64(&i.metrics_blocks_w_pre_r_time_ns, uint64(dtime.Nanoseconds())) + atomic.AddUint64(&i.metricsBlocksWPreRCount, 1) + atomic.AddUint64(&i.metricsBlocksWPreRBytes, uint64(len(buff))) return len(buff), nil } else { ctx := context.TODO() - i.lockers[off/int64(i.block_size)].RLock() + i.lockers[off/int64(i.blockSize)].RLock() ctime := time.Now() obj, err := i.client.GetObject(ctx, i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), minio.GetObjectOptions{}) - i.lockers[off/int64(i.block_size)].RUnlock() + i.lockers[off/int64(i.blockSize)].RUnlock() if err != nil { if err.Error() == errNoSuchKey.Error() { return len(buff), nil @@ -258,9 +257,9 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) { n, err := obj.Read(buff) dtime := time.Since(ctime) if err == nil { - atomic.AddUint64(&i.metrics_blocks_w_pre_r_count, 1) - atomic.AddUint64(&i.metrics_blocks_w_pre_r_bytes, uint64(n)) - atomic.AddUint64(&i.metrics_blocks_w_pre_r_time_ns, uint64(dtime.Nanoseconds())) + atomic.AddUint64(&i.metricsBlocksWPreRCount, 1) + atomic.AddUint64(&i.metricsBlocksWPreRBytes, uint64(n)) + atomic.AddUint64(&i.metricsBlocksWPreRTimeNS, uint64(dtime.Nanoseconds())) } else if err.Error() == errNoSuchKey.Error() { return len(buff), nil } @@ -270,12 +269,11 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) { putData := func(buff []byte, off int64) (int, error) { if i.dummy { - 
atomic.AddUint64(&i.metrics_blocks_w_count, 1) - atomic.AddUint64(&i.metrics_blocks_w_bytes, uint64(len(buff))) - //atomic.AddUint64(&i.metrics_blocks_w_time_ns, uint64(dtime.Nanoseconds())) + atomic.AddUint64(&i.metricsBlocksWCount, 1) + atomic.AddUint64(&i.metricsBlocksWBytes, uint64(len(buff))) return len(buff), nil } else { - block := off / int64(i.block_size) + block := off / int64(i.blockSize) ctx, cancelFn := context.WithCancel(context.TODO()) i.lockers[block].Lock() @@ -283,7 +281,7 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) { ctime := time.Now() obj, err := i.client.PutObject(ctx, i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), - bytes.NewReader(buff), int64(i.block_size), + bytes.NewReader(buff), int64(i.blockSize), minio.PutObjectOptions{}) dtime := time.Since(ctime) @@ -291,9 +289,9 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) { i.lockers[block].Unlock() if err == nil { - atomic.AddUint64(&i.metrics_blocks_w_count, 1) - atomic.AddUint64(&i.metrics_blocks_w_bytes, uint64(obj.Size)) - atomic.AddUint64(&i.metrics_blocks_w_time_ns, uint64(dtime.Nanoseconds())) + atomic.AddUint64(&i.metricsBlocksWCount, 1) + atomic.AddUint64(&i.metricsBlocksWBytes, uint64(obj.Size)) + atomic.AddUint64(&i.metricsBlocksWTimeNS, uint64(dtime.Nanoseconds())) } else { return 0, err } @@ -302,38 +300,38 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) { } } - for b := b_start; b < b_end; b++ { + for b := bStart; b < bEnd; b++ { go func(block_no uint) { - block_offset := int64(block_no) * int64(i.block_size) + blockOffset := int64(block_no) * int64(i.blockSize) var err error - if block_offset >= offset { + if blockOffset >= offset { // Partial write at the end - if len(buffer[block_offset-offset:]) < i.block_size { - block_buffer := make([]byte, i.block_size) + if len(buffer[blockOffset-offset:]) < i.blockSize { + blockBuffer := make([]byte, i.blockSize) // Read existing data - _, err = getData(block_buffer, block_offset) + _, err = getData(blockBuffer, blockOffset) if err == nil || errors.Is(err, io.EOF) { // Update the data - copy(block_buffer, buffer[block_offset-offset:]) + copy(blockBuffer, buffer[blockOffset-offset:]) // Write back - _, err = putData(block_buffer, block_offset) + _, err = putData(blockBuffer, blockOffset) } } else { // Complete write in the middle - s := block_offset - offset - e := s + int64(i.block_size) + s := blockOffset - offset + e := s + int64(i.blockSize) if e > int64(len(buffer)) { e = int64(len(buffer)) } - _, err = putData(buffer[s:e], block_offset) + _, err = putData(buffer[s:e], blockOffset) } } else { // Partial write at the start - block_buffer := make([]byte, i.block_size) - _, err = getData(block_buffer, block_offset) + blockBuffer := make([]byte, i.blockSize) + _, err = getData(blockBuffer, blockOffset) if err == nil || errors.Is(err, io.EOF) { - copy(block_buffer[offset-block_offset:], buffer) - _, err = putData(block_buffer, block_offset) + copy(blockBuffer[offset-blockOffset:], buffer) + _, err = putData(blockBuffer, blockOffset) } } errs <- err @@ -341,14 +339,14 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) { } // Wait for completion, Check for errors and return... 
- for b := b_start; b < b_end; b++ { + for b := bStart; b < bEnd; b++ { e := <-errs if e != nil && !errors.Is(e, io.EOF) { return 0, e } } - atomic.AddUint64(&i.metrics_blocks_w_data_bytes, uint64(len(buffer))) + atomic.AddUint64(&i.metricsBlocksWDataBytes, uint64(len(buffer))) return len(buffer), nil } @@ -371,41 +369,41 @@ func (i *S3Storage) CancelWrites(offset int64, length int64) { end = i.size } - b_start := uint(offset / int64(i.block_size)) - b_end := uint((end-1)/uint64(i.block_size)) + 1 + bStart := uint(offset / int64(i.blockSize)) + bEnd := uint((end-1)/uint64(i.blockSize)) + 1 - for b := b_start; b < b_end; b++ { + for b := bStart; b < bEnd; b++ { // Cancel any writes for the given block... i.setContext(int(b), nil) } } type S3Metrics struct { - Blocks_w_count uint64 - Blocks_w_bytes uint64 - Blocks_w_data_bytes uint64 - Blocks_w_time time.Duration - Blocks_w_pre_r_count uint64 - Blocks_w_pre_r_bytes uint64 - Blocks_w_pre_r_time time.Duration - Blocks_r_count uint64 - Blocks_r_bytes uint64 - Blocks_r_data_bytes uint64 - Blocks_r_time time.Duration + BlocksWCount uint64 + BlocksWBytes uint64 + BlocksWDataBytes uint64 + BlocksWTime time.Duration + BlocksWPreRCount uint64 + BlocksWPreRBytes uint64 + BlocksWPreRTime time.Duration + BlocksRCount uint64 + BlocksRBytes uint64 + BlocksRDataBytes uint64 + BlocksRTime time.Duration } func (i *S3Storage) Metrics() *S3Metrics { return &S3Metrics{ - Blocks_w_count: atomic.LoadUint64(&i.metrics_blocks_w_count), - Blocks_w_bytes: atomic.LoadUint64(&i.metrics_blocks_w_bytes), - Blocks_w_data_bytes: atomic.LoadUint64(&i.metrics_blocks_w_data_bytes), - Blocks_w_time: time.Duration(atomic.LoadUint64(&i.metrics_blocks_w_time_ns)), - Blocks_w_pre_r_count: atomic.LoadUint64(&i.metrics_blocks_w_pre_r_count), - Blocks_w_pre_r_bytes: atomic.LoadUint64(&i.metrics_blocks_w_pre_r_bytes), - Blocks_w_pre_r_time: time.Duration(atomic.LoadUint64(&i.metrics_blocks_w_pre_r_time_ns)), - Blocks_r_count: atomic.LoadUint64(&i.metrics_blocks_r_count), - Blocks_r_bytes: atomic.LoadUint64(&i.metrics_blocks_r_bytes), - Blocks_r_data_bytes: atomic.LoadUint64(&i.metrics_blocks_r_data_bytes), - Blocks_r_time: time.Duration(atomic.LoadUint64(&i.metrics_blocks_r_time_ns)), + BlocksWCount: atomic.LoadUint64(&i.metricsBlocksWCount), + BlocksWBytes: atomic.LoadUint64(&i.metricsBlocksWBytes), + BlocksWDataBytes: atomic.LoadUint64(&i.metricsBlocksWDataBytes), + BlocksWTime: time.Duration(atomic.LoadUint64(&i.metricsBlocksWTimeNS)), + BlocksWPreRCount: atomic.LoadUint64(&i.metricsBlocksWPreRCount), + BlocksWPreRBytes: atomic.LoadUint64(&i.metricsBlocksWPreRBytes), + BlocksWPreRTime: time.Duration(atomic.LoadUint64(&i.metricsBlocksWPreRTimeNS)), + BlocksRCount: atomic.LoadUint64(&i.metricsBlocksRCount), + BlocksRBytes: atomic.LoadUint64(&i.metricsBlocksRBytes), + BlocksRDataBytes: atomic.LoadUint64(&i.metricsBlocksRDataBytes), + BlocksRTime: time.Duration(atomic.LoadUint64(&i.metricsBlocksRTimeNS)), } }
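For reference, the complete sync lifecycle introduced by this series can be driven as in the
sketch below, distilled from TestDeviceSync above. The device name, file location, endpoint,
and credentials are placeholders, error handling is compressed to panics, and it assumes an
S3-compatible server is reachable at the given endpoint.

	package main

	import (
		"fmt"

		"github.com/loopholelabs/silo/pkg/storage"
		"github.com/loopholelabs/silo/pkg/storage/config"
		"github.com/loopholelabs/silo/pkg/storage/device"
		"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
		"github.com/loopholelabs/silo/pkg/storage/sources"
	)

	func main() {
		// Device schema with an S3 sync block (placeholder credentials/endpoint).
		schema := `
		device Example {
			system = "file"
			size = "1m"
			blocksize = "64k"
			location = "./example_device"
			sync {
				secure = false
				accesskey = "ACCESS"
				secretkey = "SECRET"
				endpoint = "localhost:9000"
				bucket = "silosilo"
				config {
					blockshift = 2
					maxage = "100ms"
					minchanged = 4
					limit = 256
					checkperiod = "100ms"
				}
			}
		}`

		s := new(config.SiloSchema)
		if err := s.Decode([]byte(schema)); err != nil {
			panic(err)
		}
		devs, err := device.NewDevices(s.Device)
		if err != nil {
			panic(err)
		}
		prov := devs["Example"].Provider
		defer prov.Close()

		// Begin syncing dirty blocks to S3 in the background.
		storage.SendEvent(prov, "sync.start", nil)

		// ... write to prov as usual; the syncer picks up dirty blocks ...

		// Inspect progress via the S3Storage metrics.
		for _, r := range storage.SendEvent(prov, "sync.status", nil) {
			m := r.(*sources.S3Metrics)
			fmt.Printf("writes=%d bytes=%d\n", m.BlocksWCount, m.BlocksWBytes)
		}

		// Stop the sync and collect AlternateSource records for blocks
		// that are already safe on S3.
		for _, r := range storage.SendEvent(prov, "sync.stop", nil) {
			alt, ok := r.([]packets.AlternateSource)
			if !ok {
				continue // sync was not running
			}
			for _, as := range alt {
				fmt.Printf("offset=%d length=%d location=%q\n", as.Offset, as.Length, as.Location)
			}
		}
	}

The returned AlternateSource records can then be shipped to a migration destination using
the wire format added in this series; EncodeAlternateSources and DecodeAlternateSources
round-trip them, as exercised by TestAlternateSources above.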