Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KTOR-7327 Support conversion between byte channel interfaces and kotlinx-io primitives #4594

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

e5l
Copy link
Member

@e5l e5l commented Jan 10, 2025

Fix KTOR-7327 Support conversion between byte channel interfaces and kotlinx-io primitives

@e5l e5l requested a review from osipxd January 10, 2025 12:38
@e5l e5l self-assigned this Jan 10, 2025
override val writeBuffer: Sink
get() {
if (isClosedForWrite) throw closedCause ?: IOException("Channel is closed for write")
return origin as? Sink ?: origin.buffered()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be done eagerly as a single instance returned for every call. If you do not store the results of buffer(), you're liable to lose bytes if you forget to call flush on every instance created.

Consider code that does something as simple as

writeBuffer.writeString("hi")
writeBuffer.apply {
  writeString("hey")
  flush()
}

If the origin is truly a RawSink, it will never see "hi" as those buffered bytes are dropped on the floor.

In general I would recommend always buffering rather than conditionally doing anything. The cost of buffering something that is already buffered is near zero.

Additionally, test using object : RawSink by buffer to ensure nothing can cheat by doing things like conditionally casting to Sink or Buffer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, it was caching the result before introducing exception

* channel.writeString("hello, world!")
* channel.flushAndClose()

* (rawSink as Buffer).readString() // "Hello, world"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love that we're showing people this is okay.

How about doing a local function like

fun stuff(raw: RawSink) {
  val channel = raw.asByteWriteChannel()
  ...
}

val buffer = Buffer()
stuff(buffer)
buffer.readString() // ...

Nothing inherently unsafe here

@OptIn(InternalAPI::class)
override suspend fun flushAndClose() {
if (!closed.compareAndSet(expect = null, update = CLOSED)) return
writeBuffer.flush()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we close writeBuffer it will release all buffers and we can miss the content

import kotlinx.io.RawSink

/**
* Converts the current `ByteWriteChannel` instance into a `RawSink`.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would warn this implementation uses runBlocking since nesting calls to that function is problematic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, will note that in the KDoc. The alternative is a busy loop which is even worse

*
* @return a `RawSource` implementation that wraps the `ByteReadChannel`.
*/
public fun ByteReadChannel.asSource(): RawSource = ByteReadChannelSource(this)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would warn this implementation uses runBlocking since nesting calls to that function is problematic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, and same here

public fun RawSink.asByteWriteChannel(): ByteWriteChannel = SinkByteWriteChannel(this).apply {
}

private suspend fun x() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete? Seems to have been for writing docs above.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the notice, missed to drop my playground

Copy link
Member Author

@e5l e5l left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @JakeWharton, thanks for the review! I'll check on the comments

override val writeBuffer: Sink
get() {
if (isClosedForWrite) throw closedCause ?: IOException("Channel is closed for write")
return origin as? Sink ?: origin.buffered()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, it was caching the result before introducing exception

@OptIn(InternalAPI::class)
override suspend fun flushAndClose() {
if (!closed.compareAndSet(expect = null, update = CLOSED)) return
writeBuffer.flush()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we close writeBuffer it will release all buffers and we can miss the content

import kotlinx.io.RawSink

/**
* Converts the current `ByteWriteChannel` instance into a `RawSink`.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, will note that in the KDoc. The alternative is a busy loop which is even worse

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants