Commit a02bef1 ("cleanup")
vga91 committed Dec 11, 2024
1 parent bfef801
Showing 28 changed files with 48 additions and 989 deletions.
@@ -10,7 +10,7 @@ At a minimum, to configure this, you will need:
* `API_KEY`
* `API_SECRET`

-More specifically the plugin has to be configured as follow:
+More specifically, the procedures have to be configured as follows:

.neo4j.conf
[source,ini]
115 changes: 10 additions & 105 deletions docs/asciidoc/modules/ROOT/pages/database-integration/kafka/index.adoc
@@ -2,39 +2,30 @@

[[kafka]]

-ifdef::env-docs[]
-[abstract]
---
-Get started fast for common scenarios, using APOC Kafka plugin or Kafka Connect plugin
---
-endif::env-docs[]

[[apoc_neo4j_plugin_quickstart]]
-== APOC Kafka Plugin
+== APOC Kafka Procedures

-Any configuration option that starts with `apoc.kafka.` controls how the plugin itself behaves. For a full
-list of options available, see the documentation subsections on the xref:database-integration/kafka/producer.adoc[source] and xref:database-integration/kafka/consumer.adoc#apoc_kafka_sink[sink].
+NOTE: to enable the Kafka dependencies, the APOC configuration `apoc.kafka.enabled=true` must be set.

-=== Install the Plugin
+Any configuration option that starts with `apoc.kafka.` controls how the procedures themselves behave.

-This dependency is included in https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/{apoc-release}/apoc-kafka-dependencies-{apoc-release}-all.jar[apoc-kafka-dependencies-{apoc-release}-all.jar^], which can be downloaded from the https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/tag/{apoc-release}[releases page^].
+=== Install dependencies
+
+The Kafka dependencies are included in https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/{apoc-release}/apoc-kafka-dependencies-{apoc-release}-all.jar[apoc-kafka-dependencies-{apoc-release}-all.jar^], which can be downloaded from the https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/tag/{apoc-release}[releases page^].
Once that file is downloaded, it should be placed in the `plugins` directory and the Neo4j Server restarted.
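Putting the pieces together, a minimal `neo4j.conf` sketch for getting started might look like the following; the broker address is illustrative, and both settings are described elsewhere on this page:

.neo4j.conf
[source,ini]
----
# enable the APOC Kafka procedures and their dependencies
apoc.kafka.enabled=true
# the Kafka broker(s) to connect to (illustrative address)
apoc.kafka.bootstrap.servers=localhost:9092
----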

[[kafka-settings]]
=== Kafka settings

Any configuration option that starts with `apoc.kafka.` will be passed to the underlying Kafka driver. Neo4j
-Kafka plugin uses the official Confluent Kafka producer and consumer java clients.
+Kafka procedures use the official Confluent Kafka producer and consumer Java clients.
Configuration settings which are valid for those connectors will also work for APOC Kafka.

For example, in the Kafka documentation linked below, the configuration setting named `batch.size` should be stated as
`apoc.kafka.batch.size` in APOC Kafka.
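As a sketch of this renaming rule, the settings below show Confluent client options with the `apoc.kafka.` prefix applied; the values are illustrative, not recommendations:

.neo4j.conf
[source,ini]
----
# Kafka client option `batch.size`, prefixed for APOC Kafka (illustrative value)
apoc.kafka.batch.size=16384
# Kafka client option `buffer.memory`, prefixed the same way (illustrative value)
apoc.kafka.buffer.memory=33554432
----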

-The following are common configuration settings you may wish to use. _This is not a complete
-list_. The full list of configuration options and reference material is available from Confluent's
-site for link:{url-confluent-install}/configuration/consumer-configs.html[sink configurations] and
-link:{url-confluent-install}/configuration/producer-configs.html[source configurations].

+The following are common configuration settings you may wish to use.
.Most Commonly Needed Configuration Settings
|===
|Setting Name |Description |Default Value
@@ -46,7 +37,7 @@ larger transactions in Neo4j memory and may improve throughput.

|apoc.kafka.buffer.memory
|The total bytes of memory the producer can use to buffer records waiting. Use this to adjust
-how much memory the plugin may require to hold messages not yet delivered to Neo4j
+how much memory the procedures may require to hold messages not yet delivered to Neo4j
|33554432

|apoc.kafka.batch.size
@@ -75,94 +66,8 @@ apoc.kafka.bootstrap.servers=localhost:9092
If you are using Confluent Cloud (managed Kafka), you can connect to Kafka as described in
the xref:database-integration/kafka/cloud.adoc#confluent_cloud[Confluent Cloud] section.

-=== Decide: Sink, Source, or Both
-
-Configuring APOC Neo4j plugin comes in three different parts, depending on your need:
-
-. *Required*: Configuring a connection to Kafka
-
-.neo4j.conf
-[source,ini]
-----
-apoc.kafka.bootstrap.servers=localhost:9092
-----
-
-. _Optional_: Configuring Neo4j to produce records to Kafka (xref:database-integration/kafka/producer.adoc[Source])
-. _Optional_: Configuring Neo4j to ingest from Kafka (xref:database-integration/kafka/consumer.adoc#apoc_kafka_sink[Sink])
-
-Follow one or both subsections according to your use case and need:
-
-==== Sink
-
-Take data from Kafka and store it in Neo4j (Neo4j as a data consumer) by adding configuration such as:
-
-.neo4j.conf
-[source,ini]
-----
-apoc.kafka.sink.enabled=true
-apoc.kafka.sink.topic.cypher.my-ingest-topic=MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties
-----
-
-This will process every message that comes in on `my-ingest-topic` with the given cypher statement. When
-that cypher statement executes, the `event` variable that is referenced will be set to the message received,
-so this sample cypher will create a `(:Label)` node in the graph with the given ID, copying all of the
-properties in the source message.
-
-For full details on what you can do here, see the xref:database-integration/kafka/consumer.adoc#apoc_kafka_sink[Sink] section of the documentation.
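As a hypothetical sketch of how the `apoc.kafka.sink.topic.cypher.<topic>` key generalizes to further topics, a second mapping could turn messages carrying two ids into relationships; the topic name, label, and `event` fields here are illustrative:

.neo4j.conf
[source,ini]
----
# hypothetical second sink mapping: messages on this topic are expected to
# carry `from` and `to` ids, and each one is turned into a relationship
apoc.kafka.sink.topic.cypher.my-rels-ingest-topic=MATCH (a:Label {id: event.from}) MATCH (b:Label {id: event.to}) MERGE (a)-[:RELATED_TO]->(b)
----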

-==== Source
-
-Produce data from Neo4j and send it to a Kafka topic (Neo4j as a data producer) by adding configuration such as:
-
-.neo4j.conf
-[source,ini]
-----
-apoc.kafka.source.topic.nodes.my-nodes-topic=Person{*}
-apoc.kafka.source.topic.relationships.my-rels-topic=BELONGS-TO{*}
-apoc.kafka.source.enabled=true
-apoc.kafka.source.schema.polling.interval=10000
-----
-
-This will produce all graph nodes labeled `(:Person)` on to the topic `my-nodes-topic` and all
-relationships of type `-[:BELONGS-TO]->` to the topic named `my-rels-topic`. Further, schema changes will
-be polled every 10,000 ms, which affects how quickly the database picks up new indexes/schema changes.
-Please note that if not specified a value for `apoc.kafka.source.schema.polling.interval` property then Streams plugin will use
-300,000 ms as default.
-
-The expressions `Person{\*}` and `BELONGS-TO{*}` are _patterns_. You can find documentation on how to change
-these in the xref:database-integration/kafka/producer.adoc#source-patterns[Patterns] section.
-
-For full details on what you can do here, see the xref:database-integration/kafka/producer.adoc[Source] section of the documentation.
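As a hedged sketch of narrowing those patterns, assuming the grammar in the Patterns section accepts a property list in place of `*`, the topics above could publish only selected properties; the property names are illustrative:

.neo4j.conf
[source,ini]
----
# publish only selected properties instead of all of them
# (assumes the Patterns grammar accepts a property list in place of `*`)
apoc.kafka.source.topic.nodes.my-nodes-topic=Person{name,surname}
apoc.kafka.source.topic.relationships.my-rels-topic=BELONGS-TO{since}
----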

-==== Restart Neo4j
-
-Once the plugin is installed and configured, restarting the database will make it active.
-If you have configured Neo4j to consume from kafka, it will begin immediately processing messages.
-
-[NOTE]
-
-====
-When installing the latest version of the APOC Kafka plugin into Neo4j 4.x, watching to logs you could find something
-similar to the following:
-[source,logs]
-----
-2020-03-25 20:13:50.606+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.max.partition.fetch.bytes
-2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.log.include.messages
-2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.auto.offset.reset
-2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.bootstrap.servers
-2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.max.poll.records
-2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.log.enable
-2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.source.enabled
-2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.topic.cypher.boa.to.kafkaTest
-2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.tolerance
-2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.group.id
-2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.deadletterqueue.context.headers.enable
-2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.deadletterqueue.context.header.prefix
-2020-03-25 20:13:50.610+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.deadletterqueue.topic.name
-2020-03-25 20:13:50.610+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.enabled.to.kafkaTest
-----
-*These are not errors*. They comes from the new Neo4j 4 Configuration System, which warns that it doesn't recognize those
-properties. Despite these warnings the plugin will work properly.
-====
+If you have configured Neo4j to consume from Kafka, it will immediately begin processing messages.
@@ -5,12 +5,12 @@ ifdef::env-docs[]
[abstract]
--
This chapter describes the APOC Kafka Procedures in the APOC Kafka Library.
-Use this section to configure Neo4j to know how procedures allow the functionality of the plugin
+Use this section to learn how procedures allow this functionality
to be used ad-hoc in any Cypher query.
--
endif::env-docs[]

-The APOC Kafka plugin comes out with a list of procedures.
+APOC Kafka ships with a list of procedures.

== Configuration

@@ -32,7 +32,7 @@ In case your Kafka broker is configured with `auto.create.topics.enable` set to `false`,
all the messages sent to topics that don't exist are discarded;
this is because the `KafkaProducer.send()` method blocks the execution, as explained in https://issues.apache.org/jira/browse/KAFKA-3539[KAFKA-3539].
You can tune the custom property `kafka.topic.discovery.polling.interval` in order to
-periodically check for new topics into the Kafka cluster so the plugin will be able
+periodically check for new topics in the Kafka cluster so the procedures will be able
to send events to the defined topics.
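As an illustrative sketch, the property would be set in `neo4j.conf` as below; the value is arbitrary and is assumed to be in milliseconds, matching the other polling intervals in these docs:

.neo4j.conf
[source,ini]
----
# check the Kafka cluster for newly created topics every 5 seconds
# (illustrative value, assumed to be in milliseconds)
kafka.topic.discovery.polling.interval=5000
----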


Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.
