Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Cache Schema instances for classes in a weak reference cache since creating an instance could be CPU intensive #23777

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,12 @@ default Optional<Object> getNativeSchema() {
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> clazz) {
return DefaultImplementation.getDefaultImplementation()
Schema<T> schema = DefaultImplementation.getDefaultImplementation().getProtobufSchemaCache(clazz);
if(schema != null) return schema.clone();
schema = DefaultImplementation.getDefaultImplementation()
.newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build());
DefaultImplementation.getDefaultImplementation().setProtobufSchemaCache(clazz,schema);
return schema.clone();
}

/**
Expand Down Expand Up @@ -327,8 +331,12 @@ static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF_NAT
* @return a Schema instance
*/
static <T> Schema<T> AVRO(Class<T> pojo) {
return DefaultImplementation.getDefaultImplementation()
Schema<T> schema = DefaultImplementation.getDefaultImplementation().getAvroSchemaCache(pojo);
if(schema != null) return schema.clone();
schema = DefaultImplementation.getDefaultImplementation()
.newAvroSchema(SchemaDefinition.builder().withPojo(pojo).build());
DefaultImplementation.getDefaultImplementation().setAvroSchemaCache(pojo,schema);
return schema.clone();
}

/**
Expand All @@ -348,8 +356,12 @@ static <T> Schema<T> AVRO(SchemaDefinition<T> schemaDefinition) {
* @return a Schema instance
*/
static <T> Schema<T> JSON(Class<T> pojo) {
return DefaultImplementation.getDefaultImplementation()
Schema<T> schema = DefaultImplementation.getDefaultImplementation().getJsonSchemaCache(pojo);
if(schema != null) return schema.clone();
schema = DefaultImplementation.getDefaultImplementation()
.newJSONSchema(SchemaDefinition.builder().withPojo(pojo).build());
DefaultImplementation.getDefaultImplementation().setJsonSchemaCache(pojo,schema);
return schema.clone();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.internal;

import java.util.WeakHashMap;

/**
* This class loads the implementation for {@link PulsarClientImplementationBinding}
* and allows you to decouple the API from the actual implementation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@
* The actual implementation of this class is loaded from {@link DefaultImplementation}.
*/
public interface PulsarClientImplementationBinding {
public Schema getAvroSchemaCache(Class clazz);
public Schema getProtobufSchemaCache(Class clazz);
public Schema getJsonSchemaCache(Class clazz);
public void setAvroSchemaCache(Class clazz,Schema schema);
public void setProtobufSchemaCache(Class clazz,Schema schema);
public void setJsonSchemaCache(Class clazz,Schema schema);
Comment on lines +58 to +63
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cache should be encapsulated in the PulsarClientImplementationBindingImpl and not exposed outside of the interface.

<T> SchemaDefinitionBuilder<T> newSchemaDefinitionBuilder();

ClientBuilder newClientBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.LocalTime;
import java.util.Date;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.BatcherBuilder;
Expand Down Expand Up @@ -90,7 +91,27 @@
*/
@SuppressWarnings("unchecked")
public final class PulsarClientImplementationBindingImpl implements PulsarClientImplementationBinding {

private volatile WeakHashMap<Class<?>,Schema> AvroCache = new WeakHashMap<>();
private volatile WeakHashMap<Class<?>,Schema> ProtobufCache = new WeakHashMap<>();
private volatile WeakHashMap<Class<?>,Schema> JsonCache = new WeakHashMap<>();
public Schema getAvroSchemaCache(Class clazz){
return AvroCache.get(clazz);
}
public Schema getProtobufSchemaCache(Class clazz){
return ProtobufCache.get(clazz);
}
public Schema getJsonSchemaCache(Class clazz){
return JsonCache.get(clazz);
}
public void setAvroSchemaCache(Class clazz,Schema schema){
AvroCache.put(clazz,schema);
}
public void setProtobufSchemaCache(Class clazz,Schema schema){
ProtobufCache.put(clazz,schema);
}
public void setJsonSchemaCache(Class clazz,Schema schema){
JsonCache.put(clazz,schema);
}
public <T> SchemaDefinitionBuilder<T> newSchemaDefinitionBuilder() {
return new SchemaDefinitionBuilderImpl();
}
Expand Down
Loading