Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChannelPausable not working with @RunOnVirtualThread #45795

Open
embds opened this issue Jan 22, 2025 · 3 comments
Open

ChannelPausable not working with @RunOnVirtualThread #45795

embds opened this issue Jan 22, 2025 · 3 comments
Labels
area/virtual-threads Issue related to Java's Virtual Threads env/windows Impacts Windows machines kind/bug Something isn't working

Comments

@embds
Copy link

embds commented Jan 22, 2025

Describe the bug

Hello,
I'm using Smallrye Messaging RabbitMQ to consume the RabbitMQ messages to do an external Http request which have a Rate Limit (error 429) or sometimes can be temporary unavailable(error 503)

To be able to process more messages in parallel, I tried to set up the following parameter :

mp.messaging.incoming.myincoming.max-concurrency=512

However, I have seen that this parameter creates as many connections as there are consumers for the queue.

I therefore tried to use the @RunOnVirtualThread annotation which works very well with a single connection, which can process 1024 messages in parallel.
However, I encounter a problem when I try to use the 'io.smallrye.reactive.messaging.PausableChannel' class to implement a pause mechanism. In order to be able to stop consuming messages when the external Http service returns us a rate limit error (429) or unavailability error (503)
Because when I use the @RunOnVirtualThread annotation, the messages are always consumed which does not happen when I do not use this annotation

However, if we initiate the service in pause mode - with the following parameter - the pause is taken into account:

mp.messaging.incoming.request-in-todo.initially-paused=true

Another case observed is when the RabbitMQ connection is automatically reset and the paused() method was previously called, the threads no longer consume the messages

I therefore see that the problem only occurs when I call the pause() method while the threads are consuming the messages.

Expected behavior

The messages consumption should stop when the ChannelPausable.pause() is called

Actual behavior

When using the @RunOnVirtualThread annotation and call ChannelPausable.pause(), the messages are always consumed

How to Reproduce?

To reproduce :

  1. Start consuming messages
  2. Call the pause() method while consuming messages

See below a code sample to reproduce:


import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.PausableChannel;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.jboss.logging.Logger;

import java.time.Duration;

@ApplicationScoped
public class PausableControllerService {

    private final ChannelRegistry channelRegistry;

    @Inject
    public PausableControllerService(final Logger log, final ChannelRegistry registry) {
        this.channelRegistry = registry;
    }

    private PausableChannel getPausableChannel(){
        return this.channelRegistry.getPausable("myincoming");
    }

    public void pause(){
        getPausableChannel().pause();
    }

    public void resume(){
        getPausableChannel().resume();
    }

    public Uni<Void> pauseUntil(final Duration time){
        return Uni.createFrom().voidItem().invoke(this::pause)
                  .onItem().delayIt().by(time)
                  .invoke(this::resume);
    }
}
@ApplicationScoped
public class ServiceIncomingConsumer {

    private final PausableControllerService pausableControllerService;

    ServiceIncomingConsumer (final PausableControllerService pausableControllerService) {
        this.pausableControllerService = pausableControllerService;
    }

    @Incoming("myincoming")
    @RunOnVirtualThread
    public CompletionStage<Void> consume(final Message<JsonObject> message) {
        // Do call Http of external service that returns a rate limit error (429) or unavailable service error (503)
        return doSomething();
    }

    private CompletionStage<Void> doSomenthing() {
        HttpResult httpResult = callHttp();
        if (httpResult.getStatus().equals(429) || httpResult.getStatus().equals(503) ){
            this.pausableControllerService.pauseUntil(nonNull(httpResult.getDuration() ? httpResult.getDuration() : Duration.ofHours(1));
        }
        return computeHttpResult(httpResult);
    }

}

Output of uname -a or ver

135-Ubuntu SMP Fri Sep 27 13:53:58 UTC 2024 / Microsoft Windows [version 10.0.19045.5371]

Output of java -version

21.0.2

Quarkus version or git rev

3.17.7

Build tool (ie. output of mvnw --version or gradlew --version)

3.9.9

Additional information

My goal is to be able to stop or resume consuming messages as I wish.

The ChannelPausable is the best solution I have found at the moment!
If there are other ways to achieve this, I'm interested.

@embds embds added the kind/bug Something isn't working label Jan 22, 2025
@quarkus-bot quarkus-bot bot added area/virtual-threads Issue related to Java's Virtual Threads env/windows Impacts Windows machines labels Jan 22, 2025
Copy link

quarkus-bot bot commented Jan 22, 2025

/cc @cescoffier (virtual-threads), @ozangunalp (virtual-threads)

@ozangunalp
Copy link
Contributor

This is indeed a known behaviour for @RunOnVirtualThread and pausable channels.

The concurrency level of virtual threads is controlled by requesting multiple messages from the consumer. The current version of pausable channels doesn't deal with messages that have already been requested. Ideally, it'd need to buffer those messages and dispatch them once the channel is resumed. This improvement is in the works but I need to test a couple of things, including the API changes.

@embds
Copy link
Author

embds commented Jan 23, 2025

Thank you for your reply.

I understand about messages already consumed, but the problem is that after processing the messages in progress (at the moment we pause) it continues to consume other messages without ever stopping.

Is it also a known behaviour ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/virtual-threads Issue related to Java's Virtual Threads env/windows Impacts Windows machines kind/bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants