Commit

Merge pull request #84 from sajinieKavindya/main
Add Avro Logical Type support
sajinieKavindya authored Jul 11, 2024
2 parents d9ac560 + 4e9fa53 commit 217d791
Showing 5 changed files with 300 additions and 26 deletions.
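For context: Avro logical types annotate a base type (int, long, bytes, fixed) with a higher-level meaning such as date, time-millis, timestamp-micros, or decimal, and this commit teaches the producer to convert string inputs into those representations. A minimal sketch of declaring and inspecting a logical type (illustrative only, assuming Avro 1.8+; the schema literal is not from this commit):

import org.apache.avro.LogicalType;
import org.apache.avro.Schema;

public class LogicalTypeDemo {
    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
                "{\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 2}");
        LogicalType logicalType = schema.getLogicalType();
        System.out.println(logicalType.getName()); // prints "decimal", the same check the new code performs
    }
}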
3 changes: 2 additions & 1 deletion README.md
@@ -13,7 +13,8 @@ data to topics and consumers read from topics. For more information on Apache Kafka

| Connector version | Supported Kafka version | Supported WSO2 ESB/EI version |
|-----------------------------------------------------------------------------------------------------------------------| ---------------|------------- |
| [3.2.0](https://github.com/wso2-extensions/esb-connector-kafka/tree/v3.2.0) | kafka_2.12-2.8.2 | MI 4.x.x |
| [3.3.0](https://github.com/wso2-extensions/esb-connector-kafka/tree/v3.3.0) | kafka_2.12-2.8.2 | MI 4.x.x |
| [3.2.0](https://github.com/wso2-extensions/esb-connector-kafka/tree/v3.2.0) | kafka_2.12-2.8.2 | MI 4.x.x |
| [3.1.1](https://github.com/wso2-extensions/esb-connector-kafka/tree/v3.1.1) | kafka_2.12-1.0.0 | EI 7.1.0, EI 7.0.x, EI 6.6.0 |
| [3.1.0](https://github.com/wso2-extensions/esb-connector-kafka/tree/v3.1.0) | kafka_2.12-1.0.0 | EI 7.1.0, EI 7.0.x, EI 6.6.0 |
| [3.0.0](https://github.com/wso2-extensions/esb-connector-kafka/tree/v3.0.0) | kafka_2.12-1.0.0 | EI 6.6.0 |
2 changes: 1 addition & 1 deletion pom.xml
@@ -23,7 +23,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.wso2.carbon.connector</groupId>
<artifactId>org.wso2.carbon.connector.${connector.name}</artifactId>
<version>3.2.0</version>
<version>3.3.0</version>
<packaging>jar</packaging>
<name>WSO2 Carbon - Mediation Library Connector For kafkaTransport</name>
<url>http://wso2.org</url>
10 changes: 10 additions & 0 deletions src/main/java/org/wso2/carbon/connector/KafkaConnectConstants.java
@@ -18,6 +18,8 @@

package org.wso2.carbon.connector;

import java.util.TimeZone;

public class KafkaConnectConstants {

public static final String CONNECTOR_NAME = "kafka";
@@ -230,4 +232,12 @@ public class KafkaConnectConstants {
public static final String AUTO_REGISTER_SCHEMAS = "auto.register.schemas";

public static final String SUBJECT_NAME_STRATEGY = "value.subject.name.strategy";

public static final TimeZone LOCAL_TZ = TimeZone.getDefault();
public static final String REGEX_FOR_MILLIS_PART_WITH_TIME_ZONE = "(\\.\\d{1,3})(?=Z|[-+]\\d{2}:?\\d{2}$|$)";
public static final String REGEX_FOR_MICROS_PART_WITH_TIME_ZONE = "(\\.\\d{1,6})(?=Z|[-+]\\d{2}:?\\d{2}$|$)";
public static final String REGEX_FOR_MILLIS_PART_WITHOUT_TIME_ZONE = "(\\.\\d{1,3})(?=\\D|$)";
public static final String REGEX_FOR_MICROS_PART_WITHOUT_TIME_ZONE = "(\\.\\d{1,6})(?=\\D|$)";
public static final String LOGICAL_TYPE_DECIMAL = "decimal";

}
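A quick illustration (not part of the commit) of what the fractional-second patterns match: the lookahead stops before a trailing Z, a UTC offset, or end of input, so only the .SSS part is captured and can later be padded to a fixed width:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FractionDemo {
    public static void main(String[] args) {
        // Same literal as REGEX_FOR_MILLIS_PART_WITH_TIME_ZONE above
        Pattern p = Pattern.compile("(\\.\\d{1,3})(?=Z|[-+]\\d{2}:?\\d{2}$|$)");
        Matcher m = p.matcher("2024-07-11T10:15:30.5Z");
        if (m.find()) {
            System.out.println(m.group(1)); // ".5" (later padded to ".500")
        }
    }
}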
187 changes: 163 additions & 24 deletions src/main/java/org/wso2/carbon/connector/KafkaProduceConnector.java
@@ -24,6 +24,7 @@
import com.google.gson.Gson;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -35,6 +36,7 @@
import org.apache.axis2.util.MessageProcessorSelector;
import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -59,7 +61,18 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringWriter;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.text.ParseException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -69,6 +82,8 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
@@ -383,19 +398,9 @@ private Object convertToAvroObject(String jsonString, Schema schema) {
case BOOLEAN:
return Boolean.valueOf(jsonString);
case INT:
try {
return Integer.valueOf(jsonString);
} catch (NumberFormatException e) {
throw new SerializationException(
"Error serializing Avro message of type int for input: " + jsonString, e);
}
return handleTypeInt(jsonString, schema);
case LONG:
try {
return Long.valueOf(jsonString);
} catch (NumberFormatException e) {
throw new SerializationException(
"Error serializing Avro message of type long for input: " + jsonString, e);
}
return handleTypeLong(jsonString, schema);
case FLOAT:
try {
return Float.valueOf(jsonString);
@@ -442,6 +447,14 @@ private Object handleComplexAvroTypes(Object jsonString, Schema schema) {
boolean usingDefault = false;
switch (schema.getType()) {
case BYTES:
if (schema.getLogicalType() != null
&& KafkaConnectConstants.LOGICAL_TYPE_DECIMAL.equals(schema.getLogicalType().getName())) {
try {
return Utils.convertDecimalToBytes(schema, new BigDecimal(jsonString.toString()));
} catch (NumberFormatException e) {
throw new SerializationException(
"Error serializing Avro message of type bytes with logicalType decimal for input: " + jsonString, e);
}
}
return String.valueOf(jsonString).getBytes();
case RECORD:
return convertToGenericRecord(String.valueOf(jsonString), getNonNullUnionSchema(schema));
@@ -535,21 +548,19 @@ private Object handleComplexAvroTypes(Object jsonString, Schema schema) {
}
return new GenericData.EnumSymbol(schema, jsonString);
case FIXED:
if (schema.getLogicalType() != null
&& KafkaConnectConstants.LOGICAL_TYPE_DECIMAL.equals(schema.getLogicalType().getName())) {
try {
return Utils.convertDecimalToFixed(schema, new BigDecimal(jsonString.toString()));
} catch (NumberFormatException e) {
throw new SerializationException(
"Error serializing Avro message of type fixed with logicalType decimal for input: " + jsonString, e);
}
}
return new GenericData.Fixed(schema, String.valueOf(jsonString).getBytes());
case LONG:
try {
return Long.valueOf(jsonString.toString());
} catch (NumberFormatException e) {
throw new SerializationException(
"Error serializing Avro message of type long for input: " + jsonString, e);
}
return handleTypeLong(jsonString.toString(), schema);
case INT:
try {
return new Integer(jsonString.toString());
} catch (NumberFormatException e) {
throw new SerializationException(
"Error serializing Avro message of type int for input: " + jsonString, e);
}
return handleTypeInt(jsonString.toString(), schema);
case FLOAT:
try {
return Float.valueOf(jsonString.toString());
@@ -584,6 +595,134 @@ private Object handleComplexAvroTypes(Object jsonString, Schema schema) {
}
}
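// Note on the decimal handling above (assumption, not shown in this diff): per the
// Avro spec, 'decimal' stores the BigDecimal's unscaled value as big-endian
// two's-complement bytes, with the scale taken from the schema. With scale=2,
// "12.34" has unscaled value 1234 and encodes as {0x04, 0xD2}. The helpers
// Utils.convertDecimalToBytes/convertDecimalToFixed (in the changed file not
// shown here) are assumed to implement that encoding.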

private Object handleTypeInt(String input, Schema schema) {
if (LogicalTypes.date().equals(schema.getLogicalType())) {
try {
Date date = DateUtils.parseDate(input, "yyyy-MM-dd");
return Utils.convertFromDate(date);
} catch (ParseException e) {
throw new SerializationException("Error serializing Avro message of type int with logicalType "
+ "date for input: " + input + ". The input needs to be in the 'yyyy-MM-dd' format.");
}
}
if (LogicalTypes.timeMillis().equals(schema.getLogicalType())) {
try {
String timeString = preprocessTimeString(input, Utils.PATTERN_FOR_MILLIS_PART_WITHOUT_TIME_ZONE, "3");
Date date = DateUtils.parseDate(timeString, "HH:mm:ss.SSS");
return Utils.convertFromTimeMillis(date);
} catch (ParseException e) {
throw new SerializationException("Error serializing Avro message of type int with logicalType "
+ "time-millis for input: " + input + ". The input needs to be in the 'HH:mm:ss.SSS' format.");
}
}
try {
return Integer.valueOf(input);
} catch (NumberFormatException e) {
throw new SerializationException(
"Error serializing Avro message of type int for input: " + input, e);
}
}
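// Illustrative examples (assuming the Utils helpers follow the Avro spec: 'date'
// is days since 1970-01-01 and 'time-millis' is milliseconds after midnight):
//   handleTypeInt("2024-07-11", dateSchema)       -> 19915
//   handleTypeInt("10:15:30.5", timeMillisSchema) -> 36930500  (".5" padded to ".500")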

private Object handleTypeLong(String input, Schema schema) {
if (LogicalTypes.timeMicros().equals(schema.getLogicalType())) {
try {
String timeString = preprocessTimeString(input, Utils.PATTERN_FOR_MICROS_PART_WITHOUT_TIME_ZONE, "6");
Date date = DateUtils.parseDate(timeString, "HH:mm:ss.SSSSSS");
return Utils.convertFromTimeMicros(date);
} catch (ParseException e) {
throw new SerializationException("Error serializing Avro message of type long with logicalType "
+ "time-micros for input: " + input + ". The input needs to be in the 'HH:mm:ss.SSSSSS' format.");
}
}
if (LogicalTypes.timestampMillis().equals(schema.getLogicalType())) {
try {
String timestampString = preprocessTimeString(input, Utils.PATTERN_FOR_MILLIS_PART_WITH_TIME_ZONE, "3");
// Parse the date string to an Instant
Instant instant = Instant.parse(timestampString);
// Convert the Instant to milliseconds since the Unix epoch
return instant.toEpochMilli();

} catch (DateTimeParseException e) {
throw new SerializationException("Error serializing Avro message of type long with logicalType "
+ "timestamp-millis for input: " + input
+ ". The input needs to be in the yyyy-MM-dd'T'HH:mm:ss.SSS'Z' format.");
}
}
if (LogicalTypes.timestampMicros().equals(schema.getLogicalType())) {
try {
String timestampString = preprocessTimeString(input, Utils.PATTERN_FOR_MICROS_PART_WITH_TIME_ZONE, "6");
// Parse the date string to an Instant
Instant instant = Instant.parse(timestampString);
// Convert the Instant to microseconds since the Unix epoch
return ChronoUnit.MICROS.between(Instant.EPOCH, instant);

} catch (DateTimeParseException e) {
throw new SerializationException("Error serializing Avro message of type long with logicalType "
+ "timestamp-micros for input: " + input
+ ". The input needs to be in the yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z' format.");
}
}
if (LogicalTypes.localTimestampMillis().equals(schema.getLogicalType())) {
try {
String timestampString = preprocessTimeString(input, Utils.PATTERN_FOR_MILLIS_PART_WITHOUT_TIME_ZONE, "3");
return Utils.getInstantForLocalTimestamp(timestampString, "yyyy-MM-dd'T'HH:mm:ss.SSS").toEpochMilli();

} catch (DateTimeParseException e) {
throw new SerializationException("Error serializing Avro message of type long with logicalType "
+ "local-timestamp-millis for input: " + input
+ ". The input needs to be in the 'yyyy-MM-dd'T'HH:mm:ss.SSS' format.");
}
}
if (LogicalTypes.localTimestampMicros().equals(schema.getLogicalType())) {
try {
String timestampString = preprocessTimeString(input, Utils.PATTERN_FOR_MICROS_PART_WITHOUT_TIME_ZONE, "6");
return ChronoUnit.MICROS.between(Instant.EPOCH, Utils.getInstantForLocalTimestamp(timestampString,
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS"));

} catch (DateTimeParseException e) {
throw new SerializationException("Error serializing Avro message of type long with logicalType "
+ "local-timestamp-micros for input: " + input
+ ". The input needs to be in the 'yyyy-MM-dd'T'HH:mm:ss.SSSSSS' format.");
}
}
try {
return Long.valueOf(input);
} catch (NumberFormatException e) {
throw new SerializationException(
"Error serializing Avro message of type long for input: " + input, e);
}
}
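// Illustrative example: for timestamp-millis, "2024-07-11T10:15:30.5Z" is first
// normalized to "2024-07-11T10:15:30.500Z"; Instant.parse(...).toEpochMilli() then
// yields 1720692930500L. timestamp-micros performs the same conversion at
// microsecond precision via ChronoUnit.MICROS.between(Instant.EPOCH, instant).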

/**
* Preprocesses a time string to ensure that the milliseconds or microseconds part has exactly
* the specified number of digits.
* <p>
* This method searches for the milliseconds or microseconds part in the given time string using
* the provided regular expression. If the part is found, it pads it with zeros to ensure it has exactly
* the specified number of digits.
*
* @param timeString the original time string to be processed.
* @param pattern the pattern to find the milliseconds or microseconds part in the time string.
* @param noOfDigit the number of digits that the milliseconds or microseconds part should have after processing.
* @return the processed time string with the milliseconds or microseconds part formatted to the specified
* number of digits, or the original time string if the part is not found.
*/
private static String preprocessTimeString(String timeString, Pattern pattern, String noOfDigit) {
Matcher matcher = pattern.matcher(timeString);

if (matcher.find()) {
// Extract milli or microseconds part without the dot
String matchingPart = matcher.group(1).substring(1);
// Pad with zeros to ensure it has exactly the given number of digits
matchingPart = String.format("%-" + noOfDigit + "s", matchingPart).replace(' ', '0');
// Replace the original milli/microseconds part with the formatted one
return matcher.replaceFirst("." + matchingPart);
} else {
// If there's no milli/microseconds part, return the original string
return timeString;
}
}
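// Illustrative usage: with the compiled millis pattern, preprocessTimeString(
// "10:15:30.5", pattern, "3") captures ".5", right-pads the digits to "500", and
// returns "10:15:30.500", which satisfies the strict "HH:mm:ss.SSS" parse above.
// A string with no fractional part is returned unchanged.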

/**
* Check whether the string is boolean type
*
(The diff for the remaining changed file did not load and is not shown.)

0 comments on commit 217d791
