Skip to content

Commit

Permalink
[hibernate#1905] Reactive find with lock in Quarkus with reactive hib…
Browse files Browse the repository at this point in the history
…ernate
  • Loading branch information
dreab8 committed Jan 28, 2025
1 parent a25679d commit 5657047
Show file tree
Hide file tree
Showing 15 changed files with 288 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,24 @@
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.engine.spi.Status;
import org.hibernate.event.service.spi.EventListenerGroup;
import org.hibernate.event.spi.PostLoadEvent;
import org.hibernate.event.spi.PostLoadEventListener;
import org.hibernate.persister.collection.CollectionPersister;
import org.hibernate.persister.entity.EntityPersister;
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister;
import org.hibernate.reactive.session.ReactiveSession;
import org.hibernate.sql.exec.spi.Callback;
import org.hibernate.sql.results.graph.entity.EntityInitializer;
import org.hibernate.sql.results.jdbc.spi.JdbcValuesSourceProcessingState;
import org.hibernate.sql.results.spi.LoadContexts;

import static java.lang.invoke.MethodHandles.lookup;
import static org.hibernate.reactive.logging.impl.LoggerFactory.make;
import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;
import static org.hibernate.reactive.util.impl.CompletionStages.loop;
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

/**
Expand Down Expand Up @@ -456,7 +462,7 @@ public EntityHolder removeEntityHolder(EntityKey key) {

@Override
public void postLoad(JdbcValuesSourceProcessingState processingState, Consumer<EntityHolder> loadedConsumer) {
delegate.postLoad( processingState, loadedConsumer );
throw LOG.nonReactiveMethodCall( "reactivePostLoad(JdbcValuesSourceProcessingState, Consumer<EntityHolder>) )" );
}

@Internal
Expand Down Expand Up @@ -710,4 +716,76 @@ public Iterator<Object> managedEntitiesIterator() {
public NaturalIdResolutions getNaturalIdResolutions() {
return delegate.getNaturalIdResolutions();
}

/**
* Reactive version of {@link StatefulPersistenceContext#postLoad(JdbcValuesSourceProcessingState, Consumer)}
*
*/
public CompletionStage<Void> reactivePostLoad(JdbcValuesSourceProcessingState processingState, Consumer<EntityHolder> holderConsumer) {
final ReactiveCallbackImpl callback = (ReactiveCallbackImpl) processingState.getExecutionContext().getCallback();

if ( processingState.getLoadingEntityHolders() != null ) {
final EventListenerGroup<PostLoadEventListener> listenerGroup =
getSession().getFactory().getEventListenerGroups().eventListenerGroup_POST_LOAD;
final PostLoadEvent postLoadEvent = processingState.getPostLoadEvent();
return loop(
processingState.getLoadingEntityHolders(), entityHolder ->
processLoadedEntityHolder(
entityHolder,
listenerGroup,
postLoadEvent,
callback,
holderConsumer
))
.thenAccept( v -> processingState.getLoadingEntityHolders().clear() );
}
if ( processingState.getReloadedEntityHolders() != null ) {
return loop(
processingState.getLoadingEntityHolders(), entityHolder ->
processLoadedEntityHolder(
entityHolder,
null,
null,
callback,
holderConsumer
))
.thenAccept( v -> processingState.getLoadingEntityHolders().clear() );
}
return voidFuture();
}

/**
* Reactive version of {@link StatefulPersistenceContext#processLoadedEntityHolder(EntityHolder, EventListenerGroup, PostLoadEvent, Callback, Consumer)}
*/
private CompletionStage<Void> processLoadedEntityHolder(
EntityHolder holder,
EventListenerGroup<PostLoadEventListener> listenerGroup,
PostLoadEvent postLoadEvent,
ReactiveCallbackImpl callback,
Consumer<EntityHolder> holderConsumer) {
if ( holderConsumer != null ) {
holderConsumer.accept( holder );
}
if ( holder.getEntity() == null ) {
// It's possible that we tried to load an entity and found out it doesn't exist,
// in which case we added an entry with a null proxy and entity.
// Remove that empty entry on post load to avoid unwanted side effects
getEntitiesByKey().remove( holder.getEntityKey() );
}
else {
if ( postLoadEvent != null ) {
postLoadEvent.reset();
postLoadEvent.setEntity( holder.getEntity() )
.setId( holder.getEntityKey().getIdentifier() )
.setPersister( holder.getDescriptor() );
listenerGroup.fireEventOnEachListener( postLoadEvent, PostLoadEventListener::onPostLoad );
if ( callback != null ) {
return callback
.invokeReactiveLoadActions( holder.getEntity(), holder.getDescriptor(), getSession() )
.thenAccept( v -> holder.resetEntityInitialier() );
}
}
}
return voidFuture();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.engine.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;

import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.loader.ast.spi.AfterLoadAction;
import org.hibernate.metamodel.mapping.EntityMappingType;
import org.hibernate.reactive.loader.ast.spi.ReactiveAfterLoadAction;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.sql.exec.spi.Callback;

import static java.lang.invoke.MethodHandles.lookup;
import static org.hibernate.reactive.logging.impl.LoggerFactory.make;
import static org.hibernate.reactive.util.impl.CompletionStages.loop;

/**
* Reactive equivalent of {@link org.hibernate.sql.exec.internal.CallbackImpl}
*/
public class ReactiveCallbackImpl implements Callback {
private static final Log LOG = make( Log.class, lookup() );

private final List<ReactiveAfterLoadAction> afterLoadActions;

public ReactiveCallbackImpl() {
this.afterLoadActions = new ArrayList<>( 1 );
}

@Override
public void registerAfterLoadAction(AfterLoadAction afterLoadAction) {
throw LOG.nonReactiveMethodCall( "registerReactiveAfterLoadAction(ReactiveCallbackImpl)" );
}

public void registerReactiveAfterLoadAction(ReactiveAfterLoadAction afterLoadAction) {
afterLoadActions.add( afterLoadAction );
}

@Override
public void invokeAfterLoadActions(
Object entity,
EntityMappingType entityMappingType,
SharedSessionContractImplementor session) {
throw LOG.nonReactiveMethodCall( "invokeAfterLoadActions(Object, EntityMappingType, SharedSessionContractImplementor)" );
}

/**
* Reactive version of {@link org.hibernate.sql.exec.internal.CallbackImpl#invokeAfterLoadActions(Object, EntityMappingType, SharedSessionContractImplementor)}
*/
public CompletionStage<Void> invokeReactiveLoadActions(
Object entity,
EntityMappingType entityMappingType,
SharedSessionContractImplementor session) {
return loop(
afterLoadActions, afterLoadAction ->
afterLoadAction.reactiveAfterLoad( entity, entityMappingType, session )
);
}

@Override
public boolean hasAfterLoadActions() {
return !afterLoadActions.isEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.hibernate.metamodel.mapping.NaturalIdMapping;
import org.hibernate.query.internal.SimpleQueryOptions;
import org.hibernate.query.spi.QueryOptions;
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
import org.hibernate.reactive.loader.ast.spi.ReactiveNaturalIdLoader;
import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor;
import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer;
Expand All @@ -38,7 +39,6 @@
import org.hibernate.sql.ast.tree.select.QuerySpec;
import org.hibernate.sql.ast.tree.select.SelectStatement;
import org.hibernate.sql.exec.internal.BaseExecutionContext;
import org.hibernate.sql.exec.internal.CallbackImpl;
import org.hibernate.sql.exec.internal.JdbcParameterBindingsImpl;
import org.hibernate.sql.exec.spi.Callback;
import org.hibernate.sql.exec.spi.JdbcOperationQuerySelect;
Expand Down Expand Up @@ -335,7 +335,7 @@ public NaturalIdLoaderWithOptionsExecutionContext(
QueryOptions queryOptions) {
super( session );
this.queryOptions = queryOptions;
callback = new CallbackImpl();
callback = new ReactiveCallbackImpl();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@
import org.hibernate.query.internal.SimpleQueryOptions;
import org.hibernate.query.spi.QueryOptions;
import org.hibernate.query.spi.QueryParameterBindings;
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor;
import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer;
import org.hibernate.resource.jdbc.spi.LogicalConnectionImplementor;
import org.hibernate.sql.ast.tree.select.SelectStatement;
import org.hibernate.sql.exec.internal.CallbackImpl;
import org.hibernate.sql.exec.internal.JdbcParameterBindingsImpl;
import org.hibernate.sql.exec.spi.Callback;
import org.hibernate.sql.exec.spi.ExecutionContext;
import org.hibernate.sql.exec.spi.JdbcParameterBindings;
import org.hibernate.sql.exec.spi.JdbcParametersList;

import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

public class ReactiveSingleIdLoadPlan<T> extends SingleIdLoadPlan<CompletionStage<T>> {

public ReactiveSingleIdLoadPlan(
Expand Down Expand Up @@ -61,7 +63,7 @@ public CompletionStage<T> load(Object restrictedValue, Object entityInstance, Bo
}
assert offset == getJdbcParameters().size();
final QueryOptions queryOptions = new SimpleQueryOptions( getLockOptions(), readOnly );
final Callback callback = new CallbackImpl();
final ReactiveCallbackImpl callback = new ReactiveCallbackImpl();
EntityMappingType loadable = (EntityMappingType) getLoadable();
ExecutionContext executionContext = executionContext(
restrictedValue,
Expand All @@ -74,17 +76,19 @@ public CompletionStage<T> load(Object restrictedValue, Object entityInstance, Bo
// FIXME: Should we get this from jdbcServices.getSelectExecutor()?
return StandardReactiveSelectExecutor.INSTANCE
.list( getJdbcSelect(), jdbcParameterBindings, executionContext, getRowTransformer(), resultConsumer( singleResultExpected ) )
.thenApply( this::extractEntity )
.thenApply( entity -> {
invokeAfterLoadActions( callback, session, entity );
return (T) entity;
} );
.thenCompose( list -> {
Object entity = extractEntity( list );
return invokeAfterLoadActions( callback, session, entity )
.thenApply( v -> (T) entity );
}
);
}

private <G> void invokeAfterLoadActions(Callback callback, SharedSessionContractImplementor session, G entity) {
if ( entity != null && getLoadable() != null) {
callback.invokeAfterLoadActions( entity, (EntityMappingType) getLoadable(), session );
private <G> CompletionStage<Void> invokeAfterLoadActions(ReactiveCallbackImpl callback, SharedSessionContractImplementor session, G entity) {
if ( entity != null && getLoadable() != null ) {
return callback.invokeReactiveLoadActions( entity, (EntityMappingType) getLoadable(), session );
}
return voidFuture();
}

private Object extractEntity(List<?> list) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import org.hibernate.metamodel.mapping.SingularAttributeMapping;
import org.hibernate.metamodel.mapping.internal.ToOneAttributeMapping;
import org.hibernate.query.spi.QueryOptions;
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
import org.hibernate.reactive.loader.ast.spi.ReactiveSingleUniqueKeyEntityLoader;
import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor;
import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer;
import org.hibernate.sql.ast.SqlAstTranslatorFactory;
import org.hibernate.sql.ast.tree.select.SelectStatement;
import org.hibernate.sql.exec.internal.BaseExecutionContext;
import org.hibernate.sql.exec.internal.CallbackImpl;
import org.hibernate.sql.exec.internal.JdbcParameterBindingsImpl;
import org.hibernate.sql.exec.spi.Callback;
import org.hibernate.sql.exec.spi.JdbcOperationQuerySelect;
Expand Down Expand Up @@ -179,7 +179,7 @@ public SingleUKEntityLoaderExecutionContext(SharedSessionContractImplementor ses
super( session );
//Careful, readOnly is possibly null
this.queryOptions = readOnly == null ? QueryOptions.NONE : readOnly ? QueryOptions.READ_ONLY : QueryOptions.READ_WRITE;
callback = new CallbackImpl();
callback = new ReactiveCallbackImpl();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.loader.ast.spi;

import java.util.concurrent.CompletionStage;

import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.metamodel.mapping.EntityMappingType;

/**
* Reactive version of {@link org.hibernate.loader.ast.spi.AfterLoadAction}
*/
public interface ReactiveAfterLoadAction {
/**
* @see org.hibernate.loader.ast.spi.AfterLoadAction#afterLoad(Object, EntityMappingType, SharedSessionContractImplementor)
*
* The action trigger - the {@code entity} is being loaded
*/
CompletionStage<Void> reactiveAfterLoad(
Object entity,
EntityMappingType entityMappingType,
SharedSessionContractImplementor session);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
package org.hibernate.reactive.query.spi;

import java.lang.invoke.MethodHandles;
import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
Expand All @@ -30,12 +33,14 @@
import org.hibernate.query.sqm.internal.SqmInterpretationsKey.InterpretationsKeySource;
import org.hibernate.query.sqm.tree.SqmStatement;
import org.hibernate.query.sqm.tree.select.SqmSelectStatement;
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.logging.impl.LoggerFactory;
import org.hibernate.reactive.query.sqm.internal.AggregatedSelectReactiveQueryPlan;
import org.hibernate.reactive.query.sqm.internal.ConcreteSqmSelectReactiveQueryPlan;
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
import org.hibernate.reactive.sql.results.spi.ReactiveSingleResultConsumer;
import org.hibernate.sql.exec.spi.Callback;
import org.hibernate.sql.results.internal.TupleMetadata;

import jakarta.persistence.NoResultException;
Expand Down Expand Up @@ -76,6 +81,8 @@ public class ReactiveAbstractSelectionQuery<R> {
private final Function<List<R>, R> uniqueElement;
private final InterpretationsKeySource interpretationsKeySource;

private Callback callback;

// I'm sure we can avoid some of this by making some methods public in ORM,
// but this allows me to prototype faster. We can refactor the code later.
public ReactiveAbstractSelectionQuery(
Expand Down Expand Up @@ -363,4 +370,11 @@ public void enableFetchProfile(String profileName) {
}
fetchProfiles.add( profileName );
}

public Callback getCallback() {
if ( callback == null ) {
callback = new ReactiveCallbackImpl();
}
return callback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.hibernate.reactive.query.sql.spi.ReactiveNativeQueryImplementor;
import org.hibernate.reactive.query.sql.spi.ReactiveNonSelectQueryPlan;
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
import org.hibernate.sql.exec.spi.Callback;
import org.hibernate.type.BasicTypeReference;

import jakarta.persistence.AttributeConverter;
Expand Down Expand Up @@ -192,6 +193,11 @@ public R getSingleResultOrNull() {
return selectionQueryDelegate.getSingleResultOrNull();
}

@Override
public Callback getCallback() {
return selectionQueryDelegate.getCallback();
}

@Override
public CompletionStage<R> getReactiveSingleResultOrNull() {
return selectionQueryDelegate.getReactiveSingleResultOrNull();
Expand Down
Loading

0 comments on commit 5657047

Please sign in to comment.