diff --git a/vfs/file.go b/vfs/file.go index 3a829adf6..846377d63 100644 --- a/vfs/file.go +++ b/vfs/file.go @@ -48,7 +48,8 @@ type File struct { pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written pendingRenameFun func(ctx context.Context) error // will be run/renamed after all writers close appendMode bool // file was opened with O_APPEND - sys atomic.Value // user defined info to be attached here + messagedWrite bool + sys atomic.Value // user defined info to be attached here muRW sync.Mutex // synchronize RWFileHandle.openPending(), RWFileHandle.close() and File.Remove } @@ -505,8 +506,17 @@ func (f *File) waitForValidObject() (o fs.Object, err error) { } // openRead open the file for read -func (f *File) openRead() (fh *ReadFileHandle, err error) { +func (f *File) openRead() (fh Handle, err error) { // if o is nil it isn't valid yet + if f.messagedWrite { + fh, err = newStubReadFileHandle(f) + if err != nil { + fs.Debugf(f.Path(), "File.openRead failed: %v", err) + return nil, err + } + return fh, nil + } + _, err = f.waitForValidObject() if err != nil { return nil, err @@ -532,6 +542,9 @@ func (f *File) openWrite(flags int) (fh *WriteFileHandle, err error) { } // fs.Debugf(f.Path(), "File.openWrite") + if d.vfs.Opt.MessagedWrite { + f.messagedWrite = true + } fh, err = newWriteFileHandle(d, f, f.Path(), flags) if err != nil { fs.Debugf(f.Path(), "File.openWrite failed: %v", err) diff --git a/vfs/read.go b/vfs/read.go index 1dae5a8d8..b14e1c978 100644 --- a/vfs/read.go +++ b/vfs/read.go @@ -14,6 +14,64 @@ import ( "github.com/rclone/rclone/fs/hash" ) +type StubReadFileHandle struct { + baseHandle + file *File + size int64 // size of the object (0 for unknown length) +} + +func newStubReadFileHandle(f *File) (*StubReadFileHandle, error) { + o := f.getObject() + return &StubReadFileHandle{ + file: f, + size: nonNegative(o.Size()), + }, nil +} + +// String converts it to printable +func (fh *StubReadFileHandle) String() string { + return "<*StubReadFileHandle>" +} + +// Node returns the Node associated with this - satisfies Noder interface +func (fh *StubReadFileHandle) Node() Node { + return fh.file +} + +func (fh *StubReadFileHandle) Seek(offset int64, whence int) (n int64, err error) { + return 0, nil +} + +func (fh *StubReadFileHandle) ReadAt(p []byte, off int64) (n int, err error) { + return len(p), nil +} + +func (fh *StubReadFileHandle) Read(p []byte) (n int, err error) { + return len(p), nil +} + +func (fh *StubReadFileHandle) Close() error { + return nil +} + +func (fh *StubReadFileHandle) Flush() error { + return nil +} + +func (fh *StubReadFileHandle) Release() error { + return nil +} + +// Size returns the size of the underlying file +func (fh *StubReadFileHandle) Size() int64 { + return fh.size +} + +// Stat returns info about the file +func (fh *StubReadFileHandle) Stat() (os.FileInfo, error) { + return fh.file, nil +} + // ReadFileHandle is an open for read file handle on a File type ReadFileHandle struct { baseHandle diff --git a/vfs/vfscommon/options.go b/vfs/vfscommon/options.go index 36476d417..48c7c1745 100644 --- a/vfs/vfscommon/options.go +++ b/vfs/vfscommon/options.go @@ -27,6 +27,7 @@ type Options struct { CacheMaxAge time.Duration CacheMaxSize fs.SizeSuffix CachePollInterval time.Duration + MessagedWrite bool CaseInsensitive bool WriteWait time.Duration // time to wait for in-sequence write ReadWait time.Duration // time to wait for in-sequence read @@ -55,6 +56,7 @@ var DefaultOpt = Options{ ChunkSizeLimit: -1, CacheMaxSize: -1, CaseInsensitive: runtime.GOOS == "windows" || runtime.GOOS == "darwin", // default to true on Windows and Mac, false otherwise + MessagedWrite: false, WriteWait: 1000 * time.Millisecond, ReadWait: 20 * time.Millisecond, WriteBack: 5 * time.Second, diff --git a/vfs/vfsflags/vfsflags.go b/vfs/vfsflags/vfsflags.go index d08370399..edafd62c7 100644 --- a/vfs/vfsflags/vfsflags.go +++ b/vfs/vfsflags/vfsflags.go @@ -28,6 +28,7 @@ func AddFlags(flagSet *pflag.FlagSet) { flags.DurationVarP(flagSet, &Opt.CachePollInterval, "vfs-cache-poll-interval", "", Opt.CachePollInterval, "Interval to poll the cache for stale objects.") flags.DurationVarP(flagSet, &Opt.CacheMaxAge, "vfs-cache-max-age", "", Opt.CacheMaxAge, "Max age of objects in the cache.") flags.FVarP(flagSet, &Opt.CacheMaxSize, "vfs-cache-max-size", "", "Max total size of objects in the cache.") + flags.BoolVarP(flagSet, &Opt.MessagedWrite, "vfs-messaged-write", "", Opt.MessagedWrite, "On vfs-cache-mode <= minimal, serialize write-only files' write operation into messages to support seek.") flags.FVarP(flagSet, &Opt.ChunkSize, "vfs-read-chunk-size", "", "Read the source objects in chunks.") flags.FVarP(flagSet, &Opt.ChunkSizeLimit, "vfs-read-chunk-size-limit", "", "If greater than --vfs-read-chunk-size, double the chunk size after each chunk read, until the limit is reached. 'off' is unlimited.") flags.FVarP(flagSet, DirPerms, "dir-perms", "", "Directory permissions") diff --git a/vfs/write.go b/vfs/write.go index 4f0590312..8be2e8472 100644 --- a/vfs/write.go +++ b/vfs/write.go @@ -1,7 +1,9 @@ package vfs import ( + "bytes" "context" + "encoding/binary" "io" "os" "sync" @@ -11,6 +13,152 @@ import ( "github.com/rclone/rclone/fs/operations" ) +var writerMap sync.Map + +type MessagedWriter struct { + mu sync.Mutex + openCount int + Id string + innerWriter io.WriteCloser + curWriteOffset int64 + wroteBytes int64 + messaged bool + hasWrote bool +} + +const ( + MSG_WRITE int32 = 1 + MSG_TRUNC int32 = 2 +) + +func GetMessagedWriter(fh *WriteFileHandle, writerFunc func() io.WriteCloser) *MessagedWriter { + retWriter := &MessagedWriter{ + Id: fh.file.Path(), + innerWriter: nil, + wroteBytes: 0, + messaged: fh.file.messagedWrite, + hasWrote: false, + } + tempWriter := retWriter + tempWriter.mu.Lock() + _retWriter, loaded := writerMap.LoadOrStore(fh.file.Path(), retWriter) + retWriter = _retWriter.(*MessagedWriter) + if !loaded { + retWriter.innerWriter = writerFunc() + } + tempWriter.mu.Unlock() + + retWriter.Open() + return retWriter +} + +func (w *MessagedWriter) Open() { + w.mu.Lock() + defer w.mu.Unlock() + w.openCount++ + fs.Debugf(w.Id, "file openCount++ to %d", w.openCount) +} +func (w *MessagedWriter) blockFinishTrailer() (err error) { + // header: [8B-magic"RCLONEMP"][4B-REQTYPE(MSG_WRITE)][8B-off][8B-size] + hdr := make([]byte, 8+4+8+8) + buf := bytes.NewBuffer(hdr) + buf.Reset() + buf.WriteString("RCLONEMP") + binary.Write(buf, binary.BigEndian, MSG_WRITE) + binary.Write(buf, binary.BigEndian, w.curWriteOffset) + binary.Write(buf, binary.BigEndian, w.wroteBytes) + _, err = w.innerWriter.Write(buf.Bytes()) + return +} +func (w *MessagedWriter) truncateTrailer(off int64) (err error) { + // header: [8B-magic"RCLONEMP"][4B-REQTYPE(MSG_TRUNC)][8B-off] + hdr := make([]byte, 8+4+8) + buf := bytes.NewBuffer(hdr) + buf.Reset() + buf.WriteString("RCLONEMP") + binary.Write(buf, binary.BigEndian, MSG_TRUNC) + binary.Write(buf, binary.BigEndian, off) + _, err = w.innerWriter.Write(buf.Bytes()) + return +} + +func (w *MessagedWriter) Truncate(off int64) (err error) { + w.mu.Lock() + defer w.mu.Unlock() + return w.truncateTrailer(off) +} + +func (w *MessagedWriter) WriteAt(data []byte, off int64) (n int, err error) { + w.mu.Lock() + defer w.mu.Unlock() + + if !w.hasWrote { + err = w.blockFinishTrailer() + } else if w.curWriteOffset != off { + err = w.blockFinishTrailer() + w.curWriteOffset = off + w.wroteBytes = 0 + } + if err != nil { + return + } + w.hasWrote = true + n, err = w.innerWriter.Write(data) + w.wroteBytes += int64(n) + w.curWriteOffset += int64(n) + return +} + +// Close closes the writer; subsequent reads from the +// read half of the pipe will return no bytes and EOF. +func (w *MessagedWriter) Close() (err error) { + w.mu.Lock() + defer w.mu.Unlock() + err = nil + + w.openCount-- + fs.Debugf(w.Id, "file openCount-- to %d", w.openCount) + if w.openCount > 0 { + return + } + + waitTime := 180 * time.Second + if w.hasWrote { + waitTime = 0 * time.Second + } + + destroyFunc := func() { + fs.Infof(w.Id, "Going to late close writer due to openCount gets to %d", w.openCount) + var err error + if w.hasWrote { + err = w.blockFinishTrailer() + if err != nil { + fs.Errorf(w.Id, "Failed to write finish trailer, err: %v", err) + } + } + err = w.innerWriter.Close() + if err != nil { + fs.Errorf(w.Id, "Failed to close writer, err: %v", err) + } + writerMap.Delete(w.Id) + } + // handle qbittorrent downloader's pattern + // qbt: 1. open with flags=O_RDWR|O_CREATE|0x40000 2. close 3. open again with O_RDWR + if waitTime == 0 { + destroyFunc() + } else { + time.AfterFunc(waitTime, func() { + if w.openCount <= 0 { + w.mu.Lock() + defer w.mu.Unlock() + destroyFunc() + } + }) + } + + return +} + // WriteFileHandle is an open for write handle on a File type WriteFileHandle struct { baseHandle @@ -18,7 +166,7 @@ type WriteFileHandle struct { cond *sync.Cond // cond lock for out of sequence writes closed bool // set if handle has been closed remote string - pipeWriter *io.PipeWriter + pipeWriter *MessagedWriter o fs.Object result chan error file *File @@ -34,6 +182,7 @@ var ( _ io.Writer = (*WriteFileHandle)(nil) _ io.WriterAt = (*WriteFileHandle)(nil) _ io.Closer = (*WriteFileHandle)(nil) + _ io.Seeker = (*WriteFileHandle)(nil) ) func newWriteFileHandle(d *Dir, f *File, remote string, flags int) (*WriteFileHandle, error) { @@ -64,19 +213,22 @@ func (fh *WriteFileHandle) openPending() (err error) { fs.Errorf(fh.remote, "WriteFileHandle: Can't open for write without O_TRUNC on existing file without --vfs-cache-mode >= writes") return EPERM } - var pipeReader *io.PipeReader - pipeReader, fh.pipeWriter = io.Pipe() - go func() { - // NB Rcat deals with Stats.Transferring, etc. - o, err := operations.Rcat(context.TODO(), fh.file.Fs(), fh.remote, pipeReader, time.Now()) - if err != nil { - fs.Errorf(fh.remote, "WriteFileHandle.New Rcat failed: %v", err) - } - // Close the pipeReader so the pipeWriter fails with ErrClosedPipe - _ = pipeReader.Close() - fh.o = o - fh.result <- err - }() + + fh.pipeWriter = GetMessagedWriter(fh, func() io.WriteCloser { + pipeReader, pipeWriter := io.Pipe() + go func() { + // NB Rcat deals with Stats.Transferring, etc. + o, err := operations.Rcat(context.TODO(), fh.file.Fs(), fh.remote, pipeReader, time.Now()) + if err != nil { + fs.Errorf(fh.remote, "WriteFileHandle.New Rcat failed: %v", err) + } + // Close the pipeReader so the pipeWriter fails with ErrClosedPipe + _ = pipeReader.Close() + fh.o = o + fh.result <- err + }() + return pipeWriter + }) fh.file.setSize(0) fh.truncated = true fh.file.Dir().addObject(fh.file) // make sure the directory has this object in it now @@ -132,17 +284,25 @@ func (fh *WriteFileHandle) writeAt(p []byte, off int64) (n int, err error) { if fh.offset != off { waitSequential("write", fh.remote, fh.cond, fh.file.VFS().Opt.WriteWait, &fh.offset, off) } - if fh.offset != off { + if fh.offset != off && !fh.file.messagedWrite { fs.Errorf(fh.remote, "WriteFileHandle.Write: can't seek in file without --vfs-cache-mode >= writes") return 0, ESPIPE } if err = fh.openPending(); err != nil { return 0, err } + + oriSize := fh.file.Size() + newSize := oriSize + n, err = fh.pipeWriter.WriteAt(p, off) + newOff := off + int64(n) + //fh.offset = newOff + if newOff > oriSize { + newSize = newOff + } + fh.writeCalled = true - n, err = fh.pipeWriter.Write(p) - fh.offset += int64(n) - fh.file.setSize(fh.offset) + fh.file.setSize(newSize) if err != nil { fs.Errorf(fh.remote, "WriteFileHandle.Write error: %v", err) return 0, err @@ -163,7 +323,9 @@ func (fh *WriteFileHandle) Write(p []byte) (n int, err error) { fh.mu.Lock() defer fh.mu.Unlock() // Since we can't seek, just call WriteAt with the current offset - return fh.writeAt(p, fh.offset) + n, err = fh.writeAt(p, fh.offset) + fh.offset += int64(n) + return } // WriteString a string to the file @@ -269,7 +431,7 @@ func (fh *WriteFileHandle) Release() error { if err != nil { fs.Errorf(fh.remote, "WriteFileHandle.Release error: %v", err) } else { - // fs.Debugf(fh.remote, "WriteFileHandle.Release OK") + //fs.Debugf(fh.remote, "WriteFileHandle.Release OK") } return err } @@ -281,11 +443,43 @@ func (fh *WriteFileHandle) Stat() (os.FileInfo, error) { return fh.file, nil } +// Seek to new file position +func (fh *WriteFileHandle) Seek(offset int64, whence int) (ret int64, err error) { + fh.mu.Lock() + defer fh.mu.Unlock() + if fh.file.messagedWrite { + if fh.closed { + return 0, ECLOSED + } + if !fh.opened && offset == 0 && whence != 2 { + return 0, nil + } + if err = fh.openPending(); err != nil { + return ret, err + } + switch whence { + case io.SeekStart: + fh.offset = 0 + case io.SeekEnd: + fh.offset = fh.file.Size() + } + fh.offset += offset + // we don't check the offset - the next Read will + return fh.offset, nil + } else { + return 0, ENOSYS + } +} + // Truncate file to given size func (fh *WriteFileHandle) Truncate(size int64) (err error) { // defer log.Trace(fh.remote, "size=%d", size)("err=%v", &err) fh.mu.Lock() defer fh.mu.Unlock() + if fh.file.messagedWrite { + err = fh.pipeWriter.Truncate(size) + return + } if size != fh.offset { fs.Errorf(fh.remote, "WriteFileHandle: Truncate: Can't change size without --vfs-cache-mode >= writes") return EPERM