Skip to content

Commit

Permalink
wip: spooled manager to handle common memory limitations
Browse files Browse the repository at this point in the history
  • Loading branch information
CorentinB committed Jan 17, 2025
1 parent a9cf503 commit e042cbd
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 62 deletions.
111 changes: 49 additions & 62 deletions spooled.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package warc

// The following code is heavily inspired by: https://github.com/tgulacsi/go/blob/master/temp/memfile.go

import (
"bytes"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -46,8 +43,9 @@ type spooledTempFile struct {
tempDir string
maxInMemorySize int
fullOnDisk bool
reading bool // transitions at most once from false -> true
reading bool
closed bool
manager *SpoolManager
}

// ReadWriteSeekCloser is an io.Writer + io.Reader + io.Seeker + io.Closer.
Expand All @@ -56,44 +54,42 @@ type ReadWriteSeekCloser interface {
io.Writer
}

// NewSpooledTempFile returns an ReadWriteSeekCloser,
// with some important constraints:
// you can Write into it, but whenever you call Read or Seek on it,
// Write is forbidden, will return an error.
// NewSpooledTempFile returns an ReadWriteSeekCloser.
// If threshold is -1, then the default MaxInMemorySize is used.
func NewSpooledTempFile(filePrefix string, tempDir string, threshold int, fullOnDisk bool) ReadWriteSeekCloser {
if threshold < 0 {
threshold = MaxInMemorySize
}

return &spooledTempFile{
s := &spooledTempFile{
filePrefix: filePrefix,
tempDir: tempDir,
buf: spooledPool.Get().(*bytes.Buffer),
maxInMemorySize: threshold,
fullOnDisk: fullOnDisk,
manager: DefaultSpoolManager,
}

s.manager.RegisterSpool(s)

return s
}

func (s *spooledTempFile) prepareRead() error {
if s.closed {
return io.EOF
}

if s.reading && (s.file != nil || s.buf == nil || s.mem != nil) {
return nil
}

s.reading = true
if s.file != nil {
if _, err := s.file.Seek(0, 0); err != nil {
return fmt.Errorf("file=%v: %w", s.file, err)
}
return nil
}

s.mem = bytes.NewReader(s.buf.Bytes())

return nil
}

Expand All @@ -105,117 +101,108 @@ func (s *spooledTempFile) Len() int {
}
return int(fi.Size())
}

return s.buf.Len()
}

func (s *spooledTempFile) Read(p []byte) (n int, err error) {
if err := s.prepareRead(); err != nil {
return 0, err
}

if s.file != nil {
return s.file.Read(p)
}

return s.mem.Read(p)
}

func (s *spooledTempFile) ReadAt(p []byte, off int64) (n int, err error) {
if err := s.prepareRead(); err != nil {
return 0, err
}

if s.file != nil {
return s.file.ReadAt(p, off)
}

return s.mem.ReadAt(p, off)
}

func (s *spooledTempFile) Seek(offset int64, whence int) (int64, error) {
if err := s.prepareRead(); err != nil {
return 0, err
}

if s.file != nil {
return s.file.Seek(offset, whence)
}

return s.mem.Seek(offset, whence)
}

func (s *spooledTempFile) Write(p []byte) (n int, err error) {
if s.closed {
return 0, io.EOF
}

if s.reading {
panic("write after read")
}

if s.file != nil {
n, err = s.file.Write(p)
return
return s.file.Write(p)
}

if (s.buf.Len()+len(p) > s.maxInMemorySize) || s.fullOnDisk {
s.file, err = os.CreateTemp(s.tempDir, s.filePrefix+"-")
if err != nil {
return
}

_, err = io.Copy(s.file, s.buf)
if err != nil {
s.file.Close()
s.file = nil
return
}

s.buf.Reset()
spooledPool.Put(s.buf)
s.buf = nil

if n, err = s.file.Write(p); err != nil {
s.file.Close()
s.file = nil
proposedSize := s.buf.Len() + len(p)
if s.fullOnDisk ||
proposedSize > s.maxInMemorySize {
if err := s.switchToFile(); err != nil {
return 0, err
}
return s.file.Write(p)
}
s.manager.AddBytes(len(p))
return s.buf.Write(p)
}

return
func (s *spooledTempFile) switchToFile() error {
f, err := os.CreateTemp(s.tempDir, s.filePrefix+"-")
if err != nil {
return err
}
if _, err = io.Copy(f, s.buf); err != nil {
f.Close()
return err
}
s.manager.SubBytes(s.buf.Len())
s.buf.Reset()
spooledPool.Put(s.buf)
s.buf = nil
s.file = f
return nil
}

return s.buf.Write(p)
func (s *spooledTempFile) forceToDiskIfInMemory() {
if s.file == nil && !s.closed {
_ = s.switchToFile()
}
}

func (s *spooledTempFile) Close() error {
if s.closed {
return nil
}
s.closed = true
s.mem = nil

if s.buf != nil {
s.manager.SubBytes(s.buf.Len())
s.buf.Reset()
spooledPool.Put(s.buf)
s.buf = nil
}

if s.file == nil {
return nil
}

s.file.Close()

if err := os.Remove(s.file.Name()); err != nil && !errors.Is(err, os.ErrNotExist) {
return err
if s.file != nil {
s.file.Close()
os.Remove(s.file.Name())
s.file = nil
}

s.file = nil

s.manager.UnregisterSpool(s)
return nil
}

func (s *spooledTempFile) FileName() string {
if s.file != nil {
return s.file.Name()
} else {
return ""
}
return ""
}
131 changes: 131 additions & 0 deletions spooledmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package warc

import (
"container/heap"
"sync"
"time"

"golang.org/x/sys/unix"
)

// SpoolManager enforces a global memory limit, tracks spoolers, and handles eviction.
type SpoolManager struct {
mu sync.Mutex
spoolers spoolHeap
spoolerIndex map[*spooledTempFile]*spoolItem
currentMemUsage int64
GlobalMemoryLimit int64
}

type spoolItem struct {
s *spooledTempFile
priority time.Time // used to determine which spooler is oldest (min-heap)
index int // heap interface requirement
}

type spoolHeap []*spoolItem

func (h spoolHeap) Len() int { return len(h) }

func (h spoolHeap) Less(i, j int) bool {
return h[i].priority.Before(h[j].priority)
}

func (h spoolHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}

func (h *spoolHeap) Push(x interface{}) {
item := x.(*spoolItem)
item.index = len(*h)
*h = append(*h, item)
}

func (h *spoolHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}

// DefaultSpoolManager is the global manager. Adjust limit as desired.
var DefaultSpoolManager = NewSpoolManager(getHalfOfAvailableRAM())

func NewSpoolManager(limit int64) *SpoolManager {
m := &SpoolManager{
GlobalMemoryLimit: limit,
spoolerIndex: make(map[*spooledTempFile]*spoolItem),
}
heap.Init(&m.spoolers)
return m
}

func (m *SpoolManager) RegisterSpool(s *spooledTempFile) {
m.mu.Lock()
defer m.mu.Unlock()
item := &spoolItem{
s: s,
priority: time.Now(),
}
m.spoolerIndex[s] = item
heap.Push(&m.spoolers, item)
}

func (m *SpoolManager) UnregisterSpool(s *spooledTempFile) {
m.mu.Lock()
defer m.mu.Unlock()
item, ok := m.spoolerIndex[s]
if !ok {
return
}
delete(m.spoolerIndex, s)
heap.Remove(&m.spoolers, item.index)
}

func (m *SpoolManager) CanAddBytes(n int) bool {
m.mu.Lock()
defer m.mu.Unlock()
return m.currentMemUsage+int64(n) <= m.GlobalMemoryLimit
}

func (m *SpoolManager) AddBytes(n int) {
m.mu.Lock()
m.currentMemUsage += int64(n)
m.mu.Unlock()
}

func (m *SpoolManager) SubBytes(n int) {
m.mu.Lock()
m.currentMemUsage -= int64(n)
if m.currentMemUsage < 0 {
m.currentMemUsage = 0
}
m.mu.Unlock()
}

func (m *SpoolManager) EvictIfNeeded() {
m.mu.Lock()
defer m.mu.Unlock()

for m.currentMemUsage > m.GlobalMemoryLimit && len(m.spoolers) > 0 {
item := m.spoolers[0]
if item.s.file == nil && !item.s.closed {
item.s.forceToDiskIfInMemory()
} else {
// If it's already on disk or closed, pop it to avoid looping
heap.Remove(&m.spoolers, item.index)
delete(m.spoolerIndex, item.s)
}
}
}

func getHalfOfAvailableRAM() int64 {
var info unix.Sysinfo_t
if err := unix.Sysinfo(&info); err != nil {
panic(err)
}
return int64(info.Totalram) / 2
}

0 comments on commit e042cbd

Please sign in to comment.