From 6df84258b31ceb7ef3bb6ddeef2b2cc949200d42 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Fri, 9 Feb 2024 16:58:41 +0000 Subject: [PATCH] nbd reorg/benchmarking start --- cmd/connect.go | 4 +- cmd/serve.go | 7 +- internal/expose/nbd.go | 258 ++++++++++++- internal/expose/nbd_dispatch.go | 249 ------------- internal/expose/nbd_dispatch_simple.go | 106 ------ internal/expose/nbd_test.go | 306 +++++++-------- internal/expose/ndb_dev_test.go | 296 +++++++-------- main.go | 7 +- pkg/storage/expose/nbd.go | 497 +++++++++++++++++++++++++ pkg/storage/expose/nbd_dev_test.go | 194 ++++++++++ pkg/storage/expose/nbd_test.go | 44 +++ 11 files changed, 1261 insertions(+), 707 deletions(-) delete mode 100644 internal/expose/nbd_dispatch.go delete mode 100644 internal/expose/nbd_dispatch_simple.go create mode 100644 pkg/storage/expose/nbd.go create mode 100644 pkg/storage/expose/nbd_dev_test.go create mode 100644 pkg/storage/expose/nbd_test.go diff --git a/cmd/connect.go b/cmd/connect.go index 644273e9..df48b118 100644 --- a/cmd/connect.go +++ b/cmd/connect.go @@ -11,7 +11,6 @@ import ( "syscall" "time" - "github.com/loopholelabs/silo/internal/expose" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/modules" "github.com/loopholelabs/silo/pkg/storage/protocol" @@ -122,8 +121,7 @@ func runConnect(ccmd *cobra.Command, args []string) { }() if connect_dev != "" { - d := expose.NewDispatch() - p, err = setup(connect_dev, d, destStorageMetrics, false) + p, err = setup(connect_dev, destStorageMetrics, false) if err != nil { fmt.Printf("Error during setup %v\n", err) return diff --git a/cmd/serve.go b/cmd/serve.go index 4e7366fd..ac4a5f88 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -109,8 +109,7 @@ func runServe(ccmd *cobra.Command, args []string) { if serve_dev != "" { var err error - d := expose.NewDispatch() - p, err = setup(serve_dev, d, sourceStorage, true) + p, err = setup(serve_dev, sourceStorage, true) if err != nil { fmt.Printf("Error during setup %v\n", err) return @@ -234,8 +233,8 @@ func runServe(ccmd *cobra.Command, args []string) { * Setup a disk * */ -func setup(device string, dispatch expose.NBDDispatcher, prov storage.StorageProvider, server bool) (storage.ExposedStorage, error) { - p, err := expose.NewNBD(dispatch, fmt.Sprintf("/dev/%s", device)) +func setup(device string, prov storage.StorageProvider, server bool) (storage.ExposedStorage, error) { + p, err := expose.NewNBD(fmt.Sprintf("/dev/%s", device)) if err != nil { return nil, err } diff --git a/internal/expose/nbd.go b/internal/expose/nbd.go index d8295880..6f1abeb9 100644 --- a/internal/expose/nbd.go +++ b/internal/expose/nbd.go @@ -1,8 +1,10 @@ package expose import ( + "encoding/binary" "fmt" "os" + "sync" "syscall" "github.com/loopholelabs/silo/pkg/storage" @@ -25,12 +27,14 @@ const NBD_DISCONNECT = 8 | NBD_COMMAND const NBD_SET_TIMEOUT = 9 | NBD_COMMAND const NBD_SET_FLAGS = 10 | NBD_COMMAND +// NBD Commands const NBD_CMD_READ = 0 const NBD_CMD_WRITE = 1 const NBD_CMD_DISCONNECT = 2 const NBD_CMD_FLUSH = 3 const NBD_CMD_TRIM = 4 +// NBD Flags const NBD_FLAG_HAS_FLAGS = (1 << 0) const NBD_FLAG_READ_ONLY = (1 << 1) const NBD_FLAG_SEND_FLUSH = (1 << 2) @@ -66,14 +70,7 @@ type NBDExposedStorage struct { fp uintptr socketPair [2]int readyChannel chan error - dispatcher NBDDispatcher -} - -// Want to swap in and out different versions here so we can benchmark -type NBDDispatcher interface { - Handle(fd int, prov storage.StorageProvider) error - Name() string - Wait() + dispatcher *Dispatch } /** @@ -81,7 +78,6 @@ type NBDDispatcher interface { * */ func (s *NBDExposedStorage) Handle(prov storage.StorageProvider) error { - // Handle incoming requests... go func() { err := s.dispatcher.Handle(s.socketPair[0], prov) if err != nil { @@ -102,7 +98,7 @@ func (s *NBDExposedStorage) Handle(prov storage.StorageProvider) error { // Issue ioctl calls to set it up calls := []IoctlCall{ - // {NBD_SET_BLKSIZE, 4096}, + {NBD_SET_BLKSIZE, 4096}, {NBD_SET_TIMEOUT, 0}, {NBD_PRINT_DEBUG, 1}, {NBD_SET_SIZE, uintptr(prov.Size())}, @@ -136,7 +132,7 @@ func (s *NBDExposedStorage) Handle(prov storage.StorageProvider) error { */ func (s *NBDExposedStorage) WaitReady() error { // fmt.Printf("WaitReady\n") - // Wait for a ready signal + // Wait for a ready signal (The ioctl DO_IT has just about been sent) return <-s.readyChannel } @@ -148,7 +144,6 @@ func (s *NBDExposedStorage) Shutdown() error { s.dispatcher.Wait() calls := []IoctlCall{ - {NBD_CLEAR_QUE, 0}, {NBD_DISCONNECT, 0}, {NBD_CLEAR_SOCK, 0}, } @@ -175,7 +170,7 @@ func (s *NBDExposedStorage) Shutdown() error { * Create a new NBD device * */ -func NewNBD(d NBDDispatcher, dev string) (storage.ExposedStorage, error) { +func NewNBD(dev string) (storage.ExposedStorage, error) { // Create a pair of sockets to communicate with the NBD device over // fmt.Printf("Create socketpair\n") sockPair, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0) @@ -196,8 +191,243 @@ func NewNBD(d NBDDispatcher, dev string) (storage.ExposedStorage, error) { device: fp, fp: fp.Fd(), readyChannel: make(chan error), - dispatcher: d, + dispatcher: NewDispatch(), } return es, nil } + +type Dispatch struct { + ASYNC_READS bool + ASYNC_WRITES bool + fp *os.File + responseHeader []byte + writeLock sync.Mutex + prov storage.StorageProvider + fatal chan error + pendingResponses sync.WaitGroup +} + +func NewDispatch() *Dispatch { + d := &Dispatch{ + ASYNC_WRITES: true, + ASYNC_READS: true, + responseHeader: make([]byte, 16), + fatal: make(chan error, 8), + } + binary.BigEndian.PutUint32(d.responseHeader, NBD_RESPONSE_MAGIC) + return d +} + +func (d *Dispatch) Wait() { + // Wait for any pending responses + d.pendingResponses.Wait() +} + +/** + * Write a response... + * + */ +func (d *Dispatch) writeResponse(respError uint32, respHandle uint64, chunk []byte) error { + d.writeLock.Lock() + defer d.writeLock.Unlock() + + // fmt.Printf("WriteResponse %x -> %d\n", respHandle, len(chunk)) + + binary.BigEndian.PutUint32(d.responseHeader[4:], respError) + binary.BigEndian.PutUint64(d.responseHeader[8:], respHandle) + + _, err := d.fp.Write(d.responseHeader) + if err != nil { + return err + } + if len(chunk) > 0 { + _, err = d.fp.Write(chunk) + if err != nil { + return err + } + } + return nil +} + +/** + * This dispatches incoming NBD requests sequentially to the provider. + * + */ +func (d *Dispatch) Handle(fd int, prov storage.StorageProvider) error { + d.prov = prov + d.fp = os.NewFile(uintptr(fd), "unix") + + // Speed read and dispatch... + + BUFFER_SIZE := 4 * 1024 * 1024 + buffer := make([]byte, BUFFER_SIZE) + wp := 0 + + request := Request{} + + for { + // fmt.Printf("Calling read...\n") + n, err := d.fp.Read(buffer[wp:]) + if err != nil { + fmt.Printf("Error %v\n", err) + return err + } + wp += n + + // fmt.Printf("Read %d\n", n) + + // Now go through processing complete packets + rp := 0 + for { + // fmt.Printf("Processing data %d %d\n", rp, wp) + // Make sure we have a complete header + if wp-rp >= 28 { + // We can read the neader... + + header := buffer[rp : rp+28] + request.Magic = binary.BigEndian.Uint32(header) + request.Type = binary.BigEndian.Uint32(header[4:8]) + request.Handle = binary.BigEndian.Uint64(header[8:16]) + request.From = binary.BigEndian.Uint64(header[16:24]) + request.Length = binary.BigEndian.Uint32(header[24:28]) + + if request.Magic != NBD_REQUEST_MAGIC { + return fmt.Errorf("Received invalid MAGIC") + } + + if request.Type == NBD_CMD_DISCONNECT { + // fmt.Printf("CMD_DISCONNECT") + return nil // All done + } else if request.Type == NBD_CMD_FLUSH { + return fmt.Errorf("Not supported: Flush") + } else if request.Type == NBD_CMD_READ { + // fmt.Printf("READ %x %d\n", request.Handle, request.Length) + rp += 28 + err := d.cmdRead(request.Handle, request.From, request.Length) + if err != nil { + return err + } + } else if request.Type == NBD_CMD_WRITE { + rp += 28 + if wp-rp < int(request.Length) { + rp -= 28 + break // We don't have enough data yet... Wait for next read + } + data := make([]byte, request.Length) + copy(data, buffer[rp:rp+int(request.Length)]) + rp += int(request.Length) + // fmt.Printf("WRITE %x %d\n", request.Handle, request.Length) + err := d.cmdWrite(request.Handle, request.From, request.Length, data) + if err != nil { + return err + } + } else if request.Type == NBD_CMD_TRIM { + // fmt.Printf("TRIM\n") + rp += 28 + err = d.cmdTrim(request.Handle, request.From, request.Length) + if err != nil { + return err + } + } else { + return fmt.Errorf("NBD Not implemented %d\n", request.Type) + } + + } else { + break // Try again when we have more data... + } + } + // Now we need to move any partial to the start + if rp != 0 && rp != wp { + // fmt.Printf("Copy partial %d %d\n", rp, wp) + + copy(buffer, buffer[rp:wp]) + } + wp -= rp + + } +} + +/** + * cmdRead + * + */ +func (d *Dispatch) cmdRead(cmd_handle uint64, cmd_from uint64, cmd_length uint32) error { + + performRead := func(handle uint64, from uint64, length uint32) error { + data := make([]byte, length) + _, e := d.prov.ReadAt(data, int64(from)) + errorValue := uint32(0) + if e != nil { + errorValue = 1 + data = make([]byte, 0) // If there was an error, don't send data + } + return d.writeResponse(errorValue, handle, data) + } + + if d.ASYNC_READS { + d.pendingResponses.Add(1) + go func() { + err := performRead(cmd_handle, cmd_from, cmd_length) + if err != nil { + d.fatal <- err + } + d.pendingResponses.Done() + }() + } else { + return performRead(cmd_handle, cmd_from, cmd_length) + } + return nil +} + +/** + * cmdWrite + * + */ +func (d *Dispatch) cmdWrite(cmd_handle uint64, cmd_from uint64, cmd_length uint32, cmd_data []byte) error { + performWrite := func(handle uint64, from uint64, length uint32, data []byte) error { + _, e := d.prov.WriteAt(data, int64(from)) + errorValue := uint32(0) + if e != nil { + errorValue = 1 + } + return d.writeResponse(errorValue, handle, []byte{}) + } + + if d.ASYNC_WRITES { + d.pendingResponses.Add(1) + go func() { + err := performWrite(cmd_handle, cmd_from, cmd_length, cmd_data) + if err != nil { + d.fatal <- err + } + d.pendingResponses.Done() + }() + } else { + return performWrite(cmd_handle, cmd_from, cmd_length, cmd_data) + } + return nil +} + +/** + * cmdTrim + * + */ +func (d *Dispatch) cmdTrim(handle uint64, from uint64, length uint32) error { + // Ask the provider + /* + e := d.prov.Trim(from, length) + if e != storage.StorageError_SUCCESS { + err := d.writeResponse(1, handle, []byte{}) + if err != nil { + return err + } + } else { + */ + err := d.writeResponse(0, handle, []byte{}) + if err != nil { + return err + } + // } + return nil +} diff --git a/internal/expose/nbd_dispatch.go b/internal/expose/nbd_dispatch.go deleted file mode 100644 index 709ab567..00000000 --- a/internal/expose/nbd_dispatch.go +++ /dev/null @@ -1,249 +0,0 @@ -package expose - -import ( - "encoding/binary" - "fmt" - "os" - "sync" - - "github.com/loopholelabs/silo/pkg/storage" -) - -type Dispatch struct { - ASYNC_READS bool - ASYNC_WRITES bool - fp *os.File - responseHeader []byte - writeLock sync.Mutex - prov storage.StorageProvider - fatal chan error - pendingResponses sync.WaitGroup -} - -func NewDispatch() *Dispatch { - d := &Dispatch{ - ASYNC_WRITES: true, - ASYNC_READS: true, - responseHeader: make([]byte, 16), - fatal: make(chan error, 8), - } - binary.BigEndian.PutUint32(d.responseHeader, NBD_RESPONSE_MAGIC) - return d -} - -func (d *Dispatch) Wait() { - // Wait for any pending responses - d.pendingResponses.Wait() -} - -/** - * Write a response... - * - */ -func (d *Dispatch) writeResponse(respError uint32, respHandle uint64, chunk []byte) error { - d.writeLock.Lock() - defer d.writeLock.Unlock() - - // fmt.Printf("WriteResponse %x -> %d\n", respHandle, len(chunk)) - - binary.BigEndian.PutUint32(d.responseHeader[4:], respError) - binary.BigEndian.PutUint64(d.responseHeader[8:], respHandle) - - _, err := d.fp.Write(d.responseHeader) - if err != nil { - return err - } - if len(chunk) > 0 { - _, err = d.fp.Write(chunk) - if err != nil { - return err - } - } - return nil -} - -func (d *Dispatch) Name() string { - return "Dispatch" -} - -/** - * This dispatches incoming NBD requests sequentially to the provider. - * - */ -func (d *Dispatch) Handle(fd int, prov storage.StorageProvider) error { - d.prov = prov - d.fp = os.NewFile(uintptr(fd), "unix") - - // Speed read and dispatch... - - BUFFER_SIZE := 4 * 1024 * 1024 - buffer := make([]byte, BUFFER_SIZE) - wp := 0 - - request := Request{} - - for { - // fmt.Printf("Calling read...\n") - n, err := d.fp.Read(buffer[wp:]) - if err != nil { - fmt.Printf("Error %v\n", err) - return err - } - wp += n - - // fmt.Printf("Read %d\n", n) - - // Now go through processing complete packets - rp := 0 - for { - // fmt.Printf("Processing data %d %d\n", rp, wp) - // Make sure we have a complete header - if wp-rp >= 28 { - // We can read the neader... - - header := buffer[rp : rp+28] - request.Magic = binary.BigEndian.Uint32(header) - request.Type = binary.BigEndian.Uint32(header[4:8]) - request.Handle = binary.BigEndian.Uint64(header[8:16]) - request.From = binary.BigEndian.Uint64(header[16:24]) - request.Length = binary.BigEndian.Uint32(header[24:28]) - - if request.Magic != NBD_REQUEST_MAGIC { - return fmt.Errorf("Received invalid MAGIC") - } - - if request.Type == NBD_CMD_DISCONNECT { - // fmt.Printf("CMD_DISCONNECT") - return nil // All done - } else if request.Type == NBD_CMD_FLUSH { - return fmt.Errorf("Not supported: Flush") - } else if request.Type == NBD_CMD_READ { - // fmt.Printf("READ %x %d\n", request.Handle, request.Length) - rp += 28 - err := d.cmdRead(request.Handle, request.From, request.Length) - if err != nil { - return err - } - } else if request.Type == NBD_CMD_WRITE { - rp += 28 - if wp-rp < int(request.Length) { - rp -= 28 - break // We don't have enough data yet... Wait for next read - } - data := make([]byte, request.Length) - copy(data, buffer[rp:rp+int(request.Length)]) - rp += int(request.Length) - // fmt.Printf("WRITE %x %d\n", request.Handle, request.Length) - err := d.cmdWrite(request.Handle, request.From, request.Length, data) - if err != nil { - return err - } - } else if request.Type == NBD_CMD_TRIM { - // fmt.Printf("TRIM\n") - rp += 28 - err = d.cmdTrim(request.Handle, request.From, request.Length) - if err != nil { - return err - } - } else { - return fmt.Errorf("NBD Not implemented %d\n", request.Type) - } - - } else { - break // Try again when we have more data... - } - } - // Now we need to move any partial to the start - if rp != 0 && rp != wp { - // fmt.Printf("Copy partial %d %d\n", rp, wp) - - copy(buffer, buffer[rp:wp]) - } - wp -= rp - - } -} - -/** - * cmdRead - * - */ -func (d *Dispatch) cmdRead(cmd_handle uint64, cmd_from uint64, cmd_length uint32) error { - - performRead := func(handle uint64, from uint64, length uint32) error { - data := make([]byte, length) - _, e := d.prov.ReadAt(data, int64(from)) - errorValue := uint32(0) - if e != nil { - errorValue = 1 - data = make([]byte, 0) // If there was an error, don't send data - } - return d.writeResponse(errorValue, handle, data) - } - - if d.ASYNC_READS { - d.pendingResponses.Add(1) - go func() { - err := performRead(cmd_handle, cmd_from, cmd_length) - if err != nil { - d.fatal <- err - } - d.pendingResponses.Done() - }() - } else { - return performRead(cmd_handle, cmd_from, cmd_length) - } - return nil -} - -/** - * cmdWrite - * - */ -func (d *Dispatch) cmdWrite(cmd_handle uint64, cmd_from uint64, cmd_length uint32, cmd_data []byte) error { - performWrite := func(handle uint64, from uint64, length uint32, data []byte) error { - _, e := d.prov.WriteAt(data, int64(from)) - errorValue := uint32(0) - if e != nil { - errorValue = 1 - } - return d.writeResponse(errorValue, handle, []byte{}) - } - - if d.ASYNC_WRITES { - d.pendingResponses.Add(1) - go func() { - err := performWrite(cmd_handle, cmd_from, cmd_length, cmd_data) - if err != nil { - d.fatal <- err - } - d.pendingResponses.Done() - }() - } else { - return performWrite(cmd_handle, cmd_from, cmd_length, cmd_data) - } - return nil -} - -/** - * cmdTrim - * - */ -func (d *Dispatch) cmdTrim(handle uint64, from uint64, length uint32) error { - // Ask the provider - /* - e := d.prov.Trim(from, length) - if e != storage.StorageError_SUCCESS { - err := d.writeResponse(1, handle, []byte{}) - if err != nil { - return err - } - } else { - */ - err := d.writeResponse(0, handle, []byte{}) - if err != nil { - return err - } - // } - return nil -} diff --git a/internal/expose/nbd_dispatch_simple.go b/internal/expose/nbd_dispatch_simple.go deleted file mode 100644 index b71e654e..00000000 --- a/internal/expose/nbd_dispatch_simple.go +++ /dev/null @@ -1,106 +0,0 @@ -package expose - -import ( - "encoding/binary" - "fmt" - "io" - "os" - - "github.com/loopholelabs/silo/pkg/storage" -) - -type DispatchSimple struct{} - -func NewDispatchSimple() *DispatchSimple { - return &DispatchSimple{} -} - -func (d *DispatchSimple) Name() string { - return "SimpleDispatch" -} - -func (d *DispatchSimple) Wait() {} - -/** - * This dispatches incoming NBD requests sequentially to the provider. - * - */ -func (d *DispatchSimple) Handle(fd int, prov storage.StorageProvider) error { - fp := os.NewFile(uintptr(fd), "unix") - - request := Request{} - response := Response{} - buf := make([]byte, 28) - for { - _, err := fp.Read(buf) - if err != nil { - return err - } - - request.Magic = binary.BigEndian.Uint32(buf) - request.Type = binary.BigEndian.Uint32(buf[4:8]) - request.Handle = binary.BigEndian.Uint64(buf[8:16]) - request.From = binary.BigEndian.Uint64(buf[16:24]) - request.Length = binary.BigEndian.Uint32(buf[24:28]) - - chunk := make([]byte, 0) - response.Handle = request.Handle - response.Error = 0 - - if request.Magic != NBD_REQUEST_MAGIC { - return fmt.Errorf("Received invalid MAGIC") - } - if request.Type == NBD_CMD_DISCONNECT { - // fmt.Printf("CMD_DISCONNECT") - return nil // All done - } else if request.Type == NBD_CMD_FLUSH { - return fmt.Errorf("Not supported: Flush") - } else if request.Type == NBD_CMD_READ { - chunk = make([]byte, request.Length) - - // Ask the provider for some data - _, e := prov.ReadAt(chunk, int64(request.From)) - if e != nil { - response.Error = 1 // FIXME - } - } else if request.Type == NBD_CMD_TRIM { - // Ask the provider - // e := prov.Trim(request.From, request.Length) - // if e != storage.StorageError_SUCCESS { - // response.Error = 1 // FIXME - // } - } else if request.Type == NBD_CMD_WRITE { - data := make([]byte, request.Length) - - _, err = io.ReadFull(fp, data) - if err != nil { - return err - } - - // Ask the provider - _, e := prov.WriteAt(data, int64(request.From)) - if e != nil { - response.Error = 1 // FIXME - } - } else { - return fmt.Errorf("NBD Not implemented %d\n", request.Type) - } - - // Write a response... - buff := make([]byte, 16) - binary.BigEndian.PutUint32(buff, NBD_RESPONSE_MAGIC) - binary.BigEndian.PutUint32(buff[4:], response.Error) - binary.BigEndian.PutUint64(buff[8:], response.Handle) - - _, err = fp.Write(buff) - if err != nil { - return err - } - if len(chunk) > 0 { - _, err = fp.Write(chunk) - if err != nil { - return err - } - } - } -} diff --git a/internal/expose/nbd_test.go b/internal/expose/nbd_test.go index 4cfb18ca..d8657768 100644 --- a/internal/expose/nbd_test.go +++ b/internal/expose/nbd_test.go @@ -21,7 +21,7 @@ const NBDdevice = "nbd2" * Setup a disk with some files created. * */ -func setup(dispatch NBDDispatcher, prov storage.StorageProvider, fileMin uint64, fileMax uint64, maxSize uint64) (storage.ExposedStorage, []string, error) { +func setup(prov storage.StorageProvider, fileMin uint64, fileMax uint64, maxSize uint64) (storage.ExposedStorage, []string, error) { defer func() { // fmt.Printf("umount\n") cmd := exec.Command("umount", fmt.Sprintf("/dev/%s", NBDdevice)) @@ -36,7 +36,7 @@ func setup(dispatch NBDDispatcher, prov storage.StorageProvider, fileMin uint64, } }() - p, err := NewNBD(dispatch, fmt.Sprintf("/dev/%s", NBDdevice)) + p, err := NewNBD(fmt.Sprintf("/dev/%s", NBDdevice)) if err != nil { return nil, nil, err } @@ -138,103 +138,91 @@ func BenchmarkRead(mb *testing.B) { {"SmallFiles", 100 * 1024 * 1024, 8 * 1024, 8 * 1024}, } - dispatchers := []string{ - "simple", "dispatch", - } - - for _, dt := range dispatchers { - for _, c := range tests { - - mb.Run(fmt.Sprintf("%s_%s", dt, c.Name), func(b *testing.B) { - - // Setup... - driver := modules.NewMetrics(sources.NewMemoryStorage(int(c.Size))) - - var d NBDDispatcher - if dt == "simple" { - d = NewDispatchSimple() - } else if dt == "dispatch" { - d = NewDispatch() - } - p, files, err := setup(d, driver, c.FileMin, c.FileMax, uint64(float64(c.Size)*0.8)) - if err != nil { - fmt.Printf("Error setup %v\n", err) - return - } - - // Now mount the disk... - err = os.Mkdir(fmt.Sprintf("/mnt/bench%s", NBDdevice), 0600) - if err != nil { - panic(err) - } - - // fmt.Printf("mount\n") - cmd := exec.Command("mount", fmt.Sprintf("/dev/%s", NBDdevice), fmt.Sprintf("/mnt/bench%s", NBDdevice)) - err = cmd.Run() - if err != nil { - panic(err) - } - - /** - * Cleanup everything - * - */ - b.Cleanup(func() { - shutdown(p) - driver.ShowStats("stats") - }) + for _, c := range tests { + + mb.Run(c.Name, func(b *testing.B) { + + // Setup... + driver := modules.NewMetrics(sources.NewMemoryStorage(int(c.Size))) + + p, files, err := setup(driver, c.FileMin, c.FileMax, uint64(float64(c.Size)*0.8)) + if err != nil { + fmt.Printf("Error setup %v\n", err) + return + } + + // Now mount the disk... + err = os.Mkdir(fmt.Sprintf("/mnt/bench%s", NBDdevice), 0600) + if err != nil { + panic(err) + } + + // fmt.Printf("mount\n") + cmd := exec.Command("mount", fmt.Sprintf("/dev/%s", NBDdevice), fmt.Sprintf("/mnt/bench%s", NBDdevice)) + err = cmd.Run() + if err != nil { + panic(err) + } + + /** + * Cleanup everything + * + */ + b.Cleanup(func() { + shutdown(p) + driver.ShowStats("stats") + }) - driver.ResetMetrics() // Only start counting from now... + driver.ResetMetrics() // Only start counting from now... - // Here's the actual benchmark... + // Here's the actual benchmark... - var wg sync.WaitGroup - concurrent := make(chan bool, 32) + var wg sync.WaitGroup + concurrent := make(chan bool, 32) - // Now do some timing... - b.ResetTimer() + // Now do some timing... + b.ResetTimer() - totalData := int64(0) + totalData := int64(0) - for i := 0; i < b.N; i++ { - fileno := rand.Intn(len(files)) - name := files[fileno] + for i := 0; i < b.N; i++ { + fileno := rand.Intn(len(files)) + name := files[fileno] - fi, err := os.Stat(name) - if err != nil { - fmt.Printf("Error statting file %v\n", err) - } else { + fi, err := os.Stat(name) + if err != nil { + fmt.Printf("Error statting file %v\n", err) + } else { - wg.Add(1) - concurrent <- true - totalData += fi.Size() + wg.Add(1) + concurrent <- true + totalData += fi.Size() - // Test read speed... - go func(filename string) { + // Test read speed... + go func(filename string) { - data, err := os.ReadFile(filename) - if err != nil { - fmt.Printf("Error reading file %v\n", err) - } + data, err := os.ReadFile(filename) + if err != nil { + fmt.Printf("Error reading file %v\n", err) + } - if len(data) == 0 { - fmt.Printf("Warning: The file %s was not there\n", filename) - } + if len(data) == 0 { + fmt.Printf("Warning: The file %s was not there\n", filename) + } - wg.Done() - <-concurrent - }(name) - } + wg.Done() + <-concurrent + }(name) } + } - b.SetBytes(totalData) + b.SetBytes(totalData) - wg.Wait() + wg.Wait() - fmt.Printf("Total Data %d\n", totalData) + fmt.Printf("Total Data %d\n", totalData) - }) - } + }) } } @@ -245,116 +233,104 @@ func BenchmarkWrite(mb *testing.B) { {"SmallFiles", 100 * 1024 * 1024, 8 * 1024, 8 * 1024}, } - dispatchers := []string{ - "simple", "dispatch", - } + for _, c := range tests { - for _, dt := range dispatchers { - for _, c := range tests { + mb.Run(c.Name, func(b *testing.B) { - mb.Run(fmt.Sprintf("%s_%s", dt, c.Name), func(b *testing.B) { + // Setup... + driver := modules.NewMetrics(sources.NewMemoryStorage(int(c.Size))) - // Setup... - driver := modules.NewMetrics(sources.NewMemoryStorage(int(c.Size))) + p, files, err := setup(driver, c.FileMin, c.FileMax, uint64(float64(c.Size)*0.8)) - var d NBDDispatcher - if dt == "simple" { - d = NewDispatchSimple() - } else if dt == "dispatch" { - d = NewDispatch() - } - p, files, err := setup(d, driver, c.FileMin, c.FileMax, uint64(float64(c.Size)*0.8)) + if err != nil { + fmt.Printf("Error setup %v\n", err) + return + } - if err != nil { - fmt.Printf("Error setup %v\n", err) - return - } + // Now mount the disk... + err = os.Mkdir(fmt.Sprintf("/mnt/bench%s", NBDdevice), 0600) + if err != nil { + panic(err) + } - // Now mount the disk... - err = os.Mkdir(fmt.Sprintf("/mnt/bench%s", NBDdevice), 0600) - if err != nil { - panic(err) - } + cmd := exec.Command("mount", fmt.Sprintf("/dev/%s", NBDdevice), fmt.Sprintf("/mnt/bench%s", NBDdevice)) + err = cmd.Run() + if err != nil { + panic(err) + } - cmd := exec.Command("mount", fmt.Sprintf("/dev/%s", NBDdevice), fmt.Sprintf("/mnt/bench%s", NBDdevice)) - err = cmd.Run() - if err != nil { - panic(err) - } - - /** - * Cleanup everything - * - */ - b.Cleanup(func() { - shutdown(p) - driver.ShowStats("stats") - }) + /** + * Cleanup everything + * + */ + b.Cleanup(func() { + shutdown(p) + driver.ShowStats("stats") + }) - driver.ResetMetrics() + driver.ResetMetrics() - // Here's the actual benchmark... + // Here's the actual benchmark... - var wg sync.WaitGroup - concurrent := make(chan bool, 32) + var wg sync.WaitGroup + concurrent := make(chan bool, 32) - // Now do some timing... - b.ResetTimer() + // Now do some timing... + b.ResetTimer() - totalData := int64(0) + totalData := int64(0) - for i := 0; i < b.N; i++ { - fileno := rand.Intn(len(files)) - name := files[fileno] + for i := 0; i < b.N; i++ { + fileno := rand.Intn(len(files)) + name := files[fileno] - fi, err := os.Stat(name) - if err != nil { - fmt.Printf("Error statting file %v\n", err) - } else { + fi, err := os.Stat(name) + if err != nil { + fmt.Printf("Error statting file %v\n", err) + } else { - totalData += fi.Size() + totalData += fi.Size() - newData := make([]byte, fi.Size()) - crand.Read(newData) + newData := make([]byte, fi.Size()) + crand.Read(newData) - wg.Add(1) - concurrent <- true + wg.Add(1) + concurrent <- true - // Test write speed... FIXME: Concurrent access to same file - go func(filename string, data []byte) { + // Test write speed... FIXME: Concurrent access to same file + go func(filename string, data []byte) { - af, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + af, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + fmt.Printf("Error opening file %v\n", err) + } else { + _, err = af.Write(data) if err != nil { - fmt.Printf("Error opening file %v\n", err) - } else { - _, err = af.Write(data) - if err != nil { - fmt.Printf("Error writing file %v\n", err) - time.Sleep(1 * time.Minute) - } - err = af.Close() - if err != nil { - fmt.Printf("Error closing file %v\n", err) - } + fmt.Printf("Error writing file %v\n", err) + time.Sleep(1 * time.Minute) } - - //err := os.WriteFile(filename, data, 0600) + err = af.Close() if err != nil { - fmt.Printf("Error writing file %v\n", err) + fmt.Printf("Error closing file %v\n", err) } + } + + //err := os.WriteFile(filename, data, 0600) + if err != nil { + fmt.Printf("Error writing file %v\n", err) + } - wg.Done() - <-concurrent - }(name, newData) - } + wg.Done() + <-concurrent + }(name, newData) } + } - b.SetBytes(totalData) + b.SetBytes(totalData) - wg.Wait() + wg.Wait() - fmt.Printf("Total Data %d\n", totalData) - }) - } + fmt.Printf("Total Data %d\n", totalData) + }) } } diff --git a/internal/expose/ndb_dev_test.go b/internal/expose/ndb_dev_test.go index a67e5d37..431705b8 100644 --- a/internal/expose/ndb_dev_test.go +++ b/internal/expose/ndb_dev_test.go @@ -15,213 +15,185 @@ func BenchmarkDevRead(mb *testing.B) { NBDdevice := "nbd1" diskSize := 1024 * 1024 * 1024 * 4 - dispatchers := []string{ - "simple", "dispatch", - } + mb.Run("devRead", func(b *testing.B) { - for _, dt := range dispatchers { + // Setup... + // Lets simulate a little latency + store := sources.NewMemoryStorage(int(diskSize)) + store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0) + driver := modules.NewMetrics(store_latency) - mb.Run(fmt.Sprintf("%s_devRead", dt), func(b *testing.B) { + p, err := NewNBD(fmt.Sprintf("/dev/%s", NBDdevice)) + if err != nil { + panic(err) + } - // Setup... - // Lets simulate a little latency - store := sources.NewMemoryStorage(int(diskSize)) - store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0) - driver := modules.NewMetrics(store_latency) - - var d NBDDispatcher - if dt == "simple" { - d = NewDispatchSimple() - } else if dt == "dispatch" { - d = NewDispatch() - } - - p, err := NewNBD(d, fmt.Sprintf("/dev/%s", NBDdevice)) + go func() { + err := p.Handle(driver) if err != nil { - panic(err) + fmt.Printf("p.Handle returned %v\n", err) } + }() - go func() { - err := p.Handle(driver) - if err != nil { - fmt.Printf("p.Handle returned %v\n", err) - } - }() + p.WaitReady() - p.WaitReady() + /** + * Cleanup everything + * + */ + b.Cleanup(func() { + err = p.Shutdown() + if err != nil { + fmt.Printf("Error cleaning up %v\n", err) + } + driver.ShowStats("stats") + }) - /** - * Cleanup everything - * - */ - b.Cleanup(func() { - err = p.Shutdown() - if err != nil { - fmt.Printf("Error cleaning up %v\n", err) - } - driver.ShowStats("stats") - }) + driver.ResetMetrics() // Only start counting from now... - driver.ResetMetrics() // Only start counting from now... + // Here's the actual benchmark... - // Here's the actual benchmark... + devfile, err := os.OpenFile(fmt.Sprintf("/dev/%s", NBDdevice), os.O_RDWR, 0666) + if err != nil { + panic("Error opening dev file\n") + } - devfile, err := os.OpenFile(fmt.Sprintf("/dev/%s", NBDdevice), os.O_RDWR, 0666) - if err != nil { - panic("Error opening dev file\n") - } + b.Cleanup(func() { + devfile.Close() + }) - b.Cleanup(func() { - devfile.Close() - }) + var wg sync.WaitGroup + concurrent := make(chan bool, 100) // Do max 100 reads concurrently - var wg sync.WaitGroup - concurrent := make(chan bool, 100) // Do max 100 reads concurrently + // Now do some timing... + b.ResetTimer() - // Now do some timing... - b.ResetTimer() + offset := int64(0) + totalData := int64(0) - offset := int64(0) - totalData := int64(0) + for i := 0; i < b.N; i++ { + wg.Add(1) + concurrent <- true + length := int64(4096) + offset += 4096 + if offset+length >= int64(diskSize) { + offset = 0 + } + totalData += length - for i := 0; i < b.N; i++ { - wg.Add(1) - concurrent <- true - length := int64(4096) - offset += 4096 - if offset+length >= int64(diskSize) { - offset = 0 + // Test read speed... + go func(f_offset int64, f_length int64) { + buffer := make([]byte, f_length) + _, err := devfile.ReadAt(buffer, f_offset) + if err != nil { + fmt.Printf("Error reading file %v\n", err) } - totalData += length - - // Test read speed... - go func(f_offset int64, f_length int64) { - buffer := make([]byte, f_length) - _, err := devfile.ReadAt(buffer, f_offset) - if err != nil { - fmt.Printf("Error reading file %v\n", err) - } - - wg.Done() - <-concurrent - }(offset, length) - } - b.SetBytes(totalData) + wg.Done() + <-concurrent + }(offset, length) + } - wg.Wait() + b.SetBytes(totalData) - fmt.Printf("Total Data %d\n", totalData) - }) - } + wg.Wait() + + fmt.Printf("Total Data %d\n", totalData) + }) } func BenchmarkDevWrite(mb *testing.B) { NBDdevice := "nbd2" diskSize := 1024 * 1024 * 1024 * 4 - dispatchers := []string{ - "simple", "dispatch", - } - - for _, dt := range dispatchers { + mb.Run("devWrite", func(b *testing.B) { - mb.Run(fmt.Sprintf("%s_devWrite", dt), func(b *testing.B) { + // Setup... + // Lets simulate a little latency + store := sources.NewMemoryStorage(int(diskSize)) + store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0) + driver := modules.NewMetrics(store_latency) - // Setup... - // Lets simulate a little latency - store := sources.NewMemoryStorage(int(diskSize)) - store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0) - driver := modules.NewMetrics(store_latency) + p, err := NewNBD(fmt.Sprintf("/dev/%s", NBDdevice)) + if err != nil { + panic(err) + } - var d NBDDispatcher - if dt == "simple" { - d = NewDispatchSimple() - } else if dt == "dispatch" { - d = NewDispatch() - } - - p, err := NewNBD(d, fmt.Sprintf("/dev/%s", NBDdevice)) + go func() { + err := p.Handle(driver) if err != nil { - panic(err) + fmt.Printf("p.Handle returned %v\n", err) } + }() - go func() { - err := p.Handle(driver) - if err != nil { - fmt.Printf("p.Handle returned %v\n", err) - } - }() + p.WaitReady() - p.WaitReady() + /** + * Cleanup everything + * + */ + b.Cleanup(func() { + err = p.Shutdown() + if err != nil { + fmt.Printf("Error cleaning up %v\n", err) + } + driver.ShowStats("stats") + }) - /** - * Cleanup everything - * - */ - b.Cleanup(func() { - err = p.Shutdown() - if err != nil { - fmt.Printf("Error cleaning up %v\n", err) - } - driver.ShowStats("stats") - }) + driver.ResetMetrics() // Only start counting from now... - driver.ResetMetrics() // Only start counting from now... + // Here's the actual benchmark... - // Here's the actual benchmark... + devfile, err := os.OpenFile(fmt.Sprintf("/dev/%s", NBDdevice), os.O_RDWR, 0666) + if err != nil { + panic("Error opening dev file\n") + } - devfile, err := os.OpenFile(fmt.Sprintf("/dev/%s", NBDdevice), os.O_RDWR, 0666) - if err != nil { - panic("Error opening dev file\n") - } + b.Cleanup(func() { + devfile.Close() + }) - b.Cleanup(func() { - devfile.Close() - }) + var wg sync.WaitGroup + concurrent := make(chan bool, 100) // Do max 100 reads concurrently - var wg sync.WaitGroup - concurrent := make(chan bool, 100) // Do max 100 reads concurrently + // Now do some timing... + b.ResetTimer() - // Now do some timing... - b.ResetTimer() + offset := int64(0) + totalData := int64(0) - offset := int64(0) - totalData := int64(0) + for i := 0; i < b.N; i++ { + wg.Add(1) + concurrent <- true + length := int64(4096) + offset += 4096 + if offset+length >= int64(diskSize) { + offset = 0 + } + totalData += length - for i := 0; i < b.N; i++ { - wg.Add(1) - concurrent <- true - length := int64(4096) - offset += 4096 - if offset+length >= int64(diskSize) { - offset = 0 + // Test read speed... + go func(f_offset int64, f_length int64) { + buffer := make([]byte, f_length) + _, err := devfile.WriteAt(buffer, f_offset) + if err != nil { + fmt.Printf("Error writing file %d %d | %v\n", f_offset, f_length, err) } - totalData += length - - // Test read speed... - go func(f_offset int64, f_length int64) { - buffer := make([]byte, f_length) - _, err := devfile.WriteAt(buffer, f_offset) - if err != nil { - fmt.Printf("Error writing file %d %d | %v\n", f_offset, f_length, err) - } - - wg.Done() - <-concurrent - }(offset, length) - } - err = devfile.Sync() - if err != nil { - fmt.Printf("Error syncing %v\n", err) - } + wg.Done() + <-concurrent + }(offset, length) + } - b.SetBytes(totalData) + err = devfile.Sync() + if err != nil { + fmt.Printf("Error syncing %v\n", err) + } - wg.Wait() + b.SetBytes(totalData) - fmt.Printf("Total Data %d\n", totalData) - }) - } + wg.Wait() + + fmt.Printf("Total Data %d\n", totalData) + }) } diff --git a/main.go b/main.go index ca52a01b..284ba63b 100644 --- a/main.go +++ b/main.go @@ -66,8 +66,7 @@ func main() { os.Exit(1) }() - d := expose.NewDispatch() - p, err := setup(d, driver) + p, err := setup(driver) // Add a cache NOW driver.AddProvider(cache) @@ -99,8 +98,8 @@ func main() { * Setup a disk with some files created. * */ -func setup(dispatch expose.NBDDispatcher, prov storage.StorageProvider) (storage.ExposedStorage, error) { - p, err := expose.NewNBD(dispatch, device) +func setup(prov storage.StorageProvider) (storage.ExposedStorage, error) { + p, err := expose.NewNBD(device) if err != nil { return nil, err } diff --git a/pkg/storage/expose/nbd.go b/pkg/storage/expose/nbd.go new file mode 100644 index 00000000..3cad9619 --- /dev/null +++ b/pkg/storage/expose/nbd.go @@ -0,0 +1,497 @@ +package expose + +import ( + "encoding/binary" + "errors" + "fmt" + "os" + "sync" + "syscall" + "time" + + "github.com/loopholelabs/silo/pkg/storage" +) + +/** + * Exposes a storage provider as an nbd device + * + */ +const NBD_COMMAND = 0xab00 +const NBD_SET_SOCK = 0 | NBD_COMMAND +const NBD_SET_BLKSIZE = 1 | NBD_COMMAND +const NBD_SET_SIZE = 2 | NBD_COMMAND +const NBD_DO_IT = 3 | NBD_COMMAND +const NBD_CLEAR_SOCK = 4 | NBD_COMMAND +const NBD_CLEAR_QUE = 5 | NBD_COMMAND +const NBD_PRINT_DEBUG = 6 | NBD_COMMAND +const NBD_SET_SIZE_BLOCKS = 7 | NBD_COMMAND +const NBD_DISCONNECT = 8 | NBD_COMMAND +const NBD_SET_TIMEOUT = 9 | NBD_COMMAND +const NBD_SET_FLAGS = 10 | NBD_COMMAND + +// NBD Commands +const NBD_CMD_READ = 0 +const NBD_CMD_WRITE = 1 +const NBD_CMD_DISCONNECT = 2 +const NBD_CMD_FLUSH = 3 +const NBD_CMD_TRIM = 4 + +// NBD Flags +const NBD_FLAG_HAS_FLAGS = (1 << 0) +const NBD_FLAG_READ_ONLY = (1 << 1) +const NBD_FLAG_SEND_FLUSH = (1 << 2) +const NBD_FLAG_SEND_TRIM = (1 << 5) + +const NBD_REQUEST_MAGIC = 0x25609513 +const NBD_RESPONSE_MAGIC = 0x67446698 + +// NBD Request packet +type Request struct { + Magic uint32 + Type uint32 + Handle uint64 + From uint64 + Length uint32 +} + +// NBD Response packet +type Response struct { + Magic uint32 + Error uint32 + Handle uint64 +} + +// IOctl call info +type IoctlCall struct { + Cmd uintptr + Value uintptr +} + +type ExposedStorageNBD struct { + dev string + num_connections int + timeout uint64 + size uint64 + block_size uint64 + flags uint64 + socketPairs [][2]int + device_file uintptr + dispatch *Dispatch + prov storage.StorageProvider +} + +func NewExposedStorageNBD(prov storage.StorageProvider, dev string, num_connections int, timeout uint64, size uint64, block_size uint64, flags uint64) *ExposedStorageNBD { + return &ExposedStorageNBD{ + prov: prov, + dev: dev, + num_connections: num_connections, + timeout: timeout, + size: size, + block_size: block_size, + flags: flags, + dispatch: NewDispatch(), + } +} + +func (n *ExposedStorageNBD) setSizes(fp uintptr, size uint64, block_size uint64, flags uint64) error { + //read_only := ((flags & NBD_FLAG_READ_ONLY) == 1) + + tmp_blocksize := uint64(4096) + if size/block_size <= ^uint64(0) { + tmp_blocksize = block_size + } + _, _, en := syscall.Syscall(syscall.SYS_IOCTL, fp, NBD_SET_BLKSIZE, uintptr(tmp_blocksize)) + if en != 0 { + return errors.New(fmt.Sprintf("Error setting blocksize %d\n", tmp_blocksize)) + } + size_blocks := size / tmp_blocksize + _, _, en = syscall.Syscall(syscall.SYS_IOCTL, fp, NBD_SET_SIZE_BLOCKS, uintptr(size_blocks)) + if en != 0 { + return errors.New(fmt.Sprintf("Error setting blocks %d\n", size_blocks)) + } + if tmp_blocksize != block_size { + _, _, en = syscall.Syscall(syscall.SYS_IOCTL, fp, NBD_SET_BLKSIZE, uintptr(block_size)) + if en != 0 { + return errors.New(fmt.Sprintf("Error setting blocksize %d\n", block_size)) + } + } + + syscall.Syscall(syscall.SYS_IOCTL, fp, NBD_CLEAR_SOCK, 0) + + syscall.Syscall(syscall.SYS_IOCTL, fp, NBD_SET_FLAGS, uintptr(flags)) + + // if (ioctl(nbd, BLKROSET, (unsigned long) &read_only) < 0) + return nil +} + +func (n *ExposedStorageNBD) setTimeout(fp uintptr, timeout uint64) error { + _, _, en := syscall.Syscall(syscall.SYS_IOCTL, fp, NBD_SET_TIMEOUT, uintptr(timeout)) + if en != 0 { + return errors.New(fmt.Sprintf("Error setting timeout %d", timeout)) + } + return nil +} + +func (n *ExposedStorageNBD) finishSock(fp uintptr, sock int) error { + _, _, en := syscall.Syscall(syscall.SYS_IOCTL, fp, NBD_SET_SOCK, uintptr(sock)) + if en == syscall.EBUSY { + return errors.New(fmt.Sprintf("Kernel doesn't support multiple connections")) + } else if en != 0 { + return errors.New(fmt.Sprintf("Error setting socket")) + } + return nil +} + +/** + * Check if the nbd connection is up and running... + * + */ +func (n *ExposedStorageNBD) checkConn(dev string) error { + _, err := os.ReadFile(fmt.Sprintf("/sys/block/%s/pid", dev)) + if err == nil { + //fmt.Printf("Connection %s\n", data) + } + return err +} + +func (n *ExposedStorageNBD) Start() error { + device_file := fmt.Sprintf("/dev/%s", n.dev) + + fp, err := os.OpenFile(device_file, os.O_RDWR, 0600) + if err != nil { + return err + } + + n.device_file = fp.Fd() + + // Create the socket pairs, and setup NBD options. + for i := 0; i < n.num_connections; i++ { + sockPair, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0) + if err != nil { + return err + } + n.socketPairs = append(n.socketPairs, sockPair) + + // Start reading commands on the socket and dispatching them to our provider + go func(fd int) { + n.dispatch.Handle(fd, n.prov) + }(sockPair[1]) + + if i == 0 { + n.setSizes(fp.Fd(), n.size, n.block_size, n.flags) + n.setTimeout(fp.Fd(), n.timeout) + } + n.finishSock(fp.Fd(), sockPair[0]) + } + + go func() { + for { + err := n.checkConn(n.dev) + if err == nil { + break + } + // Sleep a bit + // nanosleep(&req, NULL) + time.Sleep(100000000 * time.Nanosecond) + } + _, err := os.OpenFile(device_file, os.O_RDWR, 0600) + if err != nil { + fmt.Printf("Could not open device for updating partition table\n") + } + }() + + // Now do it... + _, _, en := syscall.Syscall(syscall.SYS_IOCTL, fp.Fd(), NBD_DO_IT, 0) + + // Clear the socket + syscall.Syscall(syscall.SYS_IOCTL, fp.Fd(), NBD_CLEAR_SOCK, 0) + + if en != 0 { + return errors.New(fmt.Sprintf("Error after DO_IT %d\n", en)) + } + + // Close the device... + fp.Close() + return nil +} + +// Wait until it's ready... +func (n *ExposedStorageNBD) Ready() { + for { + err := n.checkConn(n.dev) + if err == nil { + break + } + time.Sleep(100 * time.Nanosecond) + } +} + +func (n *ExposedStorageNBD) Disconnect() error { + + fmt.Printf("Closing sockets...\n") + // Close all the socket pairs... + for _, v := range n.socketPairs { + err := syscall.Close(v[1]) + if err != nil { + return err + } + } + + fmt.Printf("NBD_DISCONNECT\n") + _, _, en := syscall.Syscall(syscall.SYS_IOCTL, uintptr(n.device_file), NBD_DISCONNECT, 0) + if en != 0 { + return errors.New(fmt.Sprintf("Error disconnecting %d", en)) + } + fmt.Printf("NBD_CLEAR_SOCK\n") + _, _, en = syscall.Syscall(syscall.SYS_IOCTL, uintptr(n.device_file), NBD_CLEAR_SOCK, 0) + if en != 0 { + return errors.New(fmt.Sprintf("Error clearing sock %d", en)) + } + + // Wait for the pid to go away + fmt.Printf("Wait pid\n") + for { + err := n.checkConn(n.dev) + if err != nil { + break + } + time.Sleep(100 * time.Nanosecond) + } + + return nil +} + +type Dispatch struct { + ASYNC_READS bool + ASYNC_WRITES bool + fp *os.File + responseHeader []byte + writeLock sync.Mutex + prov storage.StorageProvider + fatal chan error + pendingResponses sync.WaitGroup +} + +func NewDispatch() *Dispatch { + d := &Dispatch{ + ASYNC_WRITES: true, + ASYNC_READS: true, + responseHeader: make([]byte, 16), + fatal: make(chan error, 8), + } + binary.BigEndian.PutUint32(d.responseHeader, NBD_RESPONSE_MAGIC) + return d +} + +func (d *Dispatch) Wait() { + // Wait for any pending responses + d.pendingResponses.Wait() +} + +/** + * Write a response... + * + */ +func (d *Dispatch) writeResponse(respError uint32, respHandle uint64, chunk []byte) error { + d.writeLock.Lock() + defer d.writeLock.Unlock() + + // fmt.Printf("WriteResponse %x -> %d\n", respHandle, len(chunk)) + + binary.BigEndian.PutUint32(d.responseHeader[4:], respError) + binary.BigEndian.PutUint64(d.responseHeader[8:], respHandle) + + _, err := d.fp.Write(d.responseHeader) + if err != nil { + return err + } + if len(chunk) > 0 { + _, err = d.fp.Write(chunk) + if err != nil { + return err + } + } + return nil +} + +/** + * This dispatches incoming NBD requests sequentially to the provider. + * + */ +func (d *Dispatch) Handle(fd int, prov storage.StorageProvider) error { + d.prov = prov + d.fp = os.NewFile(uintptr(fd), "unix") + + // Speed read and dispatch... + + BUFFER_SIZE := 4 * 1024 * 1024 + buffer := make([]byte, BUFFER_SIZE) + wp := 0 + + request := Request{} + + for { + // fmt.Printf("Calling read...\n") + n, err := d.fp.Read(buffer[wp:]) + if err != nil { + fmt.Printf("Error %v\n", err) + return err + } + wp += n + + // fmt.Printf("Read %d\n", n) + + // Now go through processing complete packets + rp := 0 + for { + // fmt.Printf("Processing data %d %d\n", rp, wp) + // Make sure we have a complete header + if wp-rp >= 28 { + // We can read the neader... + + header := buffer[rp : rp+28] + request.Magic = binary.BigEndian.Uint32(header) + request.Type = binary.BigEndian.Uint32(header[4:8]) + request.Handle = binary.BigEndian.Uint64(header[8:16]) + request.From = binary.BigEndian.Uint64(header[16:24]) + request.Length = binary.BigEndian.Uint32(header[24:28]) + + if request.Magic != NBD_REQUEST_MAGIC { + return fmt.Errorf("Received invalid MAGIC") + } + + if request.Type == NBD_CMD_DISCONNECT { + fmt.Printf(" -> CMD_DISCONNECT\n") + return nil // All done + } else if request.Type == NBD_CMD_FLUSH { + return fmt.Errorf("Not supported: Flush") + } else if request.Type == NBD_CMD_READ { + // fmt.Printf("READ %x %d\n", request.Handle, request.Length) + rp += 28 + err := d.cmdRead(request.Handle, request.From, request.Length) + if err != nil { + return err + } + } else if request.Type == NBD_CMD_WRITE { + rp += 28 + if wp-rp < int(request.Length) { + rp -= 28 + break // We don't have enough data yet... Wait for next read + } + data := make([]byte, request.Length) + copy(data, buffer[rp:rp+int(request.Length)]) + rp += int(request.Length) + //fmt.Printf("WRITE %x %d\n", request.Handle, request.Length) + err := d.cmdWrite(request.Handle, request.From, request.Length, data) + if err != nil { + return err + } + } else if request.Type == NBD_CMD_TRIM { + // fmt.Printf("TRIM\n") + rp += 28 + err = d.cmdTrim(request.Handle, request.From, request.Length) + if err != nil { + return err + } + } else { + return fmt.Errorf("NBD Not implemented %d\n", request.Type) + } + + } else { + break // Try again when we have more data... + } + } + // Now we need to move any partial to the start + if rp != 0 && rp != wp { + // fmt.Printf("Copy partial %d %d\n", rp, wp) + + copy(buffer, buffer[rp:wp]) + } + wp -= rp + + } +} + +/** + * cmdRead + * + */ +func (d *Dispatch) cmdRead(cmd_handle uint64, cmd_from uint64, cmd_length uint32) error { + + performRead := func(handle uint64, from uint64, length uint32) error { + data := make([]byte, length) + _, e := d.prov.ReadAt(data, int64(from)) + errorValue := uint32(0) + if e != nil { + errorValue = 1 + data = make([]byte, 0) // If there was an error, don't send data + } + return d.writeResponse(errorValue, handle, data) + } + + if d.ASYNC_READS { + d.pendingResponses.Add(1) + go func() { + err := performRead(cmd_handle, cmd_from, cmd_length) + if err != nil { + d.fatal <- err + } + d.pendingResponses.Done() + }() + } else { + return performRead(cmd_handle, cmd_from, cmd_length) + } + return nil +} + +/** + * cmdWrite + * + */ +func (d *Dispatch) cmdWrite(cmd_handle uint64, cmd_from uint64, cmd_length uint32, cmd_data []byte) error { + performWrite := func(handle uint64, from uint64, length uint32, data []byte) error { + _, e := d.prov.WriteAt(data, int64(from)) + errorValue := uint32(0) + if e != nil { + errorValue = 1 + } + return d.writeResponse(errorValue, handle, []byte{}) + } + + if d.ASYNC_WRITES { + d.pendingResponses.Add(1) + go func() { + err := performWrite(cmd_handle, cmd_from, cmd_length, cmd_data) + if err != nil { + d.fatal <- err + } + d.pendingResponses.Done() + }() + } else { + return performWrite(cmd_handle, cmd_from, cmd_length, cmd_data) + } + return nil +} + +/** + * cmdTrim + * + */ +func (d *Dispatch) cmdTrim(handle uint64, from uint64, length uint32) error { + // Ask the provider + /* + e := d.prov.Trim(from, length) + if e != storage.StorageError_SUCCESS { + err := d.writeResponse(1, handle, []byte{}) + if err != nil { + return err + } + } else { + */ + err := d.writeResponse(0, handle, []byte{}) + if err != nil { + return err + } + // } + return nil +} diff --git a/pkg/storage/expose/nbd_dev_test.go b/pkg/storage/expose/nbd_dev_test.go new file mode 100644 index 00000000..d22a1b52 --- /dev/null +++ b/pkg/storage/expose/nbd_dev_test.go @@ -0,0 +1,194 @@ +package expose + +import ( + "fmt" + "os" + "sync" + "testing" + + "github.com/loopholelabs/silo/pkg/storage/modules" + "github.com/loopholelabs/silo/pkg/storage/sources" +) + +func BenchmarkDevRead(b *testing.B) { + NBDdevice := "nbd1" + diskSize := 1024 * 1024 * 1024 * 4 + + // Setup... + // Lets simulate a little latency here + store := sources.NewMemoryStorage(int(diskSize)) + //store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0) + driver := modules.NewMetrics(store) + + n := NewExposedStorageNBD(driver, NBDdevice, 1, 0, uint64(driver.Size()), 4096, 0) + + go func() { + err := n.Start() + if err != nil { + panic(err) + } + }() + + n.Ready() + + /** + * Cleanup everything + * + */ + b.Cleanup(func() { + err := n.Disconnect() + if err != nil { + fmt.Printf("Error cleaning up %v\n", err) + } + driver.ShowStats("stats") + }) + + driver.ResetMetrics() // Only start counting from now... + + // Here's the actual benchmark... + + devfile, err := os.OpenFile(fmt.Sprintf("/dev/%s", NBDdevice), os.O_RDWR, 0666) + if err != nil { + panic("Error opening dev file\n") + } + + b.Cleanup(func() { + devfile.Close() + }) + + var wg sync.WaitGroup + concurrent := make(chan bool, 100) // Do max 100 reads concurrently + + // Now do some timing... + b.ResetTimer() + + read_size := 4096 + offset := int64(0) + totalData := int64(0) + + for i := 0; i < b.N; i++ { + wg.Add(1) + concurrent <- true + length := int64(read_size) + offset += int64(read_size) + if offset+length >= int64(diskSize) { + offset = 0 + } + totalData += length + + // Test read speed... + go func(f_offset int64, f_length int64) { + buffer := make([]byte, f_length) + _, err := devfile.ReadAt(buffer, f_offset) + if err != nil { + fmt.Printf("Error reading file %v\n", err) + } + + wg.Done() + <-concurrent + }(offset, length) + } + + b.SetBytes(totalData) + + wg.Wait() + + fmt.Printf("Total Data %d from %d ops\n", totalData, b.N) +} + +func BenchmarkDevWrite(b *testing.B) { + NBDdevice := "nbd1" + diskSize := 1024 * 1024 * 1024 * 4 + + // Setup... + // Lets simulate a little latency here + store := sources.NewMemoryStorage(int(diskSize)) + // store_latency := modules.NewArtificialLatency(store, 100*time.Millisecond, 0, 100*time.Millisecond, 0) + driver := modules.NewMetrics(store) + + n := NewExposedStorageNBD(driver, NBDdevice, 1, 0, uint64(driver.Size()), 4096, 0) + + go func() { + err := n.Start() + if err != nil { + panic(err) + } + }() + + n.Ready() + + /** + * Cleanup everything + * + */ + b.Cleanup(func() { + err := n.Disconnect() + if err != nil { + fmt.Printf("Error cleaning up %v\n", err) + } + driver.ShowStats("stats") + }) + + driver.ResetMetrics() // Only start counting from now... + + // Here's the actual benchmark... + + devfile, err := os.OpenFile(fmt.Sprintf("/dev/%s", NBDdevice), os.O_RDWR, 0666) + if err != nil { + panic("Error opening dev file\n") + } + + /* + b.Cleanup(func() { + devfile.Close() + }) + */ + + var wg sync.WaitGroup + concurrent := make(chan bool, 100) // Do max 100 writes concurrently + + // Now do some timing... + b.ResetTimer() + + offset := int64(0) + totalData := int64(0) + + for i := 0; i < b.N; i++ { + wg.Add(1) + concurrent <- true + length := int64(4096) + offset += 4096 + if offset+length >= int64(diskSize) { + offset = 0 + } + totalData += length + + // Test write speed... + go func(f_offset int64, f_length int64) { + buffer := make([]byte, f_length) + _, err := devfile.WriteAt(buffer, f_offset) + if err != nil { + fmt.Printf("Error writing file %v\n", err) + } + + wg.Done() + <-concurrent + }(offset, length) + } + + b.SetBytes(totalData) + + wg.Wait() + + // Flush things... + err = devfile.Sync() + if err != nil { + fmt.Printf("Error performing sync %v\n", err) + } + err = devfile.Close() + if err != nil { + fmt.Printf("Error closing dev %v\n", err) + } + + fmt.Printf("Total Data %d from %d ops\n", totalData, b.N) +} diff --git a/pkg/storage/expose/nbd_test.go b/pkg/storage/expose/nbd_test.go new file mode 100644 index 00000000..3f5e9575 --- /dev/null +++ b/pkg/storage/expose/nbd_test.go @@ -0,0 +1,44 @@ +package expose + +import ( + "fmt" + "os" + "testing" + + "github.com/loopholelabs/silo/pkg/storage/sources" + "github.com/stretchr/testify/assert" +) + +func TestNBDDevice(t *testing.T) { + var n *ExposedStorageNBD + dev := "nbd1" + defer func() { + fmt.Printf("Shutting down properly...\n") + err := n.Disconnect() + assert.NoError(t, err) + fmt.Printf("Shutdown complete\n") + }() + + size := 4096 * 1024 * 1024 + prov := sources.NewMemoryStorage(size) + + n = NewExposedStorageNBD(prov, dev, 1, 0, uint64(size), 4096, 0) + + go func() { + err := n.Start() + assert.NoError(t, err) + }() + + n.Ready() + + devfile, err := os.OpenFile(fmt.Sprintf("/dev/%s", dev), os.O_RDWR, 0666) + assert.NoError(t, err) + + // Try doing a read... + buffer := make([]byte, 4096) + num, err := devfile.ReadAt(buffer, 10) + assert.NoError(t, err) + assert.Equal(t, len(buffer), num) + + devfile.Close() +}