Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][client][PIP-389] Add a producer config to improve compression performance #23525

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public ByteBuf toByteBuf() {
}
}

ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload());
ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload(true));
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
messageMetadata, encryptedPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,21 @@
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.*;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -230,4 +227,47 @@ public void testRetentionPolicyByProducingMessages() throws Exception {
assertEquals(internalStats.ledgers.size(), 1);
});
}


@Test
public void testProducerCompressionMinMsgBodySize() throws PulsarClientException {
byte[] msg1022 = new byte[1022];
byte[] msg1025 = new byte[1025];
final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topicName)
.producerName("producer")
.compressionType(CompressionType.LZ4)
.create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("sub")
.subscribe();

producer.conf.setCompressMinMsgBodySize(1024);
producer.conf.setCompressionType(CompressionType.LZ4);
// disable batch
producer.conf.setBatchingEnabled(false);
producer.newMessage().value(msg1022).send();
MessageImpl<byte[]> message = (MessageImpl<byte[]>) consumer.receive();
CompressionType compressionType = message.getCompressionType();
assertEquals(compressionType, CompressionType.NONE);
producer.newMessage().value(msg1025).send();
message = (MessageImpl<byte[]>) consumer.receive();
compressionType = message.getCompressionType();
assertEquals(compressionType, CompressionType.LZ4);

// enable batch
producer.conf.setBatchingEnabled(true);
producer.newMessage().value(msg1022).send();
message = (MessageImpl<byte[]>) consumer.receive();
compressionType = message.getCompressionType();
assertEquals(compressionType, CompressionType.NONE);
producer.newMessage().value(msg1025).send();
message = (MessageImpl<byte[]>) consumer.receive();
compressionType = message.getCompressionType();
assertEquals(compressionType, CompressionType.LZ4);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
return isBatchFull();
}

protected ByteBuf getCompressedBatchMetadataAndPayload() {
protected ByteBuf getCompressedBatchMetadataAndPayload(boolean isBrokerTwoPhaseCompactor) {
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
int batchWriteIndex = batchedMessageMetadataAndPayload.writerIndex();
int batchReadIndex = batchedMessageMetadataAndPayload.readerIndex();

Expand Down Expand Up @@ -169,9 +169,20 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() {
}

int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
batchedMessageMetadataAndPayload.release();
if (compressionType != CompressionType.NONE) {
ByteBuf compressedPayload;
boolean isCompressed = false;
if (!isBrokerTwoPhaseCompactor && producer != null){
if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) {
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload);
isCompressed = true;
} else {
compressedPayload = batchedMessageMetadataAndPayload;
}
} else {
compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
batchedMessageMetadataAndPayload.release();
}
if (compressionType != CompressionType.NONE && isCompressed) {
messageMetadata.setCompression(compressionType);
messageMetadata.setUncompressedSize(uncompressedSize);
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down Expand Up @@ -252,7 +263,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
if (messages.size() == 1) {
messageMetadata.clear();
messageMetadata.copyFrom(messages.get(0).getMessageBuilder());
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
getCompressedBatchMetadataAndPayload(false));
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),
1, null, messageMetadata, encryptedPayload);
Expand Down Expand Up @@ -283,7 +295,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
lowestSequenceId = -1L;
return op;
}
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
getCompressedBatchMetadataAndPayload(false));
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
if (encryptedPayload.readableBytes() > getMaxMessageSize()) {
producer.semaphoreRelease(messages.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.AbstractSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
Expand Down Expand Up @@ -773,6 +769,10 @@ int getUncompressedSize() {
return uncompressedSize;
}

CompressionType getCompressionType() {
return CompressionType.valueOf(msgMetadata.getCompression().name());
}

SchemaState getSchemaState() {
return schemaState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,8 @@ CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transa
* @param payload
* @return a new payload
*/
private ByteBuf applyCompression(ByteBuf payload) {
@VisibleForTesting
public ByteBuf applyCompression(ByteBuf payload) {
ByteBuf compressedPayload = compressor.encode(payload);
payload.release();
return compressedPayload;
Expand All @@ -505,22 +506,27 @@ public void sendAsync(Message<?> message, SendCallback callback) {
boolean compressed = false;
// Batch will be compressed when closed
// If a message has a delayed delivery time, we'll always send it individually
if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {
compressedPayload = applyCompression(payload);
compressed = true;
if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()))) {
if (payload.readableBytes() < conf.getCompressMinMsgBodySize()) {

// validate msg-size (For batching this will be check at the batch completion size)
int compressedSize = compressedPayload.readableBytes();
if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
compressedPayload.release();
String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
PulsarClientException.InvalidMessageException invalidMessageException =
new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
+ " %d bytes",
producerName, topic, compressedStr, compressedSize, getMaxMessageSize()));
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
return;
} else {
compressedPayload = applyCompression(payload);
compressed = true;

// validate msg-size (For batching this will be check at the batch completion size)
int compressedSize = compressedPayload.readableBytes();
if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
compressedPayload.release();
String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
PulsarClientException.InvalidMessageException invalidMessageException =
new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
+ " %d bytes",
producerName, topic, compressedStr, compressedSize,
getMaxMessageSize()));
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
return;
}
}
}

Expand All @@ -542,7 +548,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {

// Update the message metadata before computing the payload chunk size to avoid a large message cannot be split
// into chunks.
updateMessageMetadata(msgMetadata, uncompressedSize);
updateMessageMetadata(msgMetadata, uncompressedSize, compressed);

// send in chunks
int totalChunks;
Expand Down Expand Up @@ -636,7 +642,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
* @param uncompressedSize
* @return the sequence id
*/
private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize, boolean isCompressed) {
if (!msgMetadata.hasPublishTime()) {
msgMetadata.setPublishTime(client.getClientClock().millis());

Expand All @@ -646,7 +652,7 @@ private void updateMessageMetadata(final MessageMetadata msgMetadata, final int

// The field "uncompressedSize" is zero means the compression info were not set yet.
if (msgMetadata.getUncompressedSize() <= 0) {
if (conf.getCompressionType() != CompressionType.NONE) {
if (conf.getCompressionType() != CompressionType.NONE && isCompressed) {
msgMetadata
.setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
}
Expand Down Expand Up @@ -737,7 +743,7 @@ private void serializeAndSendMessage(MessageImpl<?> msg,
} else {
// in this case compression has not been applied by the caller
// but we have to compress the payload if compression is configured
if (!compressed) {
if (!compressed && chunkPayload.readableBytes() > conf.getCompressMinMsgBodySize()) {
chunkPayload = applyCompression(chunkPayload);
}
ByteBuf encryptedPayload = encryptMessage(msgMetadata, chunkPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
)
private CompressionType compressionType = CompressionType.NONE;

private int compressMinMsgBodySize = 4 * 1024; // 4kb
lhotari marked this conversation as resolved.
Show resolved Hide resolved

// Cannot use Optional<Long> since it's not serializable
private Long initialSequenceId = null;

Expand Down
Loading