Skip to content

Commit

Permalink
Tidied up 'connect', can now receive multiple devs
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmyaxod committed Feb 29, 2024
1 parent 0d8967a commit 531e444
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 73 deletions.
107 changes: 38 additions & 69 deletions cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@ import (
"context"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/loopholelabs/silo/pkg/storage"
"github.com/loopholelabs/silo/pkg/storage/modules"
"github.com/loopholelabs/silo/pkg/storage/protocol"
"github.com/loopholelabs/silo/pkg/storage/sources"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
)

Expand All @@ -30,45 +27,66 @@ var (
var connect_addr string
var connect_expose_dev bool

var exposed []storage.ExposedStorage

func init() {
rootCmd.AddCommand(cmdConnect)
cmdConnect.Flags().StringVarP(&connect_addr, "addr", "a", "localhost:5170", "Address to serve from")
cmdConnect.Flags().BoolVarP(&connect_expose_dev, "expose", "e", false, "Expose as an nbd devices")
}

func runConnect(ccmd *cobra.Command, args []string) {
fmt.Printf("Starting silo connect from %s\n", connect_addr)

// Setup some statistics output
http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":4114", nil)
fmt.Printf("Starting silo connect from source %s\n", connect_addr)

// Size of migration blocks
block_size := 1024 * 64
exposed = make([]storage.ExposedStorage, 0)

// Connect to the source
con, err := net.Dial("tcp", connect_addr)
if err != nil {
panic("Error connecting")
}

var p storage.ExposedStorage
// Wrap the connection in a protocol, and handle incoming devices
pro := protocol.NewProtocolRW(context.TODO(), con, con, handleIncomingDevice)

c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c

for _, e := range exposed {
fmt.Printf("\nShutting down cleanly...\n")
shutdown(e)
}
os.Exit(1)
}()

// Let the protocol do its thing.
err = pro.Handle()
if err != nil {
fmt.Printf("Silo protocol error %v\n", err)
}
}

// Handle a new incoming device
func handleIncomingDevice(pro protocol.Protocol, dev uint32) {
// Size of migration blocks
// TODO: Configurable
block_size := 1024 * 64

var destStorage storage.StorageProvider
var destWaitingLocal *modules.WaitingCacheLocal
var destWaitingRemote *modules.WaitingCacheRemote
var destStorageMetrics *modules.Metrics
var dest *modules.FromProtocol

destWaitingRemoteFactory := func(di *protocol.DevInfo) storage.StorageProvider {
fmt.Printf("Received DevInfo size=%d\n", di.Size)
fmt.Printf("Received DevInfo name=%s size=%d\n", di.Name, di.Size)
cr := func(s int) storage.StorageProvider {
return sources.NewMemoryStorage(s)
}
// Setup some sharded memory storage (for concurrent write speed)
destStorage = modules.NewShardedStorage(int(di.Size), int(di.Size/1024), cr)
// Wrap it in metrics
destWaitingLocal, destWaitingRemote = modules.NewWaitingCache(destStorage, block_size)
destStorageMetrics = modules.NewMetrics(destWaitingLocal)

// Connect the waitingCache to the FromProtocol
destWaitingLocal.NeedAt = func(offset int64, length int32) {
Expand All @@ -81,79 +99,30 @@ func runConnect(ccmd *cobra.Command, args []string) {

// Expose it if we should...
if connect_expose_dev {
p, err = setup(destWaitingLocal, false)
p, err := setup(destWaitingLocal, false)
if err != nil {
fmt.Printf("Error during setup (expose nbd) %v\n", err)
}
exposed = append(exposed, p)
}

fmt.Printf("Returning destWaitingRemote...\n")
return destWaitingRemote
}

pro := protocol.NewProtocolRW(context.TODO(), con, con, func(dev uint32) {
fmt.Printf("NEW DEVICE %d\n", dev)
})

// TODO: Need to allow for DevInfo on different IDs better here...
dest = modules.NewFromProtocol(777, destWaitingRemoteFactory, pro)

go func() {
err := pro.Handle()
fmt.Printf("PROTOCOL ERROR %v\n", err)
// If it's EOF then the migration is completed
}()
dest = modules.NewFromProtocol(dev, destWaitingRemoteFactory, pro)

go dest.HandleSend(context.TODO())
go dest.HandleReadAt()
go dest.HandleWriteAt()
go dest.HandleDevInfo()
go dest.HandleEvent(func(e protocol.EventType) {
fmt.Printf("= Event %s\n", protocol.EventsByType[e])
fmt.Printf("= %d = Event %s\n", dev, protocol.EventsByType[e])
})

go dest.HandleDirtyList(func(dirty []uint) {
fmt.Printf("GOT LIST OF DIRTY BLOCKS %v\n", dirty)
fmt.Printf("= %d = LIST OF DIRTY BLOCKS %v\n", dev, dirty)
destWaitingLocal.DirtyBlocks(dirty)
})

c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c

if connect_expose_dev {
fmt.Printf("\nShutting down cleanly...\n")
shutdown(p)
}
destStorageMetrics.ShowStats("Source")
os.Exit(1)
}()

ticker := time.NewTicker(time.Second)

for {
select {
case <-ticker.C:
// Show some stats...
if destStorageMetrics != nil {
destStorageMetrics.ShowStats("Dest")

s := destStorageMetrics.Snapshot()
prom_read_ops.Set(float64(s.Read_ops))
prom_read_bytes.Set(float64(s.Read_bytes))
prom_read_time.Set(float64(s.Read_time))
prom_read_errors.Set(float64(s.Read_errors))

prom_write_ops.Set(float64(s.Write_ops))
prom_write_bytes.Set(float64(s.Write_bytes))
prom_write_time.Set(float64(s.Write_time))
prom_write_errors.Set(float64(s.Write_errors))

prom_flush_ops.Set(float64(s.Flush_ops))
prom_flush_time.Set(float64(s.Flush_time))
prom_flush_errors.Set(float64(s.Flush_errors))
}
}
}
}
6 changes: 3 additions & 3 deletions pkg/storage/protocol/protocol_rw.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ type ProtocolRW struct {
active_devs map[uint32]bool
waiters map[uint32]Waiters
waiters_lock sync.Mutex
newdev_fn func(uint32)
newdev_fn func(Protocol, uint32)
}

func NewProtocolRW(ctx context.Context, r io.Reader, w io.Writer, newdev_fn func(uint32)) *ProtocolRW {
func NewProtocolRW(ctx context.Context, r io.Reader, w io.Writer, newdev_fn func(Protocol, uint32)) *ProtocolRW {
return &ProtocolRW{
ctx: ctx,
r: r,
Expand Down Expand Up @@ -98,7 +98,7 @@ func (p *ProtocolRW) Handle() error {
if !ok {
p.active_devs[dev] = true
if p.newdev_fn != nil {
p.newdev_fn(dev)
p.newdev_fn(p, dev)
}
}

Expand Down
2 changes: 1 addition & 1 deletion testing/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestProtocolRWWriteAt(t *testing.T) {
destDev := make(chan uint32, 8)

prSource := protocol.NewProtocolRW(context.TODO(), r1, w2, nil)
prDest := protocol.NewProtocolRW(context.TODO(), r2, w1, func(dev uint32) {
prDest := protocol.NewProtocolRW(context.TODO(), r2, w1, func(p protocol.Protocol, dev uint32) {
destDev <- dev
})

Expand Down

0 comments on commit 531e444

Please sign in to comment.