
A question regarding parsing algorithm #8

Open
vitalvi opened this issue Nov 9, 2021 · 4 comments

Comments

@vitalvi

vitalvi commented Nov 9, 2021

Hello,
I have a question about how the parser works when the schema registry is used. If I understand correctly, you query the registry for a schema when parsing the first message and then use that schema to parse all further messages unless a parsing error occurs; correct me if I'm wrong. If so, wouldn't it make sense to parse each message with the schema corresponding to its schemaId? Currently, if the schema evolves, say a new optional field is added, that field won't appear in the output because the message is still parseable with the old schema.
Thank you!
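A minimal sketch of what the question proposes, assuming the standard Confluent wire format (a 0x00 magic byte, a 4-byte big-endian schema ID, then the Avro body): read the schema ID from each message's own header and look the schema up per ID, instead of reusing the first schema seen. split_confluent_message and SchemaCache are hypothetical names, not part of fluent-plugin-parser-avro, and the fetcher block only stands in for a registry lookup such as GET /schemas/ids/{id}.

```ruby
# Confluent wire format (assumed here): magic byte 0x00, 4-byte big-endian
# schema ID, then the Avro-encoded body.
MAGIC_BYTE = 0
HEADER_SIZE = 5

# Hypothetical helper: returns [schema_id, avro_body] for one framed message.
def split_confluent_message(message)
  bytes = message.b
  raise ArgumentError, "message shorter than header" if bytes.bytesize < HEADER_SIZE
  magic, schema_id = bytes.unpack("CN") # C = unsigned byte, N = 32-bit big-endian
  raise ArgumentError, "unexpected magic byte #{magic}" unless magic == MAGIC_BYTE
  [schema_id, bytes.byteslice(HEADER_SIZE, bytes.bytesize - HEADER_SIZE)]
end

# Hypothetical per-ID cache: each schema ID is fetched once, then reused.
class SchemaCache
  def initialize(&fetcher)
    @fetcher = fetcher # stand-in for a registry call, not the plugin's API
    @schemas = {}
  end

  def fetch(schema_id)
    @schemas[schema_id] ||= @fetcher.call(schema_id)
  end
end

# Usage: messages written under different schema IDs resolve to different schemas.
lookups = []
cache = SchemaCache.new { |id| lookups << id; "schema-#{id}" }
[[0, 1].pack("CN") + "body-1", [0, 21].pack("CN") + "body-2"].each do |msg|
  schema_id, _body = split_confluent_message(msg)
  cache.fetch(schema_id)
end
cache.fetch(1) # served from the cache; lookups stays [1, 21]
```

Caching by ID rather than keeping a single "current" schema is safe because a registry schema ID always refers to the same schema, so an evolved schema arrives under a new ID and triggers exactly one extra lookup.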

@kenhys
Contributor

kenhys commented Nov 10, 2021

It seems that it can detect a changed schema. Does that not work for you?
https://github.com/fluent-plugins-nursery/fluent-plugin-parser-avro/blob/master/lib/fluent/plugin/parser_avro.rb#L139-L161

If not, it would be helpful to share steps to reproduce the issue.

@vitalvi
Author

vitalvi commented Nov 10, 2021

First of all, a small disclaimer: I'm just trying to figure out how it works, so maybe I've totally misunderstood it. Sorry in advance if I'm talking nonsense :)

It seems to be able to detect a changed schema, but only if the schemas are incompatible. Take a look at this example:

    def test_confluent_registry
      conf = Fluent::Config::Element.new(
        '', '', {'@type' => 'avro'}, [
          Fluent::Config::Element.new('confluent_registry', '', {
                                        'url' => 'http://localhost:8081',
                                        'subject' => 'persons-avro-value',
                                        'schema_key' => 'schema',
                                      }, [])
        ])
      d = create_driver(conf)
      datum = {"firstName" => "Aleen","lastName" => "Terry","birthDate" => 155555555555}
      datum2 = {"firstName" => "Aleen","lastName" => "Terry","birthDate" => 166666666666, "verified" => true}
      schema = Yajl.load(File.read(File.join(__dir__, "..", "data", "persons-avro-value.avsc")))
      schema2 = Yajl.load(File.read(File.join(__dir__, "..", "data", "persons-avro-value2.avsc")))
      encoded = encode_datum(datum, schema.fetch("schema"), true, 1)
      encoded2 = encode_datum(datum2, schema2.fetch("schema"), true, 21)

      # case 1
      d.instance.parse(encoded) do |_time, record|
        assert_equal datum, record
      end

      # case 2
      d.instance.parse(encoded2) do |_time, record|
        assert_equal datum2, record
      end
    end

It is able to parse encoded2 in case 2, which has the verified field set, but the parsed record loses that field because the previous schema is still being used.
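Why the old schema still "works" can be seen from Avro's binary record encoding: a record is just its fields concatenated in schema order, with no field names or count in the bytes, so reading new bytes with the old field list succeeds and the appended field is simply never read, assuming the decoder does not check for unread trailing bytes. The sketch below hand-rolls a tiny subset of the encoding (string, long, boolean) purely for illustration; it is not the avro gem and not the plugin's decoder. The field order and types (firstName string, lastName string, birthDate long, verified as a plain boolean appended last) are assumptions, since persons-avro-value.avsc and persons-avro-value2.avsc are not shown. If verified is really an optional ["null", "boolean"] union, its trailing bytes are a union index plus the value, but the effect is the same.

```ruby
require "stringio"

# Avro long: zigzag, then little-endian base-128 varint.
def zigzag_varint(n)
  z = (n << 1) ^ (n >> 63) # small magnitudes map to small unsigned values
  out = []
  loop do
    byte = z & 0x7F
    z >>= 7
    if z.zero?
      out << byte
      break
    end
    out << (byte | 0x80) # high bit set: more bytes follow
  end
  out.pack("C*")
end

def read_long(io)
  z = 0
  shift = 0
  loop do
    byte = io.readbyte
    z |= (byte & 0x7F) << shift
    break if byte < 0x80
    shift += 7
  end
  (z >> 1) ^ -(z & 1) # undo zigzag
end

ENCODERS = {
  string:  ->(v) { zigzag_varint(v.bytesize) + v.b }, # length-prefixed bytes
  long:    ->(v) { zigzag_varint(v) },
  boolean: ->(v) { v ? "\x01".b : "\x00".b },
}.freeze

DECODERS = {
  string:  ->(io) { io.read(read_long(io)).force_encoding("UTF-8") },
  long:    ->(io) { read_long(io) },
  boolean: ->(io) { io.readbyte == 1 },
}.freeze

# Fields are written back to back in schema order.
def encode_record(fields, datum)
  buf = "".b
  fields.each { |name, type| buf << ENCODERS.fetch(type).call(datum.fetch(name)) }
  buf
end

# Reads fields in order and reports how many bytes were left unread.
def decode_record(fields, bytes)
  io = StringIO.new(bytes)
  record = {}
  fields.each { |name, type| record[name] = DECODERS.fetch(type).call(io) }
  [record, bytes.bytesize - io.pos]
end

# ASSUMED field order/types; the real .avsc files are not shown in the issue.
OLD_FIELDS = [["firstName", :string], ["lastName", :string], ["birthDate", :long]].freeze
NEW_FIELDS = (OLD_FIELDS + [["verified", :boolean]]).freeze

datum2 = { "firstName" => "Aleen", "lastName" => "Terry",
           "birthDate" => 166666666666, "verified" => true }
bytes = encode_record(NEW_FIELDS, datum2)
record, unread = decode_record(OLD_FIELDS, bytes)
# record has no "verified" key; unread == 1 (the trailing boolean byte)
```

No error is raised at any point, which matches the observed behaviour: nothing forces a schema refresh, and the new field is silently dropped.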

@kenhys
Contributor

kenhys commented Nov 18, 2021

Hmm, something is weird.

    $ bundle exec ruby -Ilib -Itest test/plugin/test_parser_avro.rb -v  -n /test_confluent_registry$/
    Failure: test_confluent_registry(AvroParserTest::SchemaURLTest)
    test/plugin/test_parser_avro.rb:480:in `block in test_confluent_registry'
         477:
         478:       # case 2
         479:       d.instance.parse(encoded2) do |_time, record|
      => 480:         assert_equal datum2, record
         481:       end
         482:     end
         483:   end
    /work/fluentd/plugins/fluent-plugin-parser-avro/lib/fluent/plugin/parser_avro.rb:125:in `parse'
    test/plugin/test_parser_avro.rb:479:in `test_confluent_registry'
    <{"birthDate"=>166666666666,
     "firstName"=>"Aleen",
     "lastName"=>"Terry",
     "verified"=>true}> expected but was
    <{"birthDate"=>166666666666, "firstName"=>"Aleen", "lastName"=>"Terry"}>

    diff:
    ? {"birthDate"=>166666666666, "firstName"=>"Aleen", "lastName"=>"Terry"}
    -  "firstName"=>"Aleen",
    -  "lastName"=>"Terry",
    -  "verified"=>true}

@vitalvi
Author

vitalvi commented Nov 18, 2021

Yes, exactly: the record in the second case doesn't contain the verified field, even though datum2 contains it and we encoded it with schema2, which includes it as well. I suspect the first schema was used for decoding in the second case, but as I understand it, the second one should have been used.
