Skip to content

Commit

Permalink
use 1MB tcp socket buffer for control connections and server-defined …
Browse files Browse the repository at this point in the history
…tcp socket buffer size for redirect-to-resource data connections
  • Loading branch information
iychoi committed Oct 21, 2024
1 parent fef56ea commit d7bc2e3
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 54 deletions.
31 changes: 14 additions & 17 deletions irods/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

const (
TCPBufferSizeDefault int = 4 * 1024 * 1024
TCPBufferSizeDefault int = 1 * 1024 * 1024
)

// IRODSConnection connects to iRODS
Expand Down Expand Up @@ -182,27 +182,24 @@ func (conn *IRODSConnection) setSocketOpt(socket net.Conn, bufferSize int) {

if tcpSocket, ok := socket.(*net.TCPConn); ok {
// TCP socket

// nodelay is default
//tcpSocket.SetNoDelay(true)
tcpSocket.SetNoDelay(true)

tcpSocket.SetKeepAlive(true)
tcpSocket.SetLinger(5)

// TCP buffer size
if bufferSize <= 0 {
bufferSize = TCPBufferSizeDefault
}

sockErr := tcpSocket.SetReadBuffer(bufferSize)
if sockErr != nil {
sockBuffErr := xerrors.Errorf("failed to set tcp read buffer size %d: %w", bufferSize, sockErr)
logger.Errorf("%+v", sockBuffErr)
}
if bufferSize > 0 {
sockErr := tcpSocket.SetReadBuffer(bufferSize)
if sockErr != nil {
sockBuffErr := xerrors.Errorf("failed to set tcp read buffer size %d: %w", bufferSize, sockErr)
logger.Errorf("%+v", sockBuffErr)
}

sockErr = tcpSocket.SetWriteBuffer(bufferSize)
if sockErr != nil {
sockBuffErr := xerrors.Errorf("failed to set tcp write buffer size %d: %w", bufferSize, sockErr)
logger.Errorf("%+v", sockBuffErr)
sockErr = tcpSocket.SetWriteBuffer(bufferSize)
if sockErr != nil {
sockBuffErr := xerrors.Errorf("failed to set tcp write buffer size %d: %w", bufferSize, sockErr)
logger.Errorf("%+v", sockBuffErr)
}
}
}
}
Expand Down
31 changes: 14 additions & 17 deletions irods/connection/resource_server_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,27 +114,24 @@ func (conn *IRODSResourceServerConnection) setSocketOpt(socket net.Conn, bufferS

if tcpSocket, ok := socket.(*net.TCPConn); ok {
// TCP socket

// nodelay is default
//tcpSocket.SetNoDelay(true)
tcpSocket.SetNoDelay(true)

tcpSocket.SetKeepAlive(true)
tcpSocket.SetLinger(5)

// TCP buffer size
if bufferSize <= 0 {
bufferSize = TCPBufferSizeDefault
}

sockErr := tcpSocket.SetReadBuffer(bufferSize)
if sockErr != nil {
sockBuffErr := xerrors.Errorf("failed to set tcp read buffer size %d: %w", bufferSize, sockErr)
logger.Errorf("%+v", sockBuffErr)
}

sockErr = tcpSocket.SetWriteBuffer(bufferSize)
if sockErr != nil {
sockBuffErr := xerrors.Errorf("failed to set tcp write buffer size %d: %w", bufferSize, sockErr)
logger.Errorf("%+v", sockBuffErr)
if bufferSize > 0 {
sockErr := tcpSocket.SetReadBuffer(bufferSize)
if sockErr != nil {
sockBuffErr := xerrors.Errorf("failed to set tcp read buffer size %d: %w", bufferSize, sockErr)
logger.Errorf("%+v", sockBuffErr)
}

sockErr = tcpSocket.SetWriteBuffer(bufferSize)
if sockErr != nil {
sockBuffErr := xerrors.Errorf("failed to set tcp write buffer size %d: %w", bufferSize, sockErr)
logger.Errorf("%+v", sockBuffErr)
}
}
}
}
Expand Down
40 changes: 20 additions & 20 deletions irods/fs/data_object_resource_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,13 @@ func downloadDataObjectChunkFromResourceServer(sess *session.IRODSSession, taskI

if transferHeader.OperationType == int(common.OPER_TYPE_DONE) {
// break
logger.Debugf("done downloading file chunk for %s at offset %d, length %d", handle.Path, transferHeader.Offset, transferHeader.Length)
logger.Debugf("done downloading file chunk for %s, task %d, offset %d, length %d", handle.Path, taskID, transferHeader.Offset, transferHeader.Length)
break
} else if transferHeader.OperationType != int(common.OPER_TYPE_GET_DATA_OBJ) {
return xerrors.Errorf("invalid operation type %d received for transfer", transferHeader.OperationType)
}

logger.Debugf("downloading file chunk for %s at offset %d, length %d", handle.Path, transferHeader.Offset, transferHeader.Length)
logger.Debugf("downloading file chunk for %s, task %d, offset %d, length %d", handle.Path, taskID, transferHeader.Offset, transferHeader.Length)

toGet := transferHeader.Length
curOffset := transferHeader.Offset
Expand Down Expand Up @@ -318,7 +318,7 @@ func downloadDataObjectChunkFromResourceServer(sess *session.IRODSSession, taskI

_, writeErr := f.WriteAt(dataBuffer[:decryptedDataLen], curOffset)
if writeErr != nil {
return xerrors.Errorf("failed to write data to %q, offset %d: %w", localPath, curOffset, writeErr)
return xerrors.Errorf("failed to write data to %q, task %d, offset %d: %w", localPath, taskID, curOffset, writeErr)
}

toGet -= int64(decryptedDataLen)
Expand All @@ -327,23 +327,23 @@ func downloadDataObjectChunkFromResourceServer(sess *session.IRODSSession, taskI

if err != nil {
if err == io.EOF {
logger.Debugf("received EOF for downloading file chunk for %s at offset %d, length %d", handle.Path, transferHeader.Offset, transferHeader.Length)
logger.Debugf("received EOF for downloading file chunk for %s, task %d, offset %d, length %d", handle.Path, taskID, transferHeader.Offset, transferHeader.Length)
cont = false
break
}

return xerrors.Errorf("failed to read data %q, offset %d: %w", handle.Path, curOffset, err)
return xerrors.Errorf("failed to read data %q, task %d, offset %d: %w", handle.Path, taskID, curOffset, err)
}
} else {
// normal
// read data
newOffset, err := f.Seek(curOffset, io.SeekStart)
if err != nil {
return xerrors.Errorf("failed to seek to offset %d for file %q: %w", curOffset, localPath, err)
return xerrors.Errorf("failed to seek to offset %d for file %q, task %d: %w", curOffset, localPath, taskID, err)
}

if newOffset != curOffset {
return xerrors.Errorf("failed to seek to offset %d for file %q, new offset %d: %w", curOffset, localPath, newOffset, err)
return xerrors.Errorf("failed to seek to offset %d for file %q, task %d, new offset %d: %w", curOffset, localPath, taskID, newOffset, err)
}

readLen, err := conn.RecvToWriter(f, toGet)
Expand All @@ -359,12 +359,12 @@ func downloadDataObjectChunkFromResourceServer(sess *session.IRODSSession, taskI

if err != nil {
if err == io.EOF {
logger.Debugf("received EOF for downloading file chunk for %s at offset %d, length %d", handle.Path, transferHeader.Offset, transferHeader.Length)
logger.Debugf("received EOF for downloading file chunk for %s, task %d, offset %d, length %d", handle.Path, taskID, transferHeader.Offset, transferHeader.Length)
cont = false
break
}

return xerrors.Errorf("failed to read data %q, offset %d: %w", handle.Path, curOffset, err)
return xerrors.Errorf("failed to read data %q, task %d, offset %d: %w", handle.Path, taskID, curOffset, err)
}
}
}
Expand Down Expand Up @@ -445,13 +445,13 @@ func uploadDataObjectChunkToResourceServer(sess *session.IRODSSession, taskID in

if transferHeader.OperationType == int(common.OPER_TYPE_DONE) {
// break
logger.Debugf("done uploading file chunk for %s at offset %d, length %d", handle.Path, transferHeader.Offset, transferHeader.Length)
logger.Debugf("done uploading file chunk for %s, task %d, offset %d, length %d", handle.Path, taskID, transferHeader.Offset, transferHeader.Length)
break
} else if transferHeader.OperationType != int(common.OPER_TYPE_PUT_DATA_OBJ) {
return xerrors.Errorf("invalid operation type %d received for transfer", transferHeader.OperationType)
}

logger.Debugf("uploading file chunk for %s at offset %d, length %d", handle.Path, transferHeader.Offset, transferHeader.Length)
logger.Debugf("uploading file chunk for %s, task %d, offset %d, length %d", handle.Path, taskID, transferHeader.Offset, transferHeader.Length)

toPut := transferHeader.Length
curOffset := transferHeader.Offset
Expand Down Expand Up @@ -498,12 +498,12 @@ func uploadDataObjectChunkToResourceServer(sess *session.IRODSSession, taskID in

if err != nil {
if err == io.EOF {
logger.Debugf("received EOF for uploading file chunk for %s at offset %d, length %d", handle.Path, transferHeader.Offset, transferHeader.Length)
logger.Debugf("received EOF for uploading file chunk for %s, task %d, offset %d, length %d", handle.Path, taskID, transferHeader.Offset, transferHeader.Length)
cont = false
break
}

return xerrors.Errorf("failed to read data %q, offset %d: %w", localPath, curOffset, err)
return xerrors.Errorf("failed to read data %q, task %d, offset %d: %w", localPath, taskID, curOffset, err)
}

encryptionHeaderBuffer, err := encryptionHeader.GetBytes()
Expand All @@ -521,7 +521,7 @@ func uploadDataObjectChunkToResourceServer(sess *session.IRODSSession, taskID in
encryptedDataLen := encryptionHeader.Length - encKeysize
writeErr := conn.Send(encryptedDataBuffer, encryptedDataLen)
if writeErr != nil {
return xerrors.Errorf("failed to write data to %q, offset %d: %w", handle.Path, curOffset, writeErr)
return xerrors.Errorf("failed to write data to %q, task %d, offset %d: %w", handle.Path, taskID, curOffset, writeErr)
}

//logger.Debugf("sent encrypted data")
Expand All @@ -538,11 +538,11 @@ func uploadDataObjectChunkToResourceServer(sess *session.IRODSSession, taskID in
// write data
newOffset, err := f.Seek(curOffset, io.SeekStart)
if err != nil {
return xerrors.Errorf("failed to seek to offset %d for file %q: %w", curOffset, localPath, err)
return xerrors.Errorf("failed to seek to offset %d for file %q, task %d: %w", curOffset, localPath, taskID, err)
}

if newOffset != curOffset {
return xerrors.Errorf("failed to seek to offset %d for file %q, new offset %d: %w", curOffset, localPath, newOffset, err)
return xerrors.Errorf("failed to seek to offset %d for file %q, task %d, new offset %d: %w", curOffset, localPath, taskID, newOffset, err)
}

err = conn.SendFromReader(f, toPut)
Expand All @@ -556,12 +556,12 @@ func uploadDataObjectChunkToResourceServer(sess *session.IRODSSession, taskID in

if err != nil {
if err == io.EOF {
logger.Debugf("received EOF for uploading file chunk for %s at offset %d, length %d", handle.Path, transferHeader.Offset, transferHeader.Length)
logger.Debugf("received EOF for uploading file chunk for %s, task %d, offset %d, length %d", handle.Path, taskID, transferHeader.Offset, transferHeader.Length)
cont = false
break
}

return xerrors.Errorf("failed to read data %q, offset %d: %w", localPath, transferHeader.Offset, err)
return xerrors.Errorf("failed to read data %q, task %d, offset %d: %w", localPath, taskID, transferHeader.Offset, err)
}
}
}
Expand Down Expand Up @@ -623,7 +623,7 @@ func DownloadDataObjectFromResourceServer(session *session.IRODSSession, irodsPa
}
return "", nil
} else if handle.RedirectionInfo != nil {
logger.Debugf("Redirect to resource: path %q, threads %d, addr %q, port %d, cookie %d", handle.Path, handle.Threads, handle.RedirectionInfo.Host, handle.RedirectionInfo.Port, handle.RedirectionInfo.Cookie)
logger.Debugf("Redirect to resource: path %q, threads %d, addr %q, port %d, window size %d, cookie %d", handle.Path, handle.Threads, handle.RedirectionInfo.Host, handle.RedirectionInfo.Port, handle.RedirectionInfo.WindowSize, handle.RedirectionInfo.Cookie)
// get from portal

// create an empty file
Expand Down Expand Up @@ -745,7 +745,7 @@ func UploadDataObjectToResourceServer(session *session.IRODSSession, localPath s
}
return nil
} else if handle.RedirectionInfo != nil {
logger.Debugf("Redirect to resource: path %q, threads %d, addr %q, port %d, cookie %d", handle.Path, handle.Threads, handle.RedirectionInfo.Host, handle.RedirectionInfo.Port, handle.RedirectionInfo.Cookie)
logger.Debugf("Redirect to resource: path %q, threads %d, addr %q, port %d, window size %d, cookie %d", handle.Path, handle.Threads, handle.RedirectionInfo.Host, handle.RedirectionInfo.Port, handle.RedirectionInfo.WindowSize, handle.RedirectionInfo.Cookie)
// put to portal
errChan := make(chan error, handle.Threads)
taskWaitGroup := sync.WaitGroup{}
Expand Down

0 comments on commit d7bc2e3

Please sign in to comment.