Refactor spooledTempFile to use byte slices instead of bytes.Buffer (#66)

* refactor spooledTempFile to use byte slices instead of bytes.Buffer

* fix: removed an unreachable branch of Write() and updated unit tests
equals215 authored Jan 24, 2025
1 parent 2e4336c commit cb54f4c
Showing 2 changed files with 254 additions and 93 deletions.
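For context before the file diff, here is a minimal usage sketch of the spooled temp file this commit refactors. It is illustrative only: the package name, the example-test form, and the argument values are assumptions based on the signatures visible in the diff below.

package spooledtempfile

import (
    "fmt"
    "io"
    "strings"
)

// Example_usage is hypothetical and not part of this commit.
func Example_usage() {
    // Keep up to 1 MB in memory; anything larger spills to a temp file that Close removes.
    f := NewSpooledTempFile("example", "", MaxInMemorySize, false, 0)
    defer f.Close()

    if _, err := io.Copy(f, strings.NewReader("hello spooled world")); err != nil {
        panic(err)
    }

    // The first Read switches the file into read mode at offset 0; writing after this point panics.
    data, err := io.ReadAll(f)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(data), f.Len())
    // Output: hello spooled world 19
}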
141 changes: 70 additions & 71 deletions pkg/spooledtempfile/spooled.go
@@ -13,32 +13,31 @@ import (
"time"
)

// MaxInMemorySize is the max number of bytes (currently 1MB)
// to hold in memory before starting to write to disk
const MaxInMemorySize = 1024 * 1024

// DefaultMaxRAMUsageFraction is the default fraction of system RAM above which
// we'll force spooling to disk. For example, 0.5 = 50%.
const DefaultMaxRAMUsageFraction = 0.50

// Constant defining how often we check system memory usage.
const memoryCheckInterval = 500 * time.Millisecond
const (
// InitialBufferSize is the initial pre-allocated buffer size for in-memory writes
InitialBufferSize = 64 * 1024 // 64 KB initial buffer size
// MaxInMemorySize is the max number of bytes (currently 1MB) to hold in memory before starting to write to disk
MaxInMemorySize = 1024 * 1024
// DefaultMaxRAMUsageFraction is the default fraction of system RAM above which we'll force spooling to disk
DefaultMaxRAMUsageFraction = 0.50
// memoryCheckInterval defines how often we check system memory usage.
memoryCheckInterval = 500 * time.Millisecond
)

// globalMemoryCache is a struct representing the global cache of memory usage data.
type globalMemoryCache struct {
sync.Mutex
lastChecked time.Time
lastFraction float64
}

// memoryUsageCache is the process-wide, mutex-protected cache of memory usage data.
var memoryUsageCache = &globalMemoryCache{}

var spooledPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
}
var (
memoryUsageCache = &globalMemoryCache{}
spooledPool = sync.Pool{
New: func() interface{} {
return make([]byte, 0, InitialBufferSize) // Small initial buffer
},
}
)
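The []byte-backed pool above expects callers to type-assert the Get result and to truncate the slice to zero length before Put. A minimal standalone sketch of that pattern, with illustrative names not taken from this file:

package main

import (
    "fmt"
    "sync"
)

var bufPool = sync.Pool{
    // Mirrors spooledPool above: a zero-length slice with a 64 KB capacity.
    New: func() interface{} { return make([]byte, 0, 64*1024) },
}

func main() {
    buf := bufPool.Get().([]byte) // type assertion is required; len(buf) == 0
    buf = append(buf, "payload"...)
    fmt.Println(len(buf), cap(buf) >= 64*1024) // 7 true
    bufPool.Put(buf[:0]) // reset the length (capacity is kept) before returning it to the pool
}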

// ReaderAt is the interface for ReadAt: read at a position without moving the read pointer.
type ReaderAt interface {
@@ -58,8 +57,8 @@ type ReadSeekCloser interface {
// spooledTempFile writes to memory (or to disk if
// over MaxInMemorySize) and deletes the file on Close
type spooledTempFile struct {
buf *bytes.Buffer
mem *bytes.Reader
buf []byte // Use []byte instead of bytes.Buffer
mem *bytes.Reader // Reader for in-memory data
file *os.File
filePrefix string
tempDir string
@@ -86,25 +85,27 @@ type ReadWriteSeekCloser interface {
// If the system memory usage is above maxRAMUsageFraction, we skip writing
// to memory and spool directly on disk to avoid OOM scenarios in high concurrency.
//
// From our tests, we've seen that a bytes.Buffer's minimum allocated size is 64 bytes; any threshold below that will cause the first write to be spooled to disk.
func NewSpooledTempFile(
filePrefix string,
tempDir string,
threshold int,
fullOnDisk bool,
maxRAMUsageFraction float64,
) ReadWriteSeekCloser {
// If threshold is less than InitialBufferSize, we default to InitialBufferSize.
// As a result, a buffer may not spool to disk at the threshold the caller passed in.
// e.g.: a threshold of 100 is raised to InitialBufferSize (64 KB), so writing 150 B crosses the requested
// threshold without spooling to disk; the buffer only spools once it grows beyond 64 KB.
func NewSpooledTempFile(filePrefix string, tempDir string, threshold int, fullOnDisk bool, maxRAMUsageFraction float64) ReadWriteSeekCloser {
if threshold < 0 {
threshold = MaxInMemorySize
}

if maxRAMUsageFraction <= 0 {
maxRAMUsageFraction = DefaultMaxRAMUsageFraction
}

if threshold <= InitialBufferSize {
threshold = InitialBufferSize
}
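// Illustration (comment added for this writeup, not present in the original file):
// NewSpooledTempFile("p", "", -1, false, 0) ends up with threshold = MaxInMemorySize (1 MB)
// and maxRAMUsageFraction = 0.50 after the defaulting above, while
// NewSpooledTempFile("p", "", 100, false, 0) ends up with a 64 KB threshold.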

return &spooledTempFile{
filePrefix: filePrefix,
tempDir: tempDir,
buf: spooledPool.Get().(*bytes.Buffer),
buf: spooledPool.Get().([]byte), // Get a []byte from the pool
maxInMemorySize: threshold,
fullOnDisk: fullOnDisk,
maxRAMUsageFraction: maxRAMUsageFraction,
@@ -128,7 +129,7 @@ func (s *spooledTempFile) prepareRead() error {
return nil
}

s.mem = bytes.NewReader(s.buf.Bytes())
s.mem = bytes.NewReader(s.buf) // Create a reader from the []byte slice
return nil
}

@@ -140,7 +141,7 @@ func (s *spooledTempFile) Len() int {
}
return int(fi.Size())
}
return s.buf.Len()
return len(s.buf) // Return the length of the []byte slice
}

func (s *spooledTempFile) Read(p []byte) (n int, err error) {
@@ -151,6 +152,7 @@ func (s *spooledTempFile) Read(p []byte) (n int, err error) {
if s.file != nil {
return s.file.Read(p)
}

return s.mem.Read(p)
}

@@ -162,6 +164,7 @@ func (s *spooledTempFile) ReadAt(p []byte, off int64) (n int, err error) {
if s.file != nil {
return s.file.ReadAt(p, off)
}

return s.mem.ReadAt(p, off)
}

@@ -184,38 +187,32 @@ func (s *spooledTempFile) Write(p []byte) (n int, err error) {
panic("write after read")
}

// If we already have a file open, we always write to disk.
if s.file != nil {
return s.file.Write(p)
}

// Otherwise, check if system memory usage is above threshold
// or if we've exceeded our own in-memory limit, or if user forced on-disk.
aboveRAMThreshold := s.isSystemMemoryUsageHigh()
if aboveRAMThreshold || s.fullOnDisk || (s.buf.Len()+len(p) > s.maxInMemorySize) || (s.buf.Cap() > s.maxInMemorySize) {
if aboveRAMThreshold || s.fullOnDisk || (len(s.buf)+len(p) > s.maxInMemorySize) {
// Switch to file if we haven't already
s.file, err = os.CreateTemp(s.tempDir, s.filePrefix+"-")
if err != nil {
return 0, err
}

// Copy what we already had in the buffer
_, err = io.Copy(s.file, s.buf)
_, err = s.file.Write(s.buf)
if err != nil {
s.file.Close()
s.file = nil
return 0, err
}

// If we're above the RAM threshold, we don't want to keep the buffer around.
if s.buf.Cap() > s.maxInMemorySize {
s.buf = nil
} else {
// Release the buffer
s.buf.Reset()
spooledPool.Put(s.buf)
s.buf = nil
// Release the buffer back to the pool
if s.buf != nil && cap(s.buf) <= InitialBufferSize && cap(s.buf) > 0 {
spooledPool.Put(s.buf[:0]) // Reset the buffer before returning it to the pool
}
s.buf = nil
s.mem = nil // Discard the bytes.Reader

// Write incoming bytes directly to file
n, err = s.file.Write(p)
@@ -227,22 +224,40 @@ func (s *spooledTempFile) Write(p []byte) (n int, err error) {
return n, nil
}

// Otherwise, stay in memory.
return s.buf.Write(p)
// Grow the buffer if necessary, but never beyond the configured in-memory threshold
if len(s.buf)+len(p) > cap(s.buf) {
newCap := len(s.buf) + len(p)
if newCap > s.maxInMemorySize {
newCap = s.maxInMemorySize
}

// Allocate a new buffer with the increased capacity
newBuf := make([]byte, len(s.buf), newCap)
copy(newBuf, s.buf)

// Release the old buffer to the pool
if s.buf != nil && cap(s.buf) <= InitialBufferSize && cap(s.buf) > 0 {
spooledPool.Put(s.buf[:0]) // Reset the buffer before returning it to the pool
}
s.buf = newBuf
s.mem = nil // Discard the old bytes.Reader
}

// Append data to the buffer
s.buf = append(s.buf, p...)
return len(p), nil
}
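A hypothetical sketch of the spill behaviour implemented above, written as a test in what is assumed to be package spooledtempfile; the prefix, sizes, and temp dir are illustrative, and on a machine already above the RAM threshold the first write would go straight to disk:

package spooledtempfile

import (
    "os"
    "testing"
)

// TestSpillSketch is illustrative only and not part of this commit.
func TestSpillSketch(t *testing.T) {
    f := NewSpooledTempFile("sketch", os.TempDir(), InitialBufferSize, false, 0)
    defer f.Close() // Close also deletes the temp file, if one was created

    n1, _ := f.Write(make([]byte, 32*1024)) // 32 KB <= 64 KB threshold: kept in the pooled []byte
    n2, _ := f.Write(make([]byte, 64*1024)) // 96 KB total > threshold: everything moves to a temp file
    if f.Len() != n1+n2 {
        t.Fatalf("expected Len() == %d, got %d", n1+n2, f.Len())
    }
}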

func (s *spooledTempFile) Close() error {
s.closed = true
s.mem = nil

// If we're above the RAM threshold, we don't want to keep the buffer around.
if s.buf != nil && s.buf.Cap() > s.maxInMemorySize {
s.buf = nil
} else if s.buf != nil {
// Release the buffer
s.buf.Reset()
spooledPool.Put(s.buf)
// Release the buffer back to the pool
if s.buf != nil && cap(s.buf) <= InitialBufferSize && cap(s.buf) > 0 {
spooledPool.Put(s.buf[:0]) // Reset the buffer before returning it to the pool
}
s.buf = nil

if s.file == nil {
@@ -282,35 +297,24 @@ func getCachedMemoryUsage() (float64, error) {
memoryUsageCache.Lock()
defer memoryUsageCache.Unlock()

// 1) If it's still fresh, just return the cached value.
if time.Since(memoryUsageCache.lastChecked) < memoryCheckInterval {
return memoryUsageCache.lastFraction, nil
}

// 2) Otherwise, do a fresh read (expensive).
fraction, err := getSystemMemoryUsedFraction()
if err != nil {
return 0, err
}

// 3) Update the cache
memoryUsageCache.lastChecked = time.Now()
memoryUsageCache.lastFraction = fraction

return fraction, nil
}
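The function above is a plain mutex-guarded, time-bucketed memoizer. A minimal standalone sketch of the same pattern; the 500 ms interval mirrors memoryCheckInterval, everything else is illustrative:

package main

import (
    "fmt"
    "sync"
    "time"
)

type cachedValue struct {
    mu          sync.Mutex
    lastChecked time.Time
    lastValue   float64
}

// get returns the cached value if it is younger than interval,
// otherwise recomputes it via fetch and refreshes the cache.
func (c *cachedValue) get(interval time.Duration, fetch func() float64) float64 {
    c.mu.Lock()
    defer c.mu.Unlock()
    if time.Since(c.lastChecked) < interval {
        return c.lastValue
    }
    c.lastValue = fetch()
    c.lastChecked = time.Now()
    return c.lastValue
}

func main() {
    calls := 0
    cache := &cachedValue{}
    expensive := func() float64 { calls++; return 0.42 }
    for i := 0; i < 3; i++ {
        _ = cache.get(500*time.Millisecond, expensive)
    }
    fmt.Println("fetches:", calls) // 1: the two later reads hit the cache
}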

// getSystemMemoryUsedFraction parses /proc/meminfo on Linux to figure out
// how much memory is used vs total. Returns fraction = used / total
// This is a Linux-specific implementation.
// This function is defined as a variable so it can be overridden in tests.
// Results are cached via getCachedMemoryUsage to avoid hammering /proc/meminfo on every call.
var getSystemMemoryUsedFraction = func() (float64, error) {
// Read and parse /proc/meminfo to compute the current usage fraction.
f, err := os.Open("/proc/meminfo")
if err != nil {
// If we cannot open /proc/meminfo, return an error so callers can decide how to fall back.
return 0, fmt.Errorf("failed to open /proc/meminfo: %v", err)
}
defer f.Close()
@@ -325,7 +329,6 @@ var getSystemMemoryUsedFraction = func() (float64, error) {
}
key := strings.TrimRight(fields[0], ":")
value, _ := strconv.ParseUint(fields[1], 10, 64)
// value is typically in kB
switch key {
case "MemTotal":
memTotal = value
@@ -349,15 +352,11 @@ var getSystemMemoryUsedFraction = func() (float64, error) {

var used uint64
if memAvailable > 0 {
// Linux 3.14+ has MemAvailable for better measure
used = memTotal - memAvailable
} else {
// Approximate available as free + buffers + cached
approxAvailable := memFree + buffers + cached
used = memTotal - approxAvailable
}

fraction := float64(used) / float64(memTotal)

return fraction, nil
return float64(used) / float64(memTotal), nil
}
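Because getSystemMemoryUsedFraction is a package-level variable, tests can swap it out instead of reading /proc/meminfo. For reference, real numbers such as MemTotal = 16,384,000 kB and MemAvailable = 12,288,000 kB would yield (16384000 - 12288000) / 16384000 = 0.25. A hypothetical sketch, again assuming package spooledtempfile:

package spooledtempfile

import (
    "testing"
    "time"
)

// TestMemoryPressureOverride is illustrative only and not part of this commit.
func TestMemoryPressureOverride(t *testing.T) {
    orig := getSystemMemoryUsedFraction
    defer func() { getSystemMemoryUsedFraction = orig }()

    // Pretend the machine is at 95% RAM usage.
    getSystemMemoryUsedFraction = func() (float64, error) { return 0.95, nil }

    // Force the shared cache to be stale so the override is actually consulted.
    memoryUsageCache.Lock()
    memoryUsageCache.lastChecked = time.Time{}
    memoryUsageCache.Unlock()

    if frac, err := getCachedMemoryUsage(); err != nil || frac != 0.95 {
        t.Fatalf("expected 0.95, got %v (err: %v)", frac, err)
    }
}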
