Skip to content

Commit

Permalink
⚡ Remove CompletableFuture.supplyAsync() from graphql fetchers to han…
Browse files Browse the repository at this point in the history
…dle requests by RESTHeart threads pool
  • Loading branch information
ujibang committed Oct 19, 2023
1 parent 90ac832 commit 0be8d25
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

package org.restheart.graphql.datafetchers;

import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import com.mongodb.client.AggregateIterable;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.restheart.configuration.Configuration;
Expand All @@ -34,11 +34,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.client.AggregateIterable;

import graphql.schema.DataFetchingEnvironment;

public class GQLAggregationDataFetcher extends GraphQLDataFetcher {

private final Logger logger = LoggerFactory.getLogger(GQLAggregationDataFetcher.class);
private final Logger LOGGER = LoggerFactory.getLogger(GQLAggregationDataFetcher.class);

private static final String AGGREGATION_TIME_LIMIT_KEY = "aggregation-time-limit";
private long aggregationTimeLimit;
Expand All @@ -65,46 +67,43 @@ public GQLAggregationDataFetcher(AggregationMapping aggregationMapping) {
}

@Override
public Object get(DataFetchingEnvironment environment) throws Exception {
return CompletableFuture.supplyAsync(() -> {
var aggregation = (AggregationMapping) this.fieldMapping;

try {
aggregation.getResolvedStagesAsList(environment);
} catch (QueryVariableNotBoundException e) {
logger.info("Something went wrong while trying to resolve stages {}", e.getMessage());
throw new RuntimeException(e);
}
public Object get(DataFetchingEnvironment env) throws Exception {
var aggregation = (AggregationMapping) this.fieldMapping;

try {
aggregation.getResolvedStagesAsList(env);
} catch (QueryVariableNotBoundException e) {
LOGGER.info("Something went wrong while trying to resolve stages {}", e.getMessage());
throw new RuntimeException(e);
}

AggregateIterable<BsonDocument> res = null;
try {
var aggregationList = aggregation.getResolvedStagesAsList(env);

AggregateIterable<BsonDocument> res = null;
try {
var aggregationList = aggregation.getResolvedStagesAsList(environment);

// If user does not pass any stage return an empty array
if(aggregationList.size() == 0 ) {
return new BsonArray();
}

res = mongoClient
.getDatabase(aggregation.getDb().getValue())
.getCollection(aggregation.getCollection().getValue())
.withDocumentClass(BsonDocument.class)
.aggregate(aggregationList)
.allowDiskUse(aggregation.getAllowDiskUse().getValue())
.maxTime(this.aggregationTimeLimit, TimeUnit.MILLISECONDS);

} catch (QueryVariableNotBoundException e) {
logger.error("Aggregation pipeline has failed! {}", e.getMessage());
e.printStackTrace();
// If user does not pass any stage return an empty array
if(aggregationList.isEmpty() ) {
return new BsonArray();
}

var stageOutput = new ArrayList<BsonDocument>();
res = mongoClient
.getDatabase(aggregation.getDb().getValue())
.getCollection(aggregation.getCollection().getValue())
.withDocumentClass(BsonDocument.class)
.aggregate(aggregationList)
.allowDiskUse(aggregation.getAllowDiskUse().getValue())
.maxTime(this.aggregationTimeLimit, TimeUnit.MILLISECONDS);

if(res != null) {
res.forEach(doc -> stageOutput.add(doc));
}
} catch (QueryVariableNotBoundException e) {
LOGGER.error("Field-to-aggregation mapping has failed! {}", e.getMessage(), e);
}

var stageOutput = new ArrayList<BsonDocument>();

if(res != null) {
res.forEach(doc -> stageOutput.add(doc));
}

return stageOutput;
});
return stageOutput;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* =========================LICENSE_END==================================
Expand All @@ -25,7 +25,6 @@
import org.restheart.graphql.models.FieldMapping;

public abstract class GraphQLDataFetcher implements DataFetcher<Object> {

protected static MongoClient mongoClient;
protected FieldMapping fieldMapping;

Expand All @@ -36,5 +35,4 @@ public static void setMongoClient(MongoClient mClient){
public GraphQLDataFetcher(FieldMapping fieldMapping){
this.fieldMapping = fieldMapping;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* =========================LICENSE_END==================================
Expand Down Expand Up @@ -53,31 +53,29 @@ public static void setMongoClient(MongoClient mClient) {

@Override
public CompletionStage<List<BsonValue>> load(List<BsonValue> pipelines) {
return CompletableFuture.supplyAsync(() -> {
var res = new ArrayList<BsonValue>();
var res = new ArrayList<BsonValue>();

var listOfFacets = pipelines.stream()
.map(pipeline -> new Facet(String.valueOf(pipeline.hashCode()), toBson(pipeline)))
.toList();
var listOfFacets = pipelines.stream()
.map(pipeline -> new Facet(String.valueOf(pipeline.hashCode()), toBson(pipeline)))
.toList();

var iterable = mongoClient.getDatabase(this.db)
.getCollection(this.collection, BsonValue.class)
.aggregate(List.of(Aggregates.facet(listOfFacets)));
var iterable = mongoClient.getDatabase(this.db)
.getCollection(this.collection, BsonValue.class)
.aggregate(List.of(Aggregates.facet(listOfFacets)));

var aggResult = new BsonArray();
var aggResult = new BsonArray();

iterable.into(aggResult);
iterable.into(aggResult);

var resultDoc = aggResult.get(0).asDocument();
var resultDoc = aggResult.get(0).asDocument();

pipelines.forEach(query -> {
BsonValue queryResult = resultDoc.get(String.valueOf(query.hashCode()));
res.add(queryResult);
});

return res;
pipelines.forEach(query -> {
BsonValue queryResult = resultDoc.get(String.valueOf(query.hashCode()));
res.add(queryResult);
});

return CompletableFuture.completedFuture(res);

}

private List<Bson> toBson(BsonValue pipeline) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* =========================LICENSE_END==================================
Expand Down Expand Up @@ -76,57 +76,54 @@ public QueryBatchLoader(String db, String collection) {

@Override
public CompletionStage<List<BsonValue>> load(List<BsonValue> queries) {
return CompletableFuture.supplyAsync(() -> {
var res = new ArrayList<BsonValue>();
List<Bson> stages = new ArrayList<>();
var res = new ArrayList<BsonValue>();
List<Bson> stages = new ArrayList<>();

// if there are at least 2 queries within the batch
if (queries.size() > 1){
var mergedCond = new BsonArray();
var listOfFacets = new ArrayList<Facet>();
// if there are at least 2 queries within the batch
if (queries.size() > 1){
var mergedCond = new BsonArray();
var listOfFacets = new ArrayList<Facet>();

// foreach query within the batch...
queries.forEach(query -> {
// add find condition to merged array
BsonDocument findClause = query.asDocument().containsKey("find") ? query.asDocument().getDocument("find") : new BsonDocument();
mergedCond.add(findClause);
// foreach query within the batch...
queries.forEach(query -> {
// add find condition to merged array
BsonDocument findClause = query.asDocument().containsKey("find") ? query.asDocument().getDocument("find") : new BsonDocument();
mergedCond.add(findClause);

// create a new sub-pipeline with query stages
listOfFacets.add(new Facet(String.valueOf(query.hashCode()), getQueryStages(query.asDocument())));
});
// create a new sub-pipeline with query stages
listOfFacets.add(new Facet(String.valueOf(query.hashCode()), getQueryStages(query.asDocument())));
});

// 1° stage --> $match with conditions merged by $or operator
stages.add(Aggregates.match(new BsonDocument("$or", mergedCond)));
// 1° stage --> $match with conditions merged by $or operator
stages.add(Aggregates.match(new BsonDocument("$or", mergedCond)));

// 2° stage --> $facet with one sub-pipeline for each query within the batch
stages.add(Aggregates.facet(listOfFacets));
// 2° stage --> $facet with one sub-pipeline for each query within the batch
stages.add(Aggregates.facet(listOfFacets));

var iterable = mongoClient.getDatabase(this.db).getCollection(this.collection, BsonValue.class).aggregate(stages);
var iterable = mongoClient.getDatabase(this.db).getCollection(this.collection, BsonValue.class).aggregate(stages);

BsonArray aggResult = new BsonArray();
BsonArray aggResult = new BsonArray();

iterable.into(aggResult);
iterable.into(aggResult);

var resultDoc = aggResult.get(0).asDocument();
queries.forEach(query -> {
BsonValue queryResult = resultDoc.get(String.valueOf(query.hashCode()));
res.add(queryResult);
});
// ... otherwise merging is not needed and sub-pipelines neither
} else {
var query = queries.get(0).asDocument();
stages = getQueryStages(query);
var iterable = mongoClient.getDatabase(this.db).getCollection(this.collection, BsonValue.class).aggregate(stages);
var aggResult = new BsonArray();
var resultDoc = aggResult.get(0).asDocument();
queries.forEach(query -> {
BsonValue queryResult = resultDoc.get(String.valueOf(query.hashCode()));
res.add(queryResult);
});
// ... otherwise merging is not needed and sub-pipelines neither
} else {
var query = queries.get(0).asDocument();
stages = getQueryStages(query);
var iterable = mongoClient.getDatabase(this.db).getCollection(this.collection, BsonValue.class).aggregate(stages);
var aggResult = new BsonArray();

iterable.into(aggResult);
iterable.into(aggResult);

res.add(aggResult);
}

return res;
});
res.add(aggResult);
}

return CompletableFuture.completedFuture(res);
}

private List<Bson> getQueryStages(BsonDocument queryDoc){
Expand Down

0 comments on commit 0be8d25

Please sign in to comment.