
Hash check, and S3 pulled in parallel
Signed-off-by: Jimmy Moore <[email protected]>
jimmyaxod committed Oct 25, 2024
1 parent f862546 commit c929fa4
Showing 1 changed file with 25 additions and 11 deletions.
36 changes: 25 additions & 11 deletions pkg/storage/device/device.go
@@ -1,7 +1,9 @@
 package device
 
 import (
+	"bytes"
 	"context"
+	"crypto/sha256"
 	"encoding/binary"
 	"errors"
 	"fmt"
@@ -308,20 +310,32 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose
 	if data != nil {
 		alternateSources := data.([]packets.AlternateSource)
 
-		// TODO: Pull these in parallel
+		var wg sync.WaitGroup
+
+		// Pull these blocks in parallel
 		for _, as := range alternateSources {
-			buffer := make([]byte, as.Length)
-			n, err := s3dest.ReadAt(buffer, as.Offset)
-			if err != nil || n != int(as.Length) {
-				panic(err)
-			}
+			wg.Add(1)
+			go func(a packets.AlternateSource) {
+				buffer := make([]byte, a.Length)
+				n, err := s3dest.ReadAt(buffer, a.Offset)
+				if err != nil || n != int(a.Length) {
+					panic(fmt.Sprintf("sync.start unable to read from S3. %v", err))
+				}
 
-			// TODO: We should probably check the hash here, though if it's incorrect there's nothing we can do.
-			n, err = prov.WriteAt(buffer, as.Offset)
-			if err != nil || n != int(as.Length) {
-				panic(err)
-			}
+				// Check the data in S3 hasn't changed.
+				hash := sha256.Sum256(buffer)
+				if !bytes.Equal(hash[:], a.Hash[:]) {
+					panic("The data in S3 is corrupt.")
+				}
+
+				n, err = prov.WriteAt(buffer, a.Offset)
+				if err != nil || n != int(a.Length) {
+					panic(fmt.Sprintf("sync.start unable to write data to device from S3. %v", err))
+				}
+				wg.Done()
+			}(as)
 		}
+		wg.Wait() // Wait for all S3 requests to complete
 	}
 
 	sync_lock.Lock()
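For reference, here is a minimal, self-contained sketch of the pattern this commit introduces: fan out one goroutine per alternate-source block, verify each buffer's SHA-256 before writing it to the device, and wait on a sync.WaitGroup for all pulls to finish. The block type, the pullBlocks function, and the io.ReaderAt/io.WriterAt parameters are hypothetical stand-ins for packets.AlternateSource, the S3 provider (s3dest), and the device provider (prov); this is not the repository's API, and unlike the committed code it returns errors instead of panicking.

package example

import (
	"bytes"
	"crypto/sha256"
	"fmt"
	"io"
	"sync"
)

// block is a hypothetical stand-in for an alternate-source entry: where the
// data lives, how long it is, and the expected SHA-256 of its contents.
type block struct {
	Offset int64
	Length int64
	Hash   [sha256.Size]byte
}

// pullBlocks reads every block from src concurrently, verifies each buffer
// against its expected hash, and writes it to dst at the same offset.
// It waits for all goroutines to finish and returns the first error seen.
func pullBlocks(src io.ReaderAt, dst io.WriterAt, blocks []block) error {
	var wg sync.WaitGroup
	errs := make(chan error, len(blocks)) // buffered so no goroutine blocks

	for _, b := range blocks {
		wg.Add(1)
		go func(b block) {
			defer wg.Done()

			buf := make([]byte, b.Length)
			n, err := src.ReadAt(buf, b.Offset)
			if err != nil || int64(n) != b.Length {
				errs <- fmt.Errorf("read %d bytes at offset %d: %v", b.Length, b.Offset, err)
				return
			}

			// Verify the data hasn't changed since the hash was recorded.
			if sum := sha256.Sum256(buf); !bytes.Equal(sum[:], b.Hash[:]) {
				errs <- fmt.Errorf("hash mismatch at offset %d", b.Offset)
				return
			}

			if n, err := dst.WriteAt(buf, b.Offset); err != nil || int64(n) != b.Length {
				errs <- fmt.Errorf("write %d bytes at offset %d: %v", b.Length, b.Offset, err)
			}
		}(b)
	}

	wg.Wait()
	close(errs)
	return <-errs // nil if every block succeeded
}

The sketch also uses defer wg.Done() and an error channel so that a failed block surfaces an error to the caller, whereas the committed code panics on any read, hash, or write failure.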
