Jamesmoore/arch 161 silo add sync to newdevice config #39

Merged 5 commits on Oct 24, 2024
2 changes: 1 addition & 1 deletion cmd/sync.go
@@ -179,7 +179,7 @@ func sync_setup_device(conf *config.DeviceSchema) (*syncStorageInfo, error) {
if sync_dummy {
dest = modules.NewNothing(sourceStorage.Size())
} else {
-dest, err = sources.NewS3StorageCreate(sync_endpoint,
+dest, err = sources.NewS3StorageCreate(false, sync_endpoint,
sync_access,
sync_secret,
sync_bucket,
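
The only functional change in this file is the new leading boolean on sources.NewS3StorageCreate; false here preserves the existing plain-HTTP behaviour. A sketch of the full call shape, with argument roles inferred from the other call sites in this PR (the secure flag is assumed to select TLS for the endpoint; name, size, and blockSize are placeholders):

	// Assumed signature, pieced together from the calls in device.go and the
	// migrator tests below.
	dest, err := sources.NewS3StorageCreate(
		false,         // secure: use TLS for the endpoint (assumption)
		sync_endpoint, // S3 endpoint, e.g. "localhost:9000"
		sync_access,   // access key
		sync_secret,   // secret key
		sync_bucket,   // bucket
		name,          // object name/prefix for this device
		size,          // storage size in bytes (uint64)
		blockSize,     // block size used for S3 objects
	)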
18 changes: 18 additions & 0 deletions pkg/storage/config/silo.go
@@ -26,6 +26,24 @@ type DeviceSchema struct {
ROSource *DeviceSchema `hcl:"source,block"`
Binlog string `hcl:"binlog,optional"`
PageServerPID int `hcl:"pid,optional"`
Sync *SyncS3Schema `hcl:"sync,block"`
}

type SyncConfigSchema struct {
BlockShift int `hcl:"blockshift,attr"`
MaxAge string `hcl:"maxage,attr"`
MinChanged int `hcl:"minchanged,attr"`
CheckPeriod string `hcl:"checkperiod,attr"`
Limit int `hcl:"limit,attr"`
}

type SyncS3Schema struct {
Secure bool `hcl:"secure,attr"`
AccessKey string `hcl:"accesskey,attr"`
SecretKey string `hcl:"secretkey,attr"`
Endpoint string `hcl:"endpoint,attr"`
Bucket string `hcl:"bucket,attr"`
Config *SyncConfigSchema `hcl:"config,block"`
}

func parseByteValue(val string) int64 {
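
Together these structs describe an optional sync block nested inside a device definition, with a nested config block tuning dirty-block syncing. A minimal sketch of decoding such a schema (device name and values are illustrative; the Decode usage mirrors the new device_sync_test.go below):

	// Hypothetical device definition; field names follow SyncS3Schema and
	// SyncConfigSchema above.
	schema := []byte(`
	device example {
		system    = "file"
		size      = "1m"
		blocksize = "64k"
		location  = "./testdata/example"
		sync {
			secure    = false
			accesskey = "ACCESS"
			secretkey = "SECRET"
			endpoint  = "localhost:9000"
			bucket    = "mybucket"
			config {
				blockshift  = 2
				maxage      = "100ms"
				minchanged  = 4
				limit       = 256
				checkperiod = "100ms"
			}
		}
	}
	`)
	s := new(config.SiloSchema)
	if err := s.Decode(schema); err != nil {
		panic(err)
	}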
136 changes: 136 additions & 0 deletions pkg/storage/device/device.go
@@ -1,16 +1,23 @@
package device

import (
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"path"
"sync"
"time"

"github.com/loopholelabs/silo/pkg/storage"
"github.com/loopholelabs/silo/pkg/storage/blocks"
"github.com/loopholelabs/silo/pkg/storage/config"
"github.com/loopholelabs/silo/pkg/storage/dirtytracker"
"github.com/loopholelabs/silo/pkg/storage/expose"
"github.com/loopholelabs/silo/pkg/storage/migrator"
"github.com/loopholelabs/silo/pkg/storage/modules"
"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
"github.com/loopholelabs/silo/pkg/storage/sources"
)

@@ -50,6 +57,7 @@ func NewDevices(ds []*config.DeviceSchema) (map[string]*Device, error) {
}

func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.ExposedStorage, error) {

var prov storage.StorageProvider
var err error

@@ -163,6 +171,7 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.ExposedStorage, error) {

// Optionally use a copy on write RO source...
if ds.ROSource != nil {

// Create the ROSource...
rodev, _, err := NewDevice(ds.ROSource)
if err != nil {
@@ -215,6 +224,7 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.ExposedStorage, error) {
// NB You may well need to call ex.SetProvider if you wish to insert other things in the chain.
var ex storage.ExposedStorage
if ds.Expose {

ex = expose.NewExposedStorageNBDNL(prov, 8, 0, prov.Size(), expose.NBD_DEFAULT_BLOCK_SIZE, true)

err := ex.Init()
@@ -224,5 +234,131 @@
}
}

// Optionally sync the device to S3
if ds.Sync != nil {

s3dest, err := sources.NewS3StorageCreate(ds.Sync.Secure,
ds.Sync.Endpoint,
ds.Sync.AccessKey,
ds.Sync.SecretKey,
ds.Sync.Bucket,
ds.Name,
prov.Size(),
bs)

if err != nil {
prov.Close()
return nil, nil, err
}

dirty_block_size := bs >> ds.Sync.Config.BlockShift
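// e.g. with bs = 64KiB and BlockShift = 2, dirty tracking granularity is 16KiB (bs >> 2)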

num_blocks := (int(prov.Size()) + bs - 1) / bs

sourceDirtyLocal, sourceDirtyRemote := dirtytracker.NewDirtyTracker(prov, dirty_block_size)
sourceStorage := modules.NewLockable(sourceDirtyLocal)

// Setup a block order
orderer := blocks.NewAnyBlockOrder(num_blocks, nil)
orderer.AddAll()

check_period, err := time.ParseDuration(ds.Sync.Config.CheckPeriod)
if err != nil {
prov.Close()
return nil, nil, err
}

max_age, err := time.ParseDuration(ds.Sync.Config.MaxAge)
if err != nil {
prov.Close()
return nil, nil, err
}

ctx, cancelfn := context.WithCancel(context.TODO())

// Start doing the sync...
syncer := migrator.NewSyncer(ctx, &migrator.SyncConfig{
Name: ds.Name,
Integrity: false,
CancelWrites: true,
DedupeWrites: true,
Tracker: sourceDirtyRemote,
Lockable: sourceStorage,
Destination: s3dest,
Orderer: orderer,
DirtyCheckPeriod: check_period,
DirtyBlockGetter: func() []uint {
return sourceDirtyRemote.GetDirtyBlocks(
max_age, ds.Sync.Config.Limit, ds.Sync.Config.BlockShift, ds.Sync.Config.MinChanged)
},
BlockSize: bs,
ProgressHandler: func(p *migrator.MigrationProgress) {},
ErrorHandler: func(b *storage.BlockInfo, err error) {},
})

// The provider we return should feed into our sync here.
prov = sourceStorage

var sync_lock sync.Mutex
var sync_running bool
var wg sync.WaitGroup

// If the storage gets a "sync.start", we should start syncing to S3.
storage.AddEventNotification(prov, "sync.start", func(event_type storage.EventType, data storage.EventData) storage.EventReturnData {
sync_lock.Lock()
if sync_running {
sync_lock.Unlock()
return false
}
sync_running = true
wg.Add(1)
sync_lock.Unlock()

// Sync happens here...
go func() {
// Do this in a goroutine, but make sure it's cancelled etc
_, _ = syncer.Sync(false, true)
wg.Done()
}()
return true
})

// If the storage gets a "sync.status", get some status on the S3Storage
storage.AddEventNotification(prov, "sync.status", func(event_type storage.EventType, data storage.EventData) storage.EventReturnData {
return s3dest.Metrics()
})

// If the storage gets a "sync.stop", we should cancel the sync, and return the safe blocks
storage.AddEventNotification(prov, "sync.stop", func(event_type storage.EventType, data storage.EventData) storage.EventReturnData {
sync_lock.Lock()
if !sync_running {
sync_lock.Unlock()
return nil
}
cancelfn()
// WAIT HERE for the sync to finish
wg.Wait()
sync_running = false
sync_lock.Unlock()

// Get the list of safe blocks we can use.
blocks := syncer.GetSafeBlockMap()
// Translate these to locations so they can be sent to a destination...
alt_sources := make([]packets.AlternateSource, 0)
for block, hash := range blocks {
as := packets.AlternateSource{
Offset: int64(block * uint(bs)),
Length: int64(bs),
Hash: hash,
Location: fmt.Sprintf("%s %s %s", ds.Sync.Endpoint, ds.Sync.Bucket, ds.Name),
}
alt_sources = append(alt_sources, as)
}

return alt_sources
})

}

return prov, ex, nil
}
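
The sync lifecycle above is driven entirely through storage events rather than a direct API. A minimal caller sketch (mirroring the new test below):

	// prov is the provider returned by NewDevice for a schema with a sync block.
	storage.SendEvent(prov, "sync.start", nil) // kick off background sync to S3

	// ... writes to prov happen here; dirty blocks are pushed each check period ...

	stats := storage.SendEvent(prov, "sync.status", nil) // S3 metrics per listener
	_ = stats

	// sync.stop cancels the syncer, waits for it to drain, and returns the
	// blocks known to be intact on S3 as AlternateSource entries.
	for _, r := range storage.SendEvent(prov, "sync.stop", nil) {
		for _, as := range r.([]packets.AlternateSource) {
			fmt.Println(as.Location) // "<endpoint> <bucket> <device name>"
		}
	}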
119 changes: 119 additions & 0 deletions pkg/storage/device/device_sync_test.go
@@ -0,0 +1,119 @@
package device

import (
"crypto/rand"
"crypto/sha256"
"fmt"
"os"
"testing"
"time"

"github.com/loopholelabs/silo/pkg/storage"
"github.com/loopholelabs/silo/pkg/storage/config"
"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
"github.com/loopholelabs/silo/pkg/storage/sources"
"github.com/loopholelabs/silo/pkg/testutils"
"github.com/stretchr/testify/assert"
)

func TestDeviceSync(t *testing.T) {
PORT_9000 := testutils.SetupMinio(t.Cleanup)

block_size := 64 * 1024

testSyncSchema := fmt.Sprintf(`
device TestSync {
system = "file"
size = "1m"
blocksize = "64k"
location = "./testdata/testfile_sync"
sync {
secure = false
accesskey = "silosilo"
secretkey = "silosilo"
endpoint = "%s"
bucket = "silosilo"
config {
blockshift = 2
maxage = "100ms"
minchanged = 4
limit = 256
checkperiod = "100ms"
}
}
}
`, fmt.Sprintf("localhost:%s", PORT_9000))

s := new(config.SiloSchema)
err := s.Decode([]byte(testSyncSchema))
assert.NoError(t, err)
devs, err := NewDevices(s.Device)
assert.NoError(t, err)
t.Cleanup(func() {
os.Remove("./testdata/testfile_sync")
})

assert.Equal(t, 1, len(devs))

prov := devs["TestSync"].Provider

num_blocks := (int(prov.Size()) + block_size - 1) / block_size

buffer := make([]byte, 1024*1024)
_, err = rand.Read(buffer)
assert.NoError(t, err)
n, err := prov.WriteAt(buffer, 0)
assert.NoError(t, err)
assert.Equal(t, 1024*1024, n)

// Tell the sync to start.
storage.SendEvent(prov, "sync.start", nil)

// Do a few writes here, and wait a bit for the sync to happen...
for i := 0; i < num_blocks; i++ {
wbuffer := make([]byte, block_size)
_, err = rand.Read(wbuffer)
assert.NoError(t, err)
n, err = prov.WriteAt(wbuffer, int64(i*block_size))
assert.NoError(t, err)
assert.Equal(t, 64*1024, n)
}

// Should be enough time here to migrate the changed data blocks, since we have set the config.
time.Sleep(500 * time.Millisecond)

// Tell the sync to stop, and return the AlternateSource details.
asources := storage.SendEvent(prov, "sync.stop", nil)

locs := make([]string, 0)

for _, r := range asources {
alt := r.([]packets.AlternateSource)
for _, as := range alt {
// Check the data matches what we have locally...
buff := make([]byte, as.Length)
n, err := prov.ReadAt(buff, as.Offset)
assert.NoError(t, err)
assert.Equal(t, n, int(as.Length))

hash := sha256.Sum256(buff)
assert.Equal(t, hash, as.Hash)

locs = append(locs, as.Location)
}
}

// If everything worked, all blocks should be present on S3.
assert.Equal(t, num_blocks, len(locs))

// Get some statistics
stats := storage.SendEvent(prov, "sync.status", nil)

assert.Equal(t, 1, len(stats))
metrics := stats[0].(*sources.S3Metrics)

// Do some asserts on the S3Metrics... It should have written each block at least once by now.
assert.GreaterOrEqual(t, num_blocks, int(metrics.BlocksWCount))

prov.Close()
}
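
For orientation on the numbers: the device is 1 MiB with 64 KiB blocks, so num_blocks is 16, and blockshift = 2 gives the dirty tracker 16 KiB sub-blocks; the 500 ms sleep spans several 100 ms checkperiod ticks, which is why every block is expected to be on S3 by the time sync.stop returns.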
2 changes: 1 addition & 1 deletion pkg/storage/migrator/migrator_s3_test.go
@@ -58,7 +58,7 @@ func TestMigratorToS3(t *testing.T) {
orderer.AddAll()

// START moving data from sourceStorage to destStorage
-destStorage, err := sources.NewS3StorageCreate(fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize)
+destStorage, err := sources.NewS3StorageCreate(false, fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize)

assert.NoError(t, err)

2 changes: 1 addition & 1 deletion pkg/storage/migrator/sync_test.go
@@ -66,7 +66,7 @@ func TestSyncToS3(t *testing.T) {
orderer.AddAll()

// START moving data from sourceStorage to destStorage
-destStorage, err := sources.NewS3StorageCreate(fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize)
+destStorage, err := sources.NewS3StorageCreate(false, fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize)

assert.NoError(t, err)
