diff --git a/pkg/spooledtempfile/spooled.go b/pkg/spooledtempfile/spooled.go index be90061..a54c0eb 100644 --- a/pkg/spooledtempfile/spooled.go +++ b/pkg/spooledtempfile/spooled.go @@ -13,32 +13,31 @@ import ( "time" ) -// MaxInMemorySize is the max number of bytes (currently 1MB) -// to hold in memory before starting to write to disk -const MaxInMemorySize = 1024 * 1024 - -// DefaultMaxRAMUsageFraction is the default fraction of system RAM above which -// we'll force spooling to disk. For example, 0.5 = 50%. -const DefaultMaxRAMUsageFraction = 0.50 - -// Constant defining how often we check system memory usage. -const memoryCheckInterval = 500 * time.Millisecond +const ( + // InitialBufferSize is the initial pre-allocated buffer size for in-memory writes + InitialBufferSize = 64 * 1024 // 64 KB initial buffer size + // MaxInMemorySize is the max number of bytes (currently 1MB) to hold in memory before starting to write to disk + MaxInMemorySize = 1024 * 1024 + // DefaultMaxRAMUsageFraction is the default fraction of system RAM above which we'll force spooling to disk + DefaultMaxRAMUsageFraction = 0.50 + // memoryCheckInterval defines how often we check system memory usage. + memoryCheckInterval = 500 * time.Millisecond +) -// globalMemoryCache is a struct representing global cache of memory usage data. type globalMemoryCache struct { sync.Mutex lastChecked time.Time lastFraction float64 } -// memoryUsageCache is an atomic pointer to memoryUsageData. -var memoryUsageCache = &globalMemoryCache{} - -var spooledPool = sync.Pool{ - New: func() interface{} { - return bytes.NewBuffer(nil) - }, -} +var ( + memoryUsageCache = &globalMemoryCache{} + spooledPool = sync.Pool{ + New: func() interface{} { + return make([]byte, 0, InitialBufferSize) // Small initial buffer + }, + } +) // ReaderAt is the interface for ReadAt - read at position, without moving pointer. type ReaderAt interface { @@ -58,8 +57,8 @@ type ReadSeekCloser interface { // spooledTempFile writes to memory (or to disk if // over MaxInMemorySize) and deletes the file on Close type spooledTempFile struct { - buf *bytes.Buffer - mem *bytes.Reader + buf []byte // Use []byte instead of bytes.Buffer + mem *bytes.Reader // Reader for in-memory data file *os.File filePrefix string tempDir string @@ -86,25 +85,27 @@ type ReadWriteSeekCloser interface { // If the system memory usage is above maxRAMUsageFraction, we skip writing // to memory and spool directly on disk to avoid OOM scenarios in high concurrency. // -// From our tests, we've seen that a bytes.Buffer minimum allocated size is 64 bytes, any threshold below that will cause the first write to be spooled on disk. -func NewSpooledTempFile( - filePrefix string, - tempDir string, - threshold int, - fullOnDisk bool, - maxRAMUsageFraction float64, -) ReadWriteSeekCloser { +// If threshold is less than InitialBufferSize, we default to InitialBufferSize. +// This can cause a buffer not to spool to disk as expected given the threshold passed in. +// e.g.: If threshold is 100, it will default to InitialBufferSize (64KB), then 150B are written effectively crossing the passed threshold, +// but the buffer will not spool to disk as expected. Only when the buffer grows beyond 64KB will it spool to disk. +func NewSpooledTempFile(filePrefix string, tempDir string, threshold int, fullOnDisk bool, maxRAMUsageFraction float64) ReadWriteSeekCloser { if threshold < 0 { threshold = MaxInMemorySize } + if maxRAMUsageFraction <= 0 { maxRAMUsageFraction = DefaultMaxRAMUsageFraction } + if threshold <= InitialBufferSize { + threshold = InitialBufferSize + } + return &spooledTempFile{ filePrefix: filePrefix, tempDir: tempDir, - buf: spooledPool.Get().(*bytes.Buffer), + buf: spooledPool.Get().([]byte), // Get a []byte from the pool maxInMemorySize: threshold, fullOnDisk: fullOnDisk, maxRAMUsageFraction: maxRAMUsageFraction, @@ -128,7 +129,7 @@ func (s *spooledTempFile) prepareRead() error { return nil } - s.mem = bytes.NewReader(s.buf.Bytes()) + s.mem = bytes.NewReader(s.buf) // Create a reader from the []byte slice return nil } @@ -140,7 +141,7 @@ func (s *spooledTempFile) Len() int { } return int(fi.Size()) } - return s.buf.Len() + return len(s.buf) // Return the length of the []byte slice } func (s *spooledTempFile) Read(p []byte) (n int, err error) { @@ -151,6 +152,7 @@ func (s *spooledTempFile) Read(p []byte) (n int, err error) { if s.file != nil { return s.file.Read(p) } + return s.mem.Read(p) } @@ -162,6 +164,7 @@ func (s *spooledTempFile) ReadAt(p []byte, off int64) (n int, err error) { if s.file != nil { return s.file.ReadAt(p, off) } + return s.mem.ReadAt(p, off) } @@ -184,15 +187,12 @@ func (s *spooledTempFile) Write(p []byte) (n int, err error) { panic("write after read") } - // If we already have a file open, we always write to disk. if s.file != nil { return s.file.Write(p) } - // Otherwise, check if system memory usage is above threshold - // or if we've exceeded our own in-memory limit, or if user forced on-disk. aboveRAMThreshold := s.isSystemMemoryUsageHigh() - if aboveRAMThreshold || s.fullOnDisk || (s.buf.Len()+len(p) > s.maxInMemorySize) || (s.buf.Cap() > s.maxInMemorySize) { + if aboveRAMThreshold || s.fullOnDisk || (len(s.buf)+len(p) > s.maxInMemorySize) { // Switch to file if we haven't already s.file, err = os.CreateTemp(s.tempDir, s.filePrefix+"-") if err != nil { @@ -200,22 +200,19 @@ func (s *spooledTempFile) Write(p []byte) (n int, err error) { } // Copy what we already had in the buffer - _, err = io.Copy(s.file, s.buf) + _, err = s.file.Write(s.buf) if err != nil { s.file.Close() s.file = nil return 0, err } - // If we're above the RAM threshold, we don't want to keep the buffer around. - if s.buf.Cap() > s.maxInMemorySize { - s.buf = nil - } else { - // Release the buffer - s.buf.Reset() - spooledPool.Put(s.buf) - s.buf = nil + // Release the buffer back to the pool + if s.buf != nil && cap(s.buf) <= InitialBufferSize && cap(s.buf) > 0 { + spooledPool.Put(s.buf[:0]) // Reset the buffer before returning it to the pool } + s.buf = nil + s.mem = nil // Discard the bytes.Reader // Write incoming bytes directly to file n, err = s.file.Write(p) @@ -227,22 +224,40 @@ func (s *spooledTempFile) Write(p []byte) (n int, err error) { return n, nil } - // Otherwise, stay in memory. - return s.buf.Write(p) + // Grow the buffer if necessary, but never exceed MaxInMemorySize + if len(s.buf)+len(p) > cap(s.buf) { + newCap := len(s.buf) + len(p) + if newCap > s.maxInMemorySize { + newCap = s.maxInMemorySize + } + + // Allocate a new buffer with the increased capacity + newBuf := make([]byte, len(s.buf), newCap) + copy(newBuf, s.buf) + + // Release the old buffer to the pool + if s.buf != nil && cap(s.buf) <= InitialBufferSize && cap(s.buf) > 0 { + spooledPool.Put(s.buf[:0]) // Reset the buffer before returning it to the pool + } + s.buf = newBuf + s.mem = nil // Discard the old bytes.Reader + } + + // Append data to the buffer + s.buf = append(s.buf, p...) + return len(p), nil } func (s *spooledTempFile) Close() error { s.closed = true s.mem = nil - // If we're above the RAM threshold, we don't want to keep the buffer around. - if s.buf != nil && s.buf.Cap() > s.maxInMemorySize { - s.buf = nil - } else if s.buf != nil { - // Release the buffer - s.buf.Reset() - spooledPool.Put(s.buf) + // Release the buffer back to the pool + if s.buf != nil { s.buf = nil + if s.buf != nil && cap(s.buf) <= InitialBufferSize && cap(s.buf) > 0 { + spooledPool.Put(s.buf[:0]) // Reset the buffer before returning it to the pool + } } if s.file == nil { @@ -282,35 +297,24 @@ func getCachedMemoryUsage() (float64, error) { memoryUsageCache.Lock() defer memoryUsageCache.Unlock() - // 1) If it's still fresh, just return the cached value. if time.Since(memoryUsageCache.lastChecked) < memoryCheckInterval { return memoryUsageCache.lastFraction, nil } - // 2) Otherwise, do a fresh read (expensive). fraction, err := getSystemMemoryUsedFraction() if err != nil { return 0, err } - // 3) Update the cache memoryUsageCache.lastChecked = time.Now() memoryUsageCache.lastFraction = fraction return fraction, nil } -// getSystemMemoryUsedFraction parses /proc/meminfo on Linux to figure out -// how much memory is used vs total. Returns fraction = used / total -// This is a Linux-specific implementation. -// This function is defined as a variable so it can be overridden in tests. -// Now includes lock-free CAS caching to avoid hammering /proc/meminfo on every call. var getSystemMemoryUsedFraction = func() (float64, error) { - // We're the winners and need to refresh the data. f, err := os.Open("/proc/meminfo") if err != nil { - // If we cannot open /proc/meminfo, return an error - // or fallback if you prefer return 0, fmt.Errorf("failed to open /proc/meminfo: %v", err) } defer f.Close() @@ -325,7 +329,6 @@ var getSystemMemoryUsedFraction = func() (float64, error) { } key := strings.TrimRight(fields[0], ":") value, _ := strconv.ParseUint(fields[1], 10, 64) - // value is typically in kB switch key { case "MemTotal": memTotal = value @@ -349,15 +352,11 @@ var getSystemMemoryUsedFraction = func() (float64, error) { var used uint64 if memAvailable > 0 { - // Linux 3.14+ has MemAvailable for better measure used = memTotal - memAvailable } else { - // Approximate available as free + buffers + cached approxAvailable := memFree + buffers + cached used = memTotal - approxAvailable } - fraction := float64(used) / float64(memTotal) - - return fraction, nil + return float64(used) / float64(memTotal), nil } diff --git a/pkg/spooledtempfile/spooled_test.go b/pkg/spooledtempfile/spooled_test.go index 62f7a8a..5f15364 100644 --- a/pkg/spooledtempfile/spooled_test.go +++ b/pkg/spooledtempfile/spooled_test.go @@ -11,6 +11,10 @@ import ( "testing" ) +func generateTestDataInKB(size int) []byte { + return bytes.Repeat([]byte("A"), size*1024) +} + // TestInMemoryBasic writes data below threshold and verifies it remains in memory. func TestInMemoryBasic(t *testing.T) { memoryUsageCache = &globalMemoryCache{} @@ -64,23 +68,19 @@ func TestInMemoryBasic(t *testing.T) { // TestThresholdCrossing writes enough data to switch from in-memory to disk. func TestThresholdCrossing(t *testing.T) { memoryUsageCache = &globalMemoryCache{} - spool := NewSpooledTempFile("test", os.TempDir(), 64, false, -1) - spoolFile := spool.(*spooledTempFile) + spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, false, -1) defer spool.Close() - data1 := []byte("12345") - // Generate 64 random bytes in data2 - data2 := make([]byte, 64) - _, _ = io.ReadFull(bytes.NewReader([]byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")), data2) - - if spoolFile.buf.Cap() > 64 { - t.Errorf("Expected buffer capacity to be 64 or less, got %d", spoolFile.buf.Cap()) - } + data1 := generateTestDataInKB(63) + data2 := generateTestDataInKB(10) _, err := spool.Write(data1) if err != nil { t.Fatalf("First Write error: %v", err) } + if spool.Len() != 63*1024 { + t.Errorf("Len() mismatch: got %d, want %d", spool.Len(), 63*1024) + } if spool.FileName() != "" { t.Errorf("Expected to still be in memory, but file exists: %s", spool.FileName()) } @@ -89,6 +89,9 @@ func TestThresholdCrossing(t *testing.T) { if err != nil { t.Fatalf("Second Write error: %v", err) } + if spool.Len() != 73*1024 { + t.Errorf("Len() mismatch: got %d, want %d", spool.Len(), 63*1024) + } // Now spool should be on disk fn := spool.FileName() @@ -115,7 +118,7 @@ func TestThresholdCrossing(t *testing.T) { // TestForceOnDisk checks the fullOnDisk parameter. func TestForceOnDisk(t *testing.T) { memoryUsageCache = &globalMemoryCache{} - spool := NewSpooledTempFile("test", os.TempDir(), 1000000, true, -1) + spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, true, -1) defer spool.Close() input := []byte("force to disk") @@ -123,6 +126,9 @@ func TestForceOnDisk(t *testing.T) { if err != nil { t.Fatalf("Write error: %v", err) } + if spool.Len() != len(input) { + t.Errorf("Len() mismatch: got %d, want %d", spool.Len(), len(input)) + } if spool.FileName() == "" { t.Errorf("Expected a file name because fullOnDisk = true, got empty") @@ -140,7 +146,7 @@ func TestForceOnDisk(t *testing.T) { // TestReadAtAndSeekInMemory tests seeking and ReadAt on an in-memory spool. func TestReadAtAndSeekInMemory(t *testing.T) { memoryUsageCache = &globalMemoryCache{} - spool := NewSpooledTempFile("test", "", 100, false, -1) + spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, false, -1) defer spool.Close() data := []byte("HelloWorld123") @@ -185,14 +191,19 @@ func TestReadAtAndSeekInMemory(t *testing.T) { // TestReadAtAndSeekOnDisk tests seeking and ReadAt on a spool that has switched to disk. func TestReadAtAndSeekOnDisk(t *testing.T) { memoryUsageCache = &globalMemoryCache{} - spool := NewSpooledTempFile("test", "", 10, false, -1) + spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, false, -1) defer spool.Close() - data := []byte("HelloWorld123") + data1 := []byte("HelloWorld123") + data2 := generateTestDataInKB(65) + data := append(data2, data1...) _, err := spool.Write(data) if err != nil { t.Fatalf("Write error: %v", err) } + if spool.Len() != len(data) { + t.Errorf("Len() mismatch: got %d, want %d", spool.Len(), len(data)) + } // We crossed threshold at 10 bytes => spool on disk if spool.FileName() == "" { @@ -205,7 +216,7 @@ func TestReadAtAndSeekOnDisk(t *testing.T) { } p := make([]byte, 5) - _, err = spool.ReadAt(p, 5) + _, err = spool.ReadAt(p, (65*1024)+5) if err != nil { t.Fatalf("ReadAt error: %v", err) } @@ -217,7 +228,7 @@ func TestReadAtAndSeekOnDisk(t *testing.T) { // TestWriteAfterReadPanic ensures writing after reading panics per your design. func TestWriteAfterReadPanic(t *testing.T) { memoryUsageCache = &globalMemoryCache{} - spool := NewSpooledTempFile("test", "", 100, false, -1) + spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, false, -1) defer spool.Close() _, err := spool.Write([]byte("ABCDEFG")) @@ -251,7 +262,7 @@ func TestWriteAfterReadPanic(t *testing.T) { // TestCloseInMemory checks closing while still in-memory. func TestCloseInMemory(t *testing.T) { memoryUsageCache = &globalMemoryCache{} - spool := NewSpooledTempFile("test", "", 100, false, -1) + spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, false, -1) _, err := spool.Write([]byte("Small data")) if err != nil { @@ -282,12 +293,16 @@ func TestCloseInMemory(t *testing.T) { // TestCloseOnDisk checks closing after spool has switched to disk. func TestCloseOnDisk(t *testing.T) { memoryUsageCache = &globalMemoryCache{} - spool := NewSpooledTempFile("test", "", 10, false, -1) + spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, false, -1) - _, err := spool.Write([]byte("1234567890ABC")) + data := generateTestDataInKB(65) + _, err := spool.Write(data) if err != nil { t.Fatalf("Write error: %v", err) } + if spool.Len() != len(data) { + t.Errorf("Len() mismatch: got %d, want %d", spool.Len(), len(data)) + } fn := spool.FileName() if fn == "" { @@ -321,7 +336,7 @@ func TestCloseOnDisk(t *testing.T) { // TestLen verifies Len() for both in-memory and on-disk states. func TestLen(t *testing.T) { memoryUsageCache = &globalMemoryCache{} - spool := NewSpooledTempFile("test", "", 5, false, -1) + spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, false, -1) defer spool.Close() data := []byte("1234") @@ -346,7 +361,7 @@ func TestLen(t *testing.T) { // TestFileName checks correctness of FileName in both modes. func TestFileName(t *testing.T) { memoryUsageCache = &globalMemoryCache{} - spool := NewSpooledTempFile("testprefix", os.TempDir(), 5, false, -1) + spool := NewSpooledTempFile("testprefix", os.TempDir(), 64*1024, false, -1) defer spool.Close() if spool.FileName() != "" { @@ -354,10 +369,14 @@ func TestFileName(t *testing.T) { } // Cross threshold - _, err := spool.Write([]byte("hellooooooo")) + data := generateTestDataInKB(65) + _, err := spool.Write(data) if err != nil { t.Fatalf("Write error crossing threshold: %v", err) } + if spool.Len() != len(data) { + t.Errorf("Len() mismatch on-disk: got %d, want %d", spool.Len(), len(data)) + } fn := spool.FileName() if fn == "" { @@ -417,3 +436,146 @@ func TestSkipInMemoryAboveRAMUsage(t *testing.T) { t.Errorf("Data mismatch. Got %q, want %q", out, data) } } + +// TestBufferGrowthWithinLimits verifies that the buffer grows dynamically but never exceeds MaxInMemorySize. +func TestBufferGrowthWithinLimits(t *testing.T) { + memoryUsageCache = &globalMemoryCache{} + spool := NewSpooledTempFile("test", os.TempDir(), 128*1024, false, -1) + defer spool.Close() + + // Write data that will cause the buffer to grow but stay within MaxInMemorySize + data1 := generateTestDataInKB(30) + data2 := generateTestDataInKB(35) + + _, err := spool.Write(data1) + if err != nil { + t.Fatalf("Write error: %v", err) + } + if spool.Len() != len(data1) { + t.Errorf("Len() mismatch: got %d, want %d", spool.Len(), len(data1)) + } + + // Check that the buffer is still in memory + if spool.FileName() != "" { + t.Errorf("Expected buffer to still be in memory, but file exists: %s", spool.FileName()) + } + + // Write more data to trigger buffer growth + _, err = spool.Write(data2) + if err != nil { + t.Fatalf("Write error: %v", err) + } + if spool.Len() != len(data1)+len(data2) { + t.Errorf("Len() mismatch: got %d, want %d", spool.Len(), len(data1)+len(data2)) + } + + // Check that the buffer grew + spoolBuffer := spool.(*spooledTempFile) + if cap(spoolBuffer.buf) <= InitialBufferSize { + t.Fatalf("Expected buffer capacity > %d, got %d", InitialBufferSize, cap(spoolBuffer.buf)) + } + + // Check that the buffer is still in memory and has grown + if spool.FileName() != "" { + t.Errorf("Expected buffer to still be in memory, but file exists: %s", spool.FileName()) + } +} + +// TestPoolBehavior verifies that buffers exceeding InitialBufferSize are not returned to the pool. +func TestPoolBehavior(t *testing.T) { + memoryUsageCache = &globalMemoryCache{} + spool := NewSpooledTempFile("test", os.TempDir(), 150*1024, false, -1) + defer spool.Close() + + // Write data to grow the buffer beyond InitialBufferSize + data := make([]byte, 100*1024) + n := copy(data, bytes.Repeat([]byte("A"), 100*1024)) + if n != 100*1024 { + t.Fatalf("Data copy mismatch: got %d, want %d", n, 100*1024) + } + if len(data) != 100*1024 { + t.Fatalf("Data length mismatch: got %d, want %d", len(data), 100*1024) + } + _, err := spool.Write(data) + if err != nil { + t.Fatalf("Write error: %v", err) + } + + // Ensure the buffer has grown beyond InitialBufferSize + spoolTempFile := spool.(*spooledTempFile) + if cap(spoolTempFile.buf) <= InitialBufferSize { + t.Fatalf("Expected buffer capacity > %d, got %d", InitialBufferSize, cap(spoolTempFile.buf)) + } + + // Close the spool to release the buffer + err = spool.Close() + if err != nil { + t.Fatalf("Close error: %v", err) + } + + // Retrieve a buffer from the pool + buf := spooledPool.Get().([]byte) + + // Verify that the retrieved buffer has the expected initial capacity + if cap(buf) != InitialBufferSize { + t.Errorf("Expected buffer in pool to have capacity %d, got %d", InitialBufferSize, cap(buf)) + } + + // Verify that the buffer is empty (reset) + if len(buf) != 0 { + t.Errorf("Expected buffer length to be 0, got %d", len(buf)) + } +} + +func TestBufferGrowthBeyondNewCap(t *testing.T) { + memoryUsageCache = &globalMemoryCache{} + spool := NewSpooledTempFile("test", os.TempDir(), 100*1024, false, -1) + defer spool.Close() + + // Write data to grow the buffer close to MaxInMemorySize + data1 := generateTestDataInKB(50) + _, err := spool.Write(data1) + if err != nil { + t.Fatalf("Write error: %v", err) + } + + if spool.Len() != 50*1024 { + t.Fatalf("Data length mismatch: got %d, want %d", spool.Len(), 50*1024) + } + + // Write more data to trigger buffer growth beyond MaxInMemorySize + data2 := generateTestDataInKB(51) + _, err = spool.Write(data2) + if err != nil { + t.Fatalf("Write error: %v", err) + } + + if spool.Len() != 101*1024 { + t.Fatalf("Data length mismatch: got %d, want %d", spool.Len(), 101*1024) + } + + // Check that the buffer has been spooled to disk + if spool.FileName() == "" { + t.Error("Expected buffer to be spooled to disk, but no file exists") + } + + // Verify the data was written correctly + expected := append(data1, data2...) + out := make([]byte, len(expected)) + _, err = spool.ReadAt(out, 0) + if err != nil && err != io.EOF { + t.Fatalf("ReadAt error: %v", err) + } + if !bytes.Equal(out, expected) { + t.Errorf("Data mismatch. Got %q, want %q", out, expected) + } + + // Verify that the buffer was released to the pool (if it meets the criteria) + buf := spooledPool.Get().([]byte) + if cap(buf) != InitialBufferSize { + t.Errorf("Expected buffer in pool to have capacity %d, got %d", InitialBufferSize, cap(buf)) + } + if len(buf) != 0 { + t.Errorf("Expected buffer length to be 0, got %d", len(buf)) + } +}