Hibernate Reactive doesn't play well with async Azure SDK's it seems #1911

Open
RJJdeVries opened this issue May 20, 2024 · 0 comments

Hi there,

I'm having some trouble implementing Hibernate Reactive Panache while also using the async client of the Azure Blob SDK (which uses Project Reactor under the hood).

I have a handler which does a couple of things in order:

  1. First store an entity in the database
  2. Then upload blobs to storage container
  3. Then send service bus message to queue

Now my Azure Blob adapter integration test is working fine. I am able to upload one or more blobs to an Azurite container running locally.

It gets interesting when I run the integration test for my handler. I get the following error message:

org.hibernate.HibernateException: java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [265]: 'vert.x-eventloop-thread-1' current Thread [243]: 'reactor-http-nio-4'
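
To illustrate what the message seems to describe, here is a minimal plain-Java sketch. It is not Hibernate Reactive code: a hypothetical ThreadBoundSession records the thread that created it and rejects use from any other thread, and CompletableFuture stands in for the reactive pipeline. All class, method, and thread names are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadAffinitySketch {

    /** Hypothetical stand-in for a reactive session that remembers its opening thread. */
    static final class ThreadBoundSession {
        private final Thread owner = Thread.currentThread();

        String use() {
            Thread current = Thread.currentThread();
            if (current != owner) {
                throw new IllegalStateException("session opened on '" + owner.getName()
                        + "' but used on '" + current.getName() + "'");
            }
            return "used on '" + current.getName() + "'";
        }
    }

    /** Opens a session on one thread and uses it in a continuation that fires on another. */
    static String run() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService ioPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        try {
            // Step 1: the "database" step opens the session on the event-loop thread.
            ThreadBoundSession session =
                    CompletableFuture.supplyAsync(ThreadBoundSession::new, eventLoop).join();

            // Step 2: the "upload" completes on the I/O thread. The latch keeps it pending
            // until the continuation is registered, so the outcome is deterministic.
            CountDownLatch registered = new CountDownLatch(1);
            CompletableFuture<Void> upload = CompletableFuture.runAsync(() -> {
                try {
                    registered.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, ioPool);

            // Step 3: a non-async continuation runs on whichever thread completes the upload.
            CompletableFuture<String> outcome = upload
                    .thenApply(ignored -> session.use())
                    .exceptionally(failure -> {
                        Throwable root = failure.getCause() != null ? failure.getCause() : failure;
                        return "rejected: " + root.getMessage();
                    });
            registered.countDown();
            return outcome.join();
        } finally {
            eventLoop.shutdown();
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch run() returns a "rejected: ..." string, because the continuation registered with thenApply fires on the io-worker thread that completes the upload rather than on the event-loop thread that opened the session.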

I have implemented a BlobHandler which uses the async client to upload blobs. In a nutshell, I use AdaptersToFlow.publisher() to wrap the async call so that Mutiny can consume it.

private Uni<Void> storeSingle(BlobData blob) {
    System.out.println("Going to store a single blob.");
    return Uni
            .createFrom()
            .publisher(
                    AdaptersToFlow.publisher(
                            blobContainerClient
                                    .getBlobAsyncClient(blob.getBlobName())
                                    .uploadFromFile(blob.getFile().getPath()))
            )
            .onFailure()
            .recoverWithUni(() -> Uni.createFrom().failure(new BlobProblem(
                    new ProblemDetail(
                            "BLOB_DOWNLOAD_PROBLEM",
                            "A problem occurred while downloading a blob.",
                            Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
                            List.of()
                    )
            )));
}

@Override
public Uni<Void> storeMultiple(List<BlobData> blobs) {
    System.out.println("Going to store multiple blobs.");
    return Uni
            .join()
            .all(blobs.stream().map(this::storeSingle).toList())
            .usingConcurrencyOf(blobConfig.uploadConcurrency())
            .andFailFast()
            .onItem()
            .ignore()
            .andContinueWithNull()
            .onFailure()
            .recoverWithUni(exception -> {
                    System.out.println("Failure occurred while trying to store multiple blobs.");
                    Log.error("Upload multiple blobs", "A problem occurred while uploading one or more blobs.", exception);

                    return Uni.createFrom().failure(new BlobProblem(
                        new ProblemDetail(
                                "MULTIPLE_BLOBS_UPLOAD_PROBLEM",
                                "A problem occurred while uploading one or more blobs.",
                                Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
                                List.of()
                        )
                    ));
            });
}

I am using the blob handler in my main handler, like this:

return Uni
        .join()
        .all(listOfUnis)
        .andFailFast()
        .chain(() -> storeConversionRequest.store(conversionRequest))
        .map(ignored -> {
            System.out.println("After storing conversion request");

            return null;
        })
        .chain(() -> storeBlob.storeMultiple(newListOfBlobData))
        .map(ignored -> {
            System.out.println("After uploading all the blobs");

            return null;
        })
        .chain(() -> sendConversionRequest.send(new ConversionRequestCommand(
                        conversionRequest.getId(),
                        blobs.stream().map(BlobData::getBlobName).toList(),
                        "",
                        targetFileType
                )
        ))
        .map(ignored -> {
            System.out.println("After sending the command");

            return null;
        })
        .map(ignored -> conversionRequest.getId())
        .onFailure()
        .recoverWithUni(exception -> {
            System.out.println("Error occurred while handling a conversion request.");
            Log.error(
                    conversionRequest.getId().toString(),
                    "An error occurred while handling new conversion request.",
                    exception
            );

            return Uni.createFrom().failure(exception);
        });

These are only excerpts, since it's company code.

I have also tried the Mutiny operator .runSubscriptionOn(MutinyHelper.executor(vertx.getOrCreateContext())), but then I get an error message stating 'not using duplicated context'.
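
For reference, the distinction that seems to matter here is between where a step is started and where the steps after it run. The plain-Java sketch below uses CompletableFuture as a stand-in for the Mutiny pipeline and shows a continuation either staying on the thread that completed the "upload" or being moved back onto a chosen executor. The class, method, and thread names are hypothetical. The closest Mutiny counterpart is probably emitOn rather than runSubscriptionOn, but whether that also satisfies the duplicated-context check is an assumption, not something verified here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HopBackSketch {

    /** Returns the name of the thread on which the step after the "upload" runs. */
    static String threadAfterUpload(boolean hopBack) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService ioPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        try {
            // The "upload" stays pending on the I/O thread until the next step is registered,
            // which makes the thread that runs the continuation deterministic.
            CountDownLatch registered = new CountDownLatch(1);
            CompletableFuture<Void> upload = CompletableFuture.runAsync(() -> {
                try {
                    registered.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, ioPool);

            CompletableFuture<String> next = hopBack
                    // Async variant: the continuation is submitted to the given executor.
                    ? upload.thenApplyAsync(ignored -> Thread.currentThread().getName(), eventLoop)
                    // Plain variant: the continuation runs on the thread that completes the upload.
                    : upload.thenApply(ignored -> Thread.currentThread().getName());
            registered.countDown();
            return next.join();
        } finally {
            eventLoop.shutdown();
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without hop: " + threadAfterUpload(false));
        System.out.println("with hop:    " + threadAfterUpload(true));
    }
}
```

Without the hop the follow-up step runs on io-worker. With it, the step runs on event-loop, which is the thread a thread-bound session would expect.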

I don't think this is an issue with Hibernate per se, but I just can't get it to work and I'm starting to get a bit frustrated. Since the error message originates from Hibernate Reactive, I'm trying my luck here!

DavideD added and then removed the "problem" label ("A limitation or source of discomfort") on Dec 6, 2024