From a8df8c41fca52b80dd8dc14a8f74696d4015b52b Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Sun, 11 Jul 2021 18:15:40 +0200 Subject: [PATCH] feat: add zstd compression support --- README.md | 22 ++++++++++++++-------- changelog.md | 3 +++ include/kpro_private.hrl | 6 ++++++ rebar.config | 3 ++- src/kpro.erl | 6 +++--- src/kpro_compress.erl | 22 ++++++++++++++-------- test/kpro_batch_tests.erl | 2 +- test/kpro_produce_tests.erl | 13 +++++++++++-- 8 files changed, 54 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 49aaec2..aac80c5 100644 --- a/README.md +++ b/README.md @@ -11,14 +11,18 @@ This library provides: See [brod](https://github.com/kafka4beam/brod) for a complete kafka client implementation. -## Compression Support +## Compression -Since 4.0, this lib no longer includes [snappyer](https://github.com/kafka4beam/snappyer) and -[lz4b](https://github.com/kafka4beam/lz4b) as rebar dependencies. -However `kafka_protocol` still defaults to use `snappyer` and `lz4b_frame` for compress and -decompress. +Since version 4.0, `kafka_protocol` no longer includes compression libraries as dependencies. +You must add your desired dependencies to the wrapping project's rebar or mix config. -### Provide compression module overrides +| Compression Algorithm | Default Library | +|-----------------------|----------------------------------------------------| +| Snappy | [snappyer](https://github.com/kafka4beam/snappyer) | +| Lz4 | [lz4b](https://github.com/kafka4beam/lz4b) | +| Zstd | [ezstd](https://github.com/silviucpp/ezstd) | + +### Override default compression dependencies User may override default compression libs with modules having below APIs implemented: @@ -31,12 +35,14 @@ There are two approaches to inject such dynamic dependencies to `kakfa_protocol` #### Set application environment -e.g.
Set `{provide_compression, [{snappy, my_snappy_module}, {lz4, my_lz4_module}, {zstd, my_zstd_module}]}` in `kafka_protocol` application environment, (or provide from sys.config). +Starting from 4.2, the compression modules are cached in `persistent_term`, which can be overridden by calling `kpro:provide_compression`. + #### Call `kpro:provide_compression` -e.g. `kpro:provide_compression([{snappy, my_snappy_module}, {lz4, my_lz4_module}]).` +e.g. `kpro:provide_compression([{snappy, my_snappy_module}, {lz4, my_lz4_module}, {zstd, my_zstd_module}]).` ## Test (`make eunit`) diff --git a/changelog.md b/changelog.md index 88dad00..26a3549 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,6 @@ +* 4.2.0 + - Add support for `zstd` compression codec + * 4.1.10 - Resolve timeout value for discover and connect - partition leader diff --git a/include/kpro_private.hrl b/include/kpro_private.hrl index d80e17f..4e4bbbe 100644 --- a/include/kpro_private.hrl +++ b/include/kpro_private.hrl @@ -24,12 +24,14 @@ -define(gzip, gzip). -define(snappy, snappy). -define(lz4, lz4). +-define(zstd, zstd). %% Compression attributes -define(KPRO_COMPRESS_NONE, 0). -define(KPRO_COMPRESS_GZIP, 1). -define(KPRO_COMPRESS_SNAPPY, 2). -define(KPRO_COMPRESS_LZ4, 3). +-define(KPRO_COMPRESS_ZSTD, 4). -define(KPRO_COMPRESSION_MASK, 2#111). -define(KPRO_IS_GZIP_ATTR(ATTR), @@ -38,6 +40,9 @@ ((?KPRO_COMPRESSION_MASK band ATTR) =:= ?KPRO_COMPRESS_SNAPPY)). -define(KPRO_IS_LZ4_ATTR(ATTR), ((?KPRO_COMPRESSION_MASK band ATTR) =:= ?KPRO_COMPRESS_LZ4)). +-define(KPRO_IS_ZSTD_ATTR(ATTR), + ((?KPRO_COMPRESSION_MASK band ATTR) =:= ?KPRO_COMPRESS_ZSTD)). + -define(KPRO_TS_TYPE_CREATE, 0). -define(KPRO_TS_TYPE_APPEND, 2#1000). @@ -95,6 +100,7 @@ -define(KAFKA_0_11, 11). -define(KAFKA_1_0, 100). -define(KAFKA_1_1, 110). +-define(KAFKA_2_1, 210). -ifdef(OTP_RELEASE). -define(BIND_STACKTRACE(Var), :Var). 
diff --git a/rebar.config b/rebar.config index 8e3bc05..0789ff8 100644 --- a/rebar.config +++ b/rebar.config @@ -4,7 +4,8 @@ [ { test, [ {deps, [ {snappyer, "1.2.9"}, - {lz4b, "0.0.11"} + {lz4b, "0.0.11"}, + {ezstd, "1.1.0"} ]} ] } diff --git a/src/kpro.erl b/src/kpro.erl index a9c34a5..cedfd79 100644 --- a/src/kpro.erl +++ b/src/kpro.erl @@ -267,9 +267,9 @@ %%%_* APIs ===================================================================== -%% @doc Set snappy or lz4 compression modules. -%% This should override the default usage of `snappyer' and `lz4b_frame'. --spec provide_compression([{?snappy | ?lz4, module()}]) -> ok. +%% @doc Set `snappy', `lz4' or `zstd' compression modules. +%% The module must implement `compress/1' and `decompress/1'. +-spec provide_compression([{?snappy | ?lz4 | ?zstd, module()}]) -> ok. provide_compression(Modules) -> kpro_compress:provide(Modules). %% Get batch magic version from produce API version. diff --git a/src/kpro_compress.erl b/src/kpro_compress.erl index 1176a74..dc63ab2 100644 --- a/src/kpro_compress.erl +++ b/src/kpro_compress.erl @@ -1,4 +1,5 @@ %%% Copyright (c) 2018-2021, Klarna Bank AB (publ) +%%% Copyright (c) 2022-2025, Kafka4beam contributors. %%% %%% Licensed under the Apache License, Version 2.0 (the "License"); %%% you may not use this file except in compliance with the License. @@ -24,9 +25,9 @@ -include("kpro_private.hrl"). -%% @doc Set snappy or lz4 compression modules. -%% This should override the default usage of `snappyer' and `lz4b_frame'. --spec provide([{snappy | lz4, module()}]) -> ok. +%% @doc Set snappy, lz4 or zstd compression modules. +%% This should override the default usage of `snappyer', `lz4b_frame' and `ezstd'. +-spec provide([{snappy | lz4 | zstd, module()}]) -> ok.
provide(Libs) -> lists:foreach(fun({Name, Module}) -> persistent_term:put({?MODULE, Name}, Module) @@ -37,12 +38,14 @@ provide(Libs) -> codec_to_method(A) when ?KPRO_IS_GZIP_ATTR(A) -> ?gzip; codec_to_method(A) when ?KPRO_IS_SNAPPY_ATTR(A) -> ?snappy; codec_to_method(A) when ?KPRO_IS_LZ4_ATTR(A) -> ?lz4; +codec_to_method(A) when ?KPRO_IS_ZSTD_ATTR(A) -> ?zstd; codec_to_method(_) -> ?no_compression. %% @doc Translate compression method to bits for kafka batch attributes. method_to_codec(?gzip) -> ?KPRO_COMPRESS_GZIP; method_to_codec(?snappy) -> ?KPRO_COMPRESS_SNAPPY; method_to_codec(?lz4) -> ?KPRO_COMPRESS_LZ4; +method_to_codec(?zstd) -> ?KPRO_COMPRESS_ZSTD; method_to_codec(?no_compression) -> ?KPRO_COMPRESS_NONE. %% @doc Compress encoded batch. @@ -56,7 +59,8 @@ compress(Name, IoData) -> do_compress(Name, IoData). decompress(?no_compression, Bin) -> Bin; decompress(?gzip, Bin) -> zlib:gunzip(Bin); decompress(?snappy, Bin) -> java_snappy_unpack(Bin); -decompress(?lz4, Bin) -> do_decompress(?lz4, Bin). +decompress(?lz4, Bin) -> do_decompress(?lz4, Bin); +decompress(?zstd, Bin) -> do_decompress(?zstd, Bin). %%%_* Internals ================================================================ @@ -90,10 +94,12 @@ do_decompress(Name, Bin) -> Module = get_module(Name), iodata(Module:decompress(Bin)). -get_module(snappy) -> - get_module(snappy, snappyer); -get_module(lz4) -> - get_module(lz4, lz4b_frame). +get_module(?snappy) -> + get_module(?snappy, snappyer); +get_module(?lz4) -> + get_module(?lz4, lz4b_frame); +get_module(?zstd) -> + get_module(?zstd, ezstd). get_module(Name, Default) -> persistent_term:get({?MODULE, Name}, Default). 
diff --git a/test/kpro_batch_tests.erl b/test/kpro_batch_tests.erl index 69a16f1..fa053e9 100644 --- a/test/kpro_batch_tests.erl +++ b/test/kpro_batch_tests.erl @@ -40,7 +40,7 @@ encode_decode_test_() -> ?assertMatch(<<"v">>, Value) end, MagicVersions = [0, 1, 2], - CompressionOpts = [no_compression, gzip, snappy, lz4], + CompressionOpts = [no_compression, gzip, snappy, lz4, zstd], [{atom_to_list(CompressionOpt), " magic v" ++ integer_to_list(MagicV), fun() -> F(MagicV, CompressionOpt) end} || CompressionOpt <- CompressionOpts, diff --git a/test/kpro_produce_tests.erl b/test/kpro_produce_tests.erl index 2352671..9cead4f 100644 --- a/test/kpro_produce_tests.erl +++ b/test/kpro_produce_tests.erl @@ -96,13 +96,13 @@ non_monotoic_ts_in_batch_test() -> end. %% batches can be encoded by caller before making a produce request -encode_batch_beforehand_test() -> +encode_batch_beforehand(Compression) -> {_, Vsn} = get_api_vsn_range(), Batch = [#{ts => kpro_lib:now_ts(), value => make_value(?LINE), headers => []}], Magic = kpro_lib:produce_api_vsn_to_magic_vsn(Vsn), - Bin = kpro:encode_batch(Magic, Batch, no_compression), + Bin = kpro:encode_batch(Magic, Batch, Compression), Req = kpro_req_lib:produce(Vsn, topic(), ?PARTI, Bin), with_connection( fun(Pid) -> @@ -110,6 +110,15 @@ encode_batch_beforehand_test() -> ?ASSERT_RESPONSE_NO_ERROR(Vsn, Rsp) end). +encode_batch_beforehand_test_() -> + Methods0 = [?no_compression, ?gzip, ?snappy], + Methods = case kpro_test_lib:get_kafka_version() >= ?KAFKA_2_1 of + true -> Methods0 ++ [?zstd]; + false -> Methods0 + end, + [{atom_to_list(Method), fun() -> encode_batch_beforehand(Method) end} + || Method <- Methods]. + %% async send test async_send_test() -> {_, Vsn} = get_api_vsn_range(),