Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34996][Connectors/Kafka] Use UserCodeCL to instantiate Deserializer #89

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

hugogu
Copy link

@hugogu hugogu commented Apr 3, 2024

So that when connector-kafka installed into Flink Libs, it can work with a Customer Deserializer class defines in User Code.

This is useful with work with Spring Kafka JsonDeserializer. JsonDeserializer is a generic class and can't be used to build a KafkaSource directly because the Generic Type information required would lost.

A typical use case of JsonDeserializer looks like following

  • Declare a sub class of JsonDeserializer with customized object mapper (which is in User Jar for sure)
    class AccountBalanceActivityDeserializer :
        JsonDeserializer<AccountBalanceActivity>(SpringObjectMapperSupplier.INSTANCE)
  • Use it to build a KafkaSource
        @Bean
        fun source(): KafkaSource<AccountBalanceActivity> {
            return KafkaSource.builder<AccountBalanceActivity>()
                .setBootstrapServers("kafka:9092")
                .setTopics("BalanceActivity")
                .setGroupId("FraudDetection")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(AccountBalanceActivityDeserializer::class.java))
                .build()
        }

This fix is to make above sample use pattern work when connector-kafka installed into Flink Libs rather than included in user jar.

Copy link

boring-cyborg bot commented Apr 3, 2024

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

Copy link
Contributor

@MartijnVisser MartijnVisser left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we somehow add a test for this? I'm not too fond of single line changes without actually adding a test

@hugogu
Copy link
Author

hugogu commented Apr 3, 2024

@MartijnVisser Thanks for the quick response. There are existing tests covering this class and exactly this line of code and I also verified it pass. I'm happy to add more tests to ensure the user code class loader is being used.

image

@MartijnVisser
Copy link
Contributor

There are existing tests covering this class and exactly this line of code and I also verified it pass. I'm happy to add more tests to ensure the user code class loader is being used.

I don't think that's correct, because if there were existing tests I would have expected those to fail because of your change.

@hugogu
Copy link
Author

hugogu commented Apr 3, 2024

@MartijnVisser True. I added a couple of tests that would fail without my change. Hope it looks good now. Please let me know if anything else is needed.

@hugogu hugogu force-pushed the deserializer-fix branch from f55e257 to 68da758 Compare April 3, 2024 14:00
@hugogu
Copy link
Author

hugogu commented Apr 3, 2024

@MartijnVisser May I have your advice on the use of TemporaryClassLoaderContext in the implementation? It looks unnecessary to me. Do you think I should just remove it in this PR?

    public void open(InitializationContext context) throws Exception {
        final ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader();
        try (TemporaryClassLoaderContext ignored =
                TemporaryClassLoaderContext.of(userCodeClassLoader)) {
            serializer =
                    InstantiationUtil.instantiate(
                            serializerClass.getName(),
                            Serializer.class,
                            userCodeClassLoader);

            if (serializer instanceof Configurable) {
                ((Configurable) serializer).configure(config);
            } else {
                serializer.configure(config, isKey);
            }
        } catch (Exception e) {
            throw new IOException("Failed to instantiate the serializer of class " + serializer, e);
        }
    }

@hugogu hugogu force-pushed the deserializer-fix branch 2 times, most recently from 2631db7 to e308757 Compare April 4, 2024 13:52
@hugogu
Copy link
Author

hugogu commented Apr 5, 2024

@MartijnVisser I also fix a few style errors, please let me know if it is good now or any other change to make. Thanks for your review.

@hugogu hugogu force-pushed the deserializer-fix branch from e308757 to 8986f77 Compare April 7, 2024 10:08
@morazow
Copy link

morazow commented Apr 8, 2024

Thanks @hugogu for the PR! I have added some comments, please have a look.

Do you know why the CI is failing?

@morazow
Copy link

morazow commented Apr 9, 2024

Looks good, thanks @hugogu for quick update! Hopefully the CI is green this time 🤞

@hugogu hugogu force-pushed the deserializer-fix branch from 461a3e3 to 1f24b62 Compare April 9, 2024 09:18
@hugogu hugogu force-pushed the deserializer-fix branch from 1f24b62 to b288080 Compare April 11, 2024 13:45
@hugogu
Copy link
Author

hugogu commented Apr 11, 2024

@morazow Thanks for your approval. I have just rebased this PR to include the build fix made in main branch. Hopefully the build would success this time.

I also noticed that v3.1 branch, may I know if this fix can be included? Shall I raise another PR for that? Not quite sure about the release procedure yet.

cc @MartijnVisser

@MartijnVisser MartijnVisser requested review from uce and rmetzger April 11, 2024 16:48
@hugogu
Copy link
Author

hugogu commented Apr 19, 2024

Hi @uce and @rmetzger, do you mind reviewing the change and let me know if any change is required before merging. Thanks.

@hugogu
Copy link
Author

hugogu commented Jun 23, 2024

Is there any other suggestions or problems I can fix before getting this PR merged please?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants