
Fix #3076 parEvalMap resource scoping #3515

Open · wants to merge 7 commits into main
Conversation

@yurique (Contributor) commented Dec 31, 2024

This is a continuation of #3512

Original description

Updates parEvalMap*, broadcastThrough and prefetch to extend the resource scope past the channel/topic used to implement concurrency for these operators.

In all cases the solution is the same:

  • use underlying.uncons.flatMap to get into a Pull context with access to the source stream
  • set up the new foreground stream with Pull.extendScopeTo to pull the scope across to the new stream

I have abstracted this solution into a new extendScopeThrough method, which works like through except that it propagates the scope to the new stream. It appears we should be doing this for any stream combinator where the resulting stream is not directly derived from the current stream (e.g. any combinator that uses an internal buffer, channel, or topic).
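The two bullet points above can be sketched roughly against fs2's public Pull API. This is an uncompiled illustration, not the PR's actual implementation (which lives in Stream's internals); here f stands for whatever channel- or topic-backed stream the combinator builds:

```scala
// Rough sketch only: extendScopeThrough = uncons the source, then extend the
// scope that is current at that point into the derived stream, so upstream
// resources outlive the internal channel/topic.
def extendScopeThrough[F[_]: MonadThrow, O, O2](
    source: Stream[F, O]
)(f: Stream[F, O] => Stream[F, O2]): Stream[F, O2] =
  source.pull.uncons.flatMap {
    case Some((hd, tl)) =>
      // Re-assemble the source, hand it to `f`, and carry the scope across.
      Pull.extendScopeTo(f(Stream.chunk(hd) ++ tl)).flatMap(_.pull.echo)
    case None =>
      f(Stream.empty).pull.echo
  }.stream
```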

I also had to fix the cancellation safety of extendScopeTo (see #3474). The StreamSuite "resource safety test 4" started failing after my changes, presumably because the scope started propagating far enough to actually get hit by cancellation.

At least conflateChunks is broken in the same way, but I'd like to validate that this solution is correct before I continue chasing down all the places the fix should be applied.


In this PR, the implementation of extendScopeThrough has changed (because @reardonj found that the original one did not handle more complicated cases correctly):

  • instead of extendScopeTo, a new method is defined and used: Pull.bindAcquireToScope
  • bindAcquireToScope "walks" the tree of pulls and updates every Pull.Acquire to use scope.acquireResource, binding all acquisitions to the provided scope.
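As a loose, self-contained analogy for the "walk the tree of pulls" step (all names here — Expr, Acquire, rewriteAcquires — are hypothetical, not fs2's real Pull algebra), rewriting one node type while leaving the rest of the tree untouched looks like this:

```scala
// Hypothetical analogy for bindAcquireToScope: recursively walk an ADT and
// replace every Acquire node, leaving all other nodes as they are.
sealed trait Expr
case class Acquire(resource: String) extends Expr
case class Eval(effect: String) extends Expr
case class FlatMap(head: Expr, tail: Expr) extends Expr

object RewriteDemo {
  // Mirrors the shape of the PR's rewrite: Pull.Acquire becomes
  // Pull.eval(scope.acquireResource(...)) bound to the given scope.
  def rewriteAcquires(e: Expr, scopeId: String): Expr = e match {
    case Acquire(res)  => Eval(s"$scopeId.acquireResource($res)")
    case FlatMap(h, t) => FlatMap(rewriteAcquires(h, scopeId), rewriteAcquires(t, scopeId))
    case other         => other
  }

  def main(args: Array[String]): Unit = {
    val tree = FlatMap(Acquire("tempFile"), Eval("use"))
    println(rewriteAcquires(tree, "rootScope"))
  }
}
```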

I can say I don't really know what I'm doing here :) - it was a desperate attempt to fix the issue. But it seems to have worked.

Given that - this PR really needs a review by someone who knows how all this works (scopes, pulls, etc).

  • does this approach even make sense (walking the Pull tree)?
  • am I missing something?
  • is there a better way?

There is one kind of Pull that I do not know how to process: Pull.Translate. At the very least, we don't have a MonadThrow for G, which is needed to call bindAcquireToScope recursively.

Pull.StepLeg is another one that I don't know how to handle correctly.


TODO

  • clean up debug printlns
  • make it compile with Scala 3
  • figure out Pull.Translate
  • figure out Pull.StepLeg
  • figure out Outcome.Succeeded(Left(id)) returned from scope.acquireResource

reardonj and others added 7 commits December 26, 2024 00:25
Updates parEvalMap* and broadcastThrough to extend the resource scope past the channel/topic used to implement concurrency for these operators.
Make extendScopeTo cancellation safe (see typelevel#3474)
@mpilquist (Member) commented:

Awesome, thanks for digging in to a gnarly part of the library. Some questions below. Not sure if this is the right approach or not but definitely worth exploring.

> In this PR, the implementation of the extendScopeThrough is changed (because @reardonj found that the original one was not handling more complicated cases correctly):

Any idea why leasing the main scope is insufficient? Why do we need to walk the tree, manipulating acquire nodes?

> bindAcquireToScope "walks" the tree of pulls and updates every Pull.Acquire to use scope.acquireResource, binding all acquisitions to the provided scope.

> does this approach even make sense (walking the Pull tree)?

Does this solution have a different problem, where the manipulated acquires happen in the source pull's scope tree instead of the scope tree of the downstream/inner pull?

Also, does this PR have the result of flattening all scopes of the downstream pull into the scope of the parent pull at the time of bindAcquireToScope?

> There is one kind of Pull that I do not know how to process: Pull.Translate. At the very least, we don't have a MonadThrow for G, which is needed to call bindAcquireToScope recursively.

To fix this, I think bindAcquireToScope would need to move to a new element of the pull algebra and be handled directly in the interpreter. By doing so, we'd have access to the current translation.
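As a rough, self-contained illustration of that idea (hypothetical names, not fs2's actual algebra or interpreter): if the scope-binding step is a node of the algebra, then the interpreter — which already threads the translation in force through Translate nodes — can apply it when it reaches that node, which a standalone tree rewrite cannot do:

```scala
// Hypothetical mini-algebra: a dedicated BindToScope node is handled by the
// interpreter, which threads the current "translation" down the tree, so the
// node sees whatever translation encloses it.
sealed trait Op
case class Output(values: List[Int]) extends Op
case class Translate(inner: Op, f: String => String) extends Op
case class BindToScope(inner: Op, scopeId: String) extends Op

object InterpreterDemo {
  // `translation` stands in for the composed translation in force at each
  // node (the analogue of the G ~> F the real interpreter carries).
  def interpret(op: Op, translation: String => String): (List[Int], List[String]) =
    op match {
      case Output(vs)          => (vs, Nil)
      case Translate(inner, f) => interpret(inner, translation.andThen(f))
      case BindToScope(inner, scope) =>
        val (out, logs) = interpret(inner, translation)
        (out, s"bound to ${translation(scope)}" :: logs)
    }

  def main(args: Array[String]): Unit = {
    val prog = Translate(BindToScope(Output(List(1, 2)), "scope-1"), _.toUpperCase)
    println(interpret(prog, s => s))
  }
}
```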

@yurique (Contributor, author) commented Jan 12, 2025

> Any idea why leasing the main scope is insufficient? Why do we need to walk the tree, manipulating acquire nodes?

Unfortunately, I don't have a clear idea (I don't understand the scopes well enough).

But the way it manifested was as follows.

With @reardonj's initial approach, this would work fine:

fs2.Stream
  .bracket {
    IO.println("making resource").as("TempFile")
  } { res =>
    IO.println(s"!! releasing resource: $res")
  }
  .parEvalMap(2) { _ =>
    IO.println("using the resource")
  }
  .compile
  .drain
  .unsafeRunSync()

but it would start failing in scenarios like this (we have this in the added test):

Stream(1, 2, 3)
  .flatMap { i =>
    fs2.Stream.bracket {
      IO.println(s"making resource $i").as(s"TempFile $i")
    } { res =>
      IO.println(s"!! releasing resource: $res")
    }
  }
  .parEvalMap(2) { _ =>
    IO.println("using the resource")
  }
  .compile
  .drain
  .unsafeRunSync()

> Does this solution have a different problem, where the manipulated acquires happen in the source pull's scope tree instead of the scope tree of the downstream/inner pull?

Yes, that's what is happening here. Otherwise the source doesn't know to keep scopes open until the downstream pulls are done.

Pull.Acquires are replaced with this:

Pull.eval(
  scope.acquireResource(
    poll => if (p.cancelable) poll(p.resource) else p.resource,
    p.release
  )
)

and scope here is "connected" to the source stream.

And this is the part that is the most "scary" to me; it feels too simplistic, and I'm struggling to reason about it. What would happen to intermediate brackets? Are there scenarios where intermediate brackets could safely be closed, but with this approach will have to stay open until the whole thing finishes? What happens if the stream is infinite?

@mpilquist (Member) commented:

Yeah, I'd expect the current state of this PR to have a problem with streams like this:

source.prefetch.flatMap(x => Stream.resource(r1).flatMap(...) ++ s)

In this example, we need the resource acquired by r1 to be released prior to s being evaluated, but I suspect the lifetime of r1 will be promoted by this PR.

@yurique (Contributor, author) commented Jan 12, 2025

> In this example, we need the resource acquired by r1 to be released prior to s being evaluated, but I suspect the lifetime of r1 will be promoted by this PR.

Without .parEvalMap(2) or some other combinator that uses (or will use) the new extendScopeThrough, this example should behave as before.

So it depends on where .parEvalMap(2) is.

Here I'd expect r1 to be released before s:

source.prefetch.flatMap(x => Stream.resource(r1).flatMap(...).parEvalMap(2)(...) ++ s)

But here:

source.prefetch.flatMap(x => (Stream.resource(r1).flatMap(...) ++ s).parEvalMap(2)(...))

r1 will survive through the lifetime of s (as well as the effects run by .parEvalMap). Ideally it wouldn't necessarily need to, but it gets complicated.


Stream(1).covary[IO].prefetch
  .flatMap { x =>
    (
      fs2.Stream.bracket {
        IO.println("making resource").as("TempFile")
      } { res =>
        IO.println(s"!! releasing resource: $res")
      }
        ++ Stream("Not-a-resource")
    )
      .parEvalMap(2) { r =>
        IO.println(s"eval: $r")
      }
  }
  .compile
  .drain
  .unsafeRunSync()

Output:

making resource
eval: TempFile
eval: Not-a-resource
!! releasing resource: TempFile

Stream(1).covary[IO].prefetch
  .flatMap { x =>
    fs2.Stream.bracket {
      IO.println("making resource").as("TempFile")
    } { res =>
      IO.println(s"!! releasing resource: $res")
    }
      .parEvalMap(2) { r =>
        IO.println(s"eval: $r")
      } ++
      Stream.eval(IO.println("after ++"))
  }
  .compile
  .drain
  .unsafeRunSync()

Output:

making resource
eval: TempFile
!! releasing resource: TempFile
after ++

@reardonj commented:
> Awesome, thanks for digging in to a gnarly part of the library. Some questions below. Not sure if this is the right approach or not but definitely worth exploring.

> In this PR, the implementation of the extendScopeThrough is changed (because @reardonj found that the original one was not handling more complicated cases correctly):

> Any idea why leasing the main scope is insufficient? Why do we need to walk the tree, manipulating acquire nodes?

My solution worked fine for singleton streams, but failed when the stream produced multiple resources (e.g. a TCP server). My understanding is that this happened because we captured the scope on the first pull, but new scopes are opened for each subsequent resource and those weren't being propagated.

I wonder if the best 'fix' here is documenting more clearly when scopes are propagated and when not. The whole setup feels very magical. As in: .map(…).parJoin(…) does what we usually want, but .parEvalMap's documentation is silent on resource handling. Maybe the correct semantics would be for it to behave like .map(…).parJoin(…)? They feel very similar, at least. (Not that we've attempted a fix like that in our PRs so far.)
