From 531e444c5f92363973f403d222b88fe17c906617 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Thu, 29 Feb 2024 13:00:52 +0000 Subject: [PATCH] Tidied up 'connect', can now receive multiple devs --- cmd/connect.go | 107 ++++++++++------------------ pkg/storage/protocol/protocol_rw.go | 6 +- testing/protocol_test.go | 2 +- 3 files changed, 42 insertions(+), 73 deletions(-) diff --git a/cmd/connect.go b/cmd/connect.go index 6404e67c..85340a03 100644 --- a/cmd/connect.go +++ b/cmd/connect.go @@ -4,17 +4,14 @@ import ( "context" "fmt" "net" - "net/http" "os" "os/signal" "syscall" - "time" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/modules" "github.com/loopholelabs/silo/pkg/storage/protocol" "github.com/loopholelabs/silo/pkg/storage/sources" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" ) @@ -30,6 +27,8 @@ var ( var connect_addr string var connect_expose_dev bool +var exposed []storage.ExposedStorage + func init() { rootCmd.AddCommand(cmdConnect) cmdConnect.Flags().StringVarP(&connect_addr, "addr", "a", "localhost:5170", "Address to serve from") @@ -37,14 +36,9 @@ func init() { } func runConnect(ccmd *cobra.Command, args []string) { - fmt.Printf("Starting silo connect from %s\n", connect_addr) - - // Setup some statistics output - http.Handle("/metrics", promhttp.Handler()) - go http.ListenAndServe(":4114", nil) + fmt.Printf("Starting silo connect from source %s\n", connect_addr) - // Size of migration blocks - block_size := 1024 * 64 + exposed = make([]storage.ExposedStorage, 0) // Connect to the source con, err := net.Dial("tcp", connect_addr) @@ -52,23 +46,47 @@ func runConnect(ccmd *cobra.Command, args []string) { panic("Error connecting") } - var p storage.ExposedStorage + // Wrap the connection in a protocol, and handle incoming devices + pro := protocol.NewProtocolRW(context.TODO(), con, con, handleIncomingDevice) + + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + + for _, e := range exposed { + fmt.Printf("\nShutting down cleanly...\n") + shutdown(e) + } + os.Exit(1) + }() + + // Let the protocol do its thing. + err = pro.Handle() + if err != nil { + fmt.Printf("Silo protocol error %v\n", err) + } +} + +// Handle a new incoming device +func handleIncomingDevice(pro protocol.Protocol, dev uint32) { + // Size of migration blocks + // TODO: Configurable + block_size := 1024 * 64 + var destStorage storage.StorageProvider var destWaitingLocal *modules.WaitingCacheLocal var destWaitingRemote *modules.WaitingCacheRemote - var destStorageMetrics *modules.Metrics var dest *modules.FromProtocol destWaitingRemoteFactory := func(di *protocol.DevInfo) storage.StorageProvider { - fmt.Printf("Received DevInfo size=%d\n", di.Size) + fmt.Printf("Received DevInfo name=%s size=%d\n", di.Name, di.Size) cr := func(s int) storage.StorageProvider { return sources.NewMemoryStorage(s) } // Setup some sharded memory storage (for concurrent write speed) destStorage = modules.NewShardedStorage(int(di.Size), int(di.Size/1024), cr) - // Wrap it in metrics destWaitingLocal, destWaitingRemote = modules.NewWaitingCache(destStorage, block_size) - destStorageMetrics = modules.NewMetrics(destWaitingLocal) // Connect the waitingCache to the FromProtocol destWaitingLocal.NeedAt = func(offset int64, length int32) { @@ -81,79 +99,30 @@ func runConnect(ccmd *cobra.Command, args []string) { // Expose it if we should... if connect_expose_dev { - p, err = setup(destWaitingLocal, false) + p, err := setup(destWaitingLocal, false) if err != nil { fmt.Printf("Error during setup (expose nbd) %v\n", err) } + exposed = append(exposed, p) } fmt.Printf("Returning destWaitingRemote...\n") return destWaitingRemote } - pro := protocol.NewProtocolRW(context.TODO(), con, con, func(dev uint32) { - fmt.Printf("NEW DEVICE %d\n", dev) - }) - // TODO: Need to allow for DevInfo on different IDs better here... - dest = modules.NewFromProtocol(777, destWaitingRemoteFactory, pro) - - go func() { - err := pro.Handle() - fmt.Printf("PROTOCOL ERROR %v\n", err) - // If it's EOF then the migration is completed - }() + dest = modules.NewFromProtocol(dev, destWaitingRemoteFactory, pro) go dest.HandleSend(context.TODO()) go dest.HandleReadAt() go dest.HandleWriteAt() go dest.HandleDevInfo() go dest.HandleEvent(func(e protocol.EventType) { - fmt.Printf("= Event %s\n", protocol.EventsByType[e]) + fmt.Printf("= %d = Event %s\n", dev, protocol.EventsByType[e]) }) go dest.HandleDirtyList(func(dirty []uint) { - fmt.Printf("GOT LIST OF DIRTY BLOCKS %v\n", dirty) + fmt.Printf("= %d = LIST OF DIRTY BLOCKS %v\n", dev, dirty) destWaitingLocal.DirtyBlocks(dirty) }) - - c := make(chan os.Signal) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - <-c - - if connect_expose_dev { - fmt.Printf("\nShutting down cleanly...\n") - shutdown(p) - } - destStorageMetrics.ShowStats("Source") - os.Exit(1) - }() - - ticker := time.NewTicker(time.Second) - - for { - select { - case <-ticker.C: - // Show some stats... - if destStorageMetrics != nil { - destStorageMetrics.ShowStats("Dest") - - s := destStorageMetrics.Snapshot() - prom_read_ops.Set(float64(s.Read_ops)) - prom_read_bytes.Set(float64(s.Read_bytes)) - prom_read_time.Set(float64(s.Read_time)) - prom_read_errors.Set(float64(s.Read_errors)) - - prom_write_ops.Set(float64(s.Write_ops)) - prom_write_bytes.Set(float64(s.Write_bytes)) - prom_write_time.Set(float64(s.Write_time)) - prom_write_errors.Set(float64(s.Write_errors)) - - prom_flush_ops.Set(float64(s.Flush_ops)) - prom_flush_time.Set(float64(s.Flush_time)) - prom_flush_errors.Set(float64(s.Flush_errors)) - } - } - } } diff --git a/pkg/storage/protocol/protocol_rw.go b/pkg/storage/protocol/protocol_rw.go index 10438113..2ec0dfc1 100644 --- a/pkg/storage/protocol/protocol_rw.go +++ b/pkg/storage/protocol/protocol_rw.go @@ -27,10 +27,10 @@ type ProtocolRW struct { active_devs map[uint32]bool waiters map[uint32]Waiters waiters_lock sync.Mutex - newdev_fn func(uint32) + newdev_fn func(Protocol, uint32) } -func NewProtocolRW(ctx context.Context, r io.Reader, w io.Writer, newdev_fn func(uint32)) *ProtocolRW { +func NewProtocolRW(ctx context.Context, r io.Reader, w io.Writer, newdev_fn func(Protocol, uint32)) *ProtocolRW { return &ProtocolRW{ ctx: ctx, r: r, @@ -98,7 +98,7 @@ func (p *ProtocolRW) Handle() error { if !ok { p.active_devs[dev] = true if p.newdev_fn != nil { - p.newdev_fn(dev) + p.newdev_fn(p, dev) } } diff --git a/testing/protocol_test.go b/testing/protocol_test.go index 42c6cd35..728969ab 100644 --- a/testing/protocol_test.go +++ b/testing/protocol_test.go @@ -117,7 +117,7 @@ func TestProtocolRWWriteAt(t *testing.T) { destDev := make(chan uint32, 8) prSource := protocol.NewProtocolRW(context.TODO(), r1, w2, nil) - prDest := protocol.NewProtocolRW(context.TODO(), r2, w1, func(dev uint32) { + prDest := protocol.NewProtocolRW(context.TODO(), r2, w1, func(p protocol.Protocol, dev uint32) { destDev <- dev })