Skip to content

Commit

Permalink
revert rename of registry serializer factory
Browse files Browse the repository at this point in the history
Signed-off-by: Shivesh Ranjan <[email protected]>
  • Loading branch information
shiveshr committed Jul 16, 2020
1 parent 174c4e6 commit fe0974f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.concurrent.ConcurrentHashMap;

/**
* Serializer Config class that is passed to {@link RegistrySerializerFactory} for creating serializer.
* Serializer Config class that is passed to {@link SerializerFactory} for creating serializer.
*/
@Data
@Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@
import static io.pravega.schemaregistry.serializers.WithSchema.NO_TRANSFORM;

@Slf4j
public class RegistrySerializerFactory {
public static final String PRAVEGA_EVENT_HEADER = "PravegaEventHeader";

public class SerializerFactory {
// region avro
/**
* Creates a typed avro serializer for the Schema. The serializer implementation returned from this method is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,24 +90,24 @@ public void testAvroSerializers() {
doAnswer(x -> new EncodingId(2)).when(client).getEncodingId(anyString(), eq(versionInfo3), any());
doAnswer(x -> new EncodingInfo(versionInfo3, of.getSchemaInfo(), Codecs.None.getCodec().getCodecType())).when(client).getEncodingInfo(anyString(), eq(new EncodingId(2)));

Serializer<Object> serializerStr = RegistrySerializerFactory.avroSerializer(config, of);
Serializer<Object> serializerStr = SerializerFactory.avroSerializer(config, of);
GenericData.EnumSymbol enumSymbol = new GenericData.EnumSymbol(of.getSchema(), "a");
ByteBuffer serialized1 = serializerStr.serialize(enumSymbol);

Serializer<Object> deserializer1 = RegistrySerializerFactory.avroDeserializer(config, of);
Serializer<Object> deserializer1 = SerializerFactory.avroDeserializer(config, of);
Object deserializedEnum = deserializer1.deserialize(serialized1);
assertEquals(deserializedEnum, enumSymbol);

Serializer<Test1> serializer = RegistrySerializerFactory.avroSerializer(config, schema1);
Serializer<Test1> serializer = SerializerFactory.avroSerializer(config, schema1);
Test1 test1 = new Test1("name", 1);
ByteBuffer serialized = serializer.serialize(test1);

Serializer<Test1> deserializer = RegistrySerializerFactory.avroDeserializer(config, schema1);
Serializer<Test1> deserializer = SerializerFactory.avroDeserializer(config, schema1);
Test1 deserialized = deserializer.deserialize(serialized);
assertEquals(deserialized, test1);

serialized = serializer.serialize(test1);
Serializer<Object> genericDeserializer = RegistrySerializerFactory.avroGenericDeserializer(config, null);
Serializer<Object> genericDeserializer = SerializerFactory.avroGenericDeserializer(config, null);
Object genericDeserialized = genericDeserializer.deserialize(serialized);
assertTrue(genericDeserialized instanceof GenericRecord);
assertEquals(((GenericRecord) genericDeserialized).get("name").toString(), "name");
Expand All @@ -121,9 +121,9 @@ public void testAvroSerializers() {
Map<Class<? extends SpecificRecordBase>, AvroSchema<SpecificRecordBase>> map = new HashMap<>();
map.put(Test1.class, schema1Base);
map.put(Test2.class, schema2Base);
Serializer<SpecificRecordBase> multiSerializer = RegistrySerializerFactory.avroMultiTypeSerializer(config, map);
Serializer<SpecificRecordBase> multiSerializer = SerializerFactory.avroMultiTypeSerializer(config, map);
serialized = multiSerializer.serialize(test1);
Serializer<SpecificRecordBase> multiDeserializer = RegistrySerializerFactory.avroMultiTypeDeserializer(config, map);
Serializer<SpecificRecordBase> multiDeserializer = SerializerFactory.avroMultiTypeDeserializer(config, map);
SpecificRecordBase deserialized2 = multiDeserializer.deserialize(serialized);
assertEquals(deserialized2, test1);

Expand All @@ -133,7 +133,7 @@ public void testAvroSerializers() {

Map<Class<? extends SpecificRecordBase>, AvroSchema<SpecificRecordBase>> map2 = new HashMap<>();
map2.put(Test1.class, schema1Base);
Serializer<Either<SpecificRecordBase, Object>> fallbackDeserializer = RegistrySerializerFactory.avroTypedOrGenericDeserializer(config, map2);
Serializer<Either<SpecificRecordBase, Object>> fallbackDeserializer = SerializerFactory.avroTypedOrGenericDeserializer(config, map2);

serialized = multiSerializer.serialize(test1);
Either<SpecificRecordBase, Object> fallback = fallbackDeserializer.deserialize(serialized);
Expand Down Expand Up @@ -164,10 +164,10 @@ public void testAvroSerializersReflect() {
doAnswer(x -> new EncodingInfo(versionInfo1, schema1.getSchemaInfo(), Codecs.None.getCodec().getCodecType())).when(client).getEncodingInfo(anyString(), eq(new EncodingId(0)));
doAnswer(x -> true).when(client).canReadUsing(anyString(), any());

Serializer<TestClass> serializer = RegistrySerializerFactory.avroSerializer(config, schema1);
Serializer<TestClass> serializer = SerializerFactory.avroSerializer(config, schema1);
ByteBuffer serialized = serializer.serialize(test1);

Serializer<TestClass> deserializer = RegistrySerializerFactory.avroDeserializer(config, schema1);
Serializer<TestClass> deserializer = SerializerFactory.avroDeserializer(config, schema1);
TestClass deserialized = deserializer.deserialize(serialized);
assertEquals(deserialized, test1);
}
Expand All @@ -194,16 +194,16 @@ public void testProtobufSerializers() throws IOException {
doAnswer(x -> new EncodingInfo(versionInfo2, schema2.getSchemaInfo(), Codecs.None.getCodec().getCodecType())).when(client).getEncodingInfo(anyString(), eq(new EncodingId(1)));
doAnswer(x -> true).when(client).canReadUsing(anyString(), any());

Serializer<ProtobufTest.Message2> serializer = RegistrySerializerFactory.protobufSerializer(config, schema1);
Serializer<ProtobufTest.Message2> serializer = SerializerFactory.protobufSerializer(config, schema1);
ProtobufTest.Message2 message = ProtobufTest.Message2.newBuilder().setName("name").setField1(1).build();
ByteBuffer serialized = serializer.serialize(message);

Serializer<ProtobufTest.Message2> deserializer = RegistrySerializerFactory.protobufDeserializer(config, schema1);
Serializer<ProtobufTest.Message2> deserializer = SerializerFactory.protobufDeserializer(config, schema1);
ProtobufTest.Message2 deserialized = deserializer.deserialize(serialized);
assertEquals(deserialized, message);

serialized = serializer.serialize(message);
Serializer<DynamicMessage> genericDeserializer = RegistrySerializerFactory.protobufGenericDeserializer(config, null);
Serializer<DynamicMessage> genericDeserializer = SerializerFactory.protobufGenericDeserializer(config, null);
DynamicMessage generic = genericDeserializer.deserialize(serialized);
assertEquals(generic.getAllFields().size(), 2);

Expand All @@ -215,9 +215,9 @@ public void testProtobufSerializers() throws IOException {
Map<Class<? extends GeneratedMessageV3>, ProtobufSchema<GeneratedMessageV3>> map = new HashMap<>();
map.put(ProtobufTest.Message2.class, schema1Base);
map.put(ProtobufTest.Message3.class, schema2Base);
Serializer<GeneratedMessageV3> multiSerializer = RegistrySerializerFactory.protobufMultiTypeSerializer(config, map);
Serializer<GeneratedMessageV3> multiSerializer = SerializerFactory.protobufMultiTypeSerializer(config, map);
serialized = multiSerializer.serialize(message);
Serializer<GeneratedMessageV3> multiDeserializer = RegistrySerializerFactory.protobufMultiTypeDeserializer(config, map);
Serializer<GeneratedMessageV3> multiDeserializer = SerializerFactory.protobufMultiTypeDeserializer(config, map);
GeneratedMessageV3 deserialized2 = multiDeserializer.deserialize(serialized);
assertEquals(deserialized2, message);

Expand All @@ -227,7 +227,7 @@ public void testProtobufSerializers() throws IOException {

Map<Class<? extends GeneratedMessageV3>, ProtobufSchema<GeneratedMessageV3>> map2 = new HashMap<>();
map2.put(ProtobufTest.Message2.class, schema1Base);
Serializer<Either<GeneratedMessageV3, DynamicMessage>> fallbackDeserializer = RegistrySerializerFactory.protobufTypedOrGenericDeserializer(config, map2);
Serializer<Either<GeneratedMessageV3, DynamicMessage>> fallbackDeserializer = SerializerFactory.protobufTypedOrGenericDeserializer(config, map2);
serialized = multiSerializer.serialize(message);
Either<GeneratedMessageV3, DynamicMessage> fallback = fallbackDeserializer.deserialize(serialized);
assertTrue(fallback.isLeft());
Expand Down Expand Up @@ -258,22 +258,22 @@ public void testJsonSerializers() throws JsonProcessingException {
doAnswer(x -> new EncodingInfo(versionInfo2, schema2.getSchemaInfo(), Codecs.None.getCodec().getCodecType())).when(client).getEncodingInfo(anyString(), eq(new EncodingId(1)));
doAnswer(x -> true).when(client).canReadUsing(anyString(), any());

Serializer<DerivedUser1> serializer = RegistrySerializerFactory.jsonSerializer(config, schema1);
Serializer<DerivedUser1> serializer = SerializerFactory.jsonSerializer(config, schema1);
DerivedUser1 user1 = new DerivedUser1("user", new Address("street", "city"), 2, "user1");
ByteBuffer serialized = serializer.serialize(user1);

Serializer<DerivedUser1> deserializer = RegistrySerializerFactory.jsonDeserializer(config, schema1);
Serializer<DerivedUser1> deserializer = SerializerFactory.jsonDeserializer(config, schema1);
DerivedUser1 deserialized = deserializer.deserialize(serialized);
assertEquals(deserialized, user1);

serialized = serializer.serialize(user1);
Serializer<WithSchema<JsonNode>> genericDeserializer = RegistrySerializerFactory.jsonGenericDeserializer(config);
Serializer<WithSchema<JsonNode>> genericDeserializer = SerializerFactory.jsonGenericDeserializer(config);
WithSchema<JsonNode> generic = genericDeserializer.deserialize(serialized);
assertEquals(((JSONSchema) generic.getSchema()).getSchema(), schema1.getSchema());
assertEquals(((JsonNode) generic.getObject()).size(), 4);

serialized = serializer.serialize(user1);
Serializer<String> stringDeserializer = RegistrySerializerFactory.jsonStringDeserializer(config);
Serializer<String> stringDeserializer = SerializerFactory.jsonStringDeserializer(config);
String str = stringDeserializer.deserialize(serialized);
assertFalse(Strings.isNullOrEmpty(str));

Expand All @@ -285,7 +285,7 @@ public void testJsonSerializers() throws JsonProcessingException {
doAnswer(x -> new EncodingId(2)).when(client).getEncodingId(anyString(), eq(versionInfo3), any());
doAnswer(x -> new EncodingInfo(versionInfo3, myData.getSchemaInfo(), Codecs.None.getCodec().getCodecType())).when(client).getEncodingInfo(anyString(), eq(new EncodingId(2)));

Serializer<Object> serializer2 = RegistrySerializerFactory.jsonSerializer(config, myData);
Serializer<Object> serializer2 = SerializerFactory.jsonSerializer(config, myData);
Map<String, String> jsonObject = new HashMap<>();
jsonObject.put("content", "mxx");

Expand All @@ -300,9 +300,9 @@ public void testJsonSerializers() throws JsonProcessingException {
doAnswer(x -> new EncodingId(3)).when(client).getEncodingId(anyString(), eq(versionInfo4), any());
doAnswer(x -> new EncodingInfo(versionInfo4, strSchema.getSchemaInfo(), Codecs.None.getCodec().getCodecType())).when(client).getEncodingInfo(anyString(), eq(new EncodingId(3)));

Serializer<Object> serializer3 = RegistrySerializerFactory.jsonSerializer(config, strSchema);
Serializer<Object> deserializer3 = RegistrySerializerFactory.jsonDeserializer(config, strSchema);
Serializer<WithSchema<JsonNode>> generic3 = RegistrySerializerFactory.jsonGenericDeserializer(config);
Serializer<Object> serializer3 = SerializerFactory.jsonSerializer(config, strSchema);
Serializer<Object> deserializer3 = SerializerFactory.jsonDeserializer(config, strSchema);
Serializer<WithSchema<JsonNode>> generic3 = SerializerFactory.jsonGenericDeserializer(config);
String string = "a";
s = serializer3.serialize(string);
Object x = deserializer3.deserialize(s);
Expand All @@ -320,9 +320,9 @@ public void testJsonSerializers() throws JsonProcessingException {
Map<Class<?>, JSONSchema<Object>> map = new HashMap<>();
map.put(DerivedUser1.class, schema1Base);
map.put(DerivedUser2.class, schema2Base);
Serializer<Object> multiSerializer = RegistrySerializerFactory.jsonMultiTypeSerializer(config, map);
Serializer<Object> multiSerializer = SerializerFactory.jsonMultiTypeSerializer(config, map);
serialized = multiSerializer.serialize(user1);
Serializer<Object> multiDeserializer = RegistrySerializerFactory.jsonMultiTypeDeserializer(config, map);
Serializer<Object> multiDeserializer = SerializerFactory.jsonMultiTypeDeserializer(config, map);
Object deserialized2 = multiDeserializer.deserialize(serialized);
assertEquals(deserialized2, user1);

Expand All @@ -332,7 +332,7 @@ public void testJsonSerializers() throws JsonProcessingException {

Map<Class<?>, JSONSchema<Object>> map2 = new HashMap<>();
map2.put(DerivedUser1.class, schema1Base);
Serializer<Either<Object, WithSchema<JsonNode>>> fallbackDeserializer = RegistrySerializerFactory.jsonTypedOrGenericDeserializer(config, map2);
Serializer<Either<Object, WithSchema<JsonNode>>> fallbackDeserializer = SerializerFactory.jsonTypedOrGenericDeserializer(config, map2);
serialized = multiSerializer.serialize(user1);
Either<Object, WithSchema<JsonNode>> fallback = fallbackDeserializer.deserialize(serialized);
assertTrue(fallback.isLeft());
Expand Down Expand Up @@ -373,27 +373,27 @@ public void testMultiformatDeserializers() throws IOException {
doAnswer(x -> new EncodingInfo(versionInfo3, schema3.getSchemaInfo(), Codecs.None.getCodec().getCodecType())).when(client).getEncodingInfo(anyString(), eq(new EncodingId(2)));
doAnswer(x -> true).when(client).canReadUsing(anyString(), any());

Serializer<Test1> avroSerializer = RegistrySerializerFactory.avroSerializer(config, schema1);
Serializer<Test1> avroSerializer = SerializerFactory.avroSerializer(config, schema1);
Test1 test1 = new Test1("name", 1);
ByteBuffer serializedAvro = avroSerializer.serialize(test1);

Serializer<ProtobufTest.Message2> protobufSerializer = RegistrySerializerFactory.protobufSerializer(config, schema2);
Serializer<ProtobufTest.Message2> protobufSerializer = SerializerFactory.protobufSerializer(config, schema2);
ProtobufTest.Message2 message = ProtobufTest.Message2.newBuilder().setName("name").setField1(1).build();
ByteBuffer serializedProto = protobufSerializer.serialize(message);

Serializer<DerivedUser1> jsonSerializer = RegistrySerializerFactory.jsonSerializer(config, schema3);
Serializer<DerivedUser1> jsonSerializer = SerializerFactory.jsonSerializer(config, schema3);
DerivedUser1 user1 = new DerivedUser1("user", new Address("street", "city"), 2, "user1");
ByteBuffer serializedJson = jsonSerializer.serialize(user1);

Serializer<Object> deserializer = RegistrySerializerFactory.genericDeserializer(config);
Serializer<Object> deserializer = SerializerFactory.genericDeserializer(config);
Object deserialized = deserializer.deserialize(serializedAvro);
assertTrue(deserialized instanceof GenericRecord);
deserialized = deserializer.deserialize(serializedProto);
assertTrue(deserialized instanceof DynamicMessage);
deserialized = deserializer.deserialize(serializedJson);
assertTrue(deserialized instanceof WithSchema);

Serializer<String> jsonStringDeserializer = RegistrySerializerFactory.deserializeAsJsonString(config);
Serializer<String> jsonStringDeserializer = SerializerFactory.deserializeAsJsonString(config);
serializedAvro.position(0);
String jsonString = jsonStringDeserializer.deserialize(serializedAvro);
assertNotNull(jsonString);
Expand Down Expand Up @@ -423,24 +423,24 @@ public void testNoEncodingProto() throws IOException {
doAnswer(x -> new SchemaWithVersion(schema1.getSchemaInfo(), versionInfo1)).when(client).getLatestSchemaVersion(anyString(), any());
doAnswer(x -> true).when(client).canReadUsing(anyString(), any());

Serializer<ProtobufTest.Message2> serializer = RegistrySerializerFactory.protobufSerializer(config, schema1);
Serializer<ProtobufTest.Message2> serializer = SerializerFactory.protobufSerializer(config, schema1);
verify(client, never()).getEncodingId(anyString(), any(), any());

ProtobufTest.Message2 message = ProtobufTest.Message2.newBuilder().setName("name").setField1(1).build();
ByteBuffer serialized = serializer.serialize(message);

Serializer<ProtobufTest.Message2> deserializer = RegistrySerializerFactory.protobufDeserializer(config, schema1);
Serializer<ProtobufTest.Message2> deserializer = SerializerFactory.protobufDeserializer(config, schema1);
verify(client, never()).getEncodingInfo(anyString(), any());

ProtobufTest.Message2 deserialized = deserializer.deserialize(serialized);
assertEquals(deserialized, message);

serialized = serializer.serialize(message);
AssertExtensions.assertThrows(IllegalArgumentException.class, () -> RegistrySerializerFactory.protobufGenericDeserializer(config, null));
AssertExtensions.assertThrows(IllegalArgumentException.class, () -> SerializerFactory.protobufGenericDeserializer(config, null));

SchemaInfo latestSchema = client.getLatestSchemaVersion("groupId", null).getSchemaInfo();
ProtobufSchema<DynamicMessage> schemaDynamic = ProtobufSchema.of(latestSchema.getType(), descriptorSet);
Serializer<DynamicMessage> genericDeserializer = RegistrySerializerFactory.protobufGenericDeserializer(config, schemaDynamic);
Serializer<DynamicMessage> genericDeserializer = SerializerFactory.protobufGenericDeserializer(config, schemaDynamic);

DynamicMessage generic = genericDeserializer.deserialize(serialized);
assertEquals(generic.getAllFields().size(), 2);
Expand All @@ -461,19 +461,19 @@ public void testNoEncodingJson() throws IOException {
doAnswer(x -> new SchemaWithVersion(schema1.getSchemaInfo(), versionInfo1)).when(client).getLatestSchemaVersion(anyString(), any());
doAnswer(x -> true).when(client).canReadUsing(anyString(), any());

Serializer<DerivedUser1> serializer = RegistrySerializerFactory.jsonSerializer(config, schema1);
Serializer<DerivedUser1> serializer = SerializerFactory.jsonSerializer(config, schema1);
verify(client, never()).getEncodingId(anyString(), any(), any());
DerivedUser1 user1 = new DerivedUser1("user", new Address("street", "city"), 2, "user1");
ByteBuffer serialized = serializer.serialize(user1);

Serializer<DerivedUser1> deserializer = RegistrySerializerFactory.jsonDeserializer(config, schema1);
Serializer<DerivedUser1> deserializer = SerializerFactory.jsonDeserializer(config, schema1);
verify(client, never()).getEncodingInfo(anyString(), any());
DerivedUser1 deserialized = deserializer.deserialize(serialized);
assertEquals(deserialized, user1);

serialized = serializer.serialize(user1);

Serializer<WithSchema<JsonNode>> genericDeserializer = RegistrySerializerFactory.jsonGenericDeserializer(config);
Serializer<WithSchema<JsonNode>> genericDeserializer = SerializerFactory.jsonGenericDeserializer(config);

WithSchema<JsonNode> generic = genericDeserializer.deserialize(serialized);
assertNotNull(generic.getObject());
Expand Down

0 comments on commit fe0974f

Please sign in to comment.