diff --git a/notebooks/02-ingestion/07-partitioning-while-streaming.ipynb b/notebooks/02-ingestion/07-partitioning-while-streaming.ipynb index 59435623..479e3724 100644 --- a/notebooks/02-ingestion/07-partitioning-while-streaming.ipynb +++ b/notebooks/02-ingestion/07-partitioning-while-streaming.ipynb @@ -153,12 +153,12 @@ "source": [ "from datetime import datetime, timedelta\n", "\n", - "#simulate events with timestamps starting at the top of the hour\n", - "gen_now = datetime.now().replace(microsecond=0, second=0, minute=0)\n", + "#simulate events with timestamps starting at two days ago\n", + "gen_now = datetime.now() - timedelta(days=2)\n", "gen_start_time = gen_now.strftime(\"%Y-%m-%d %H:%M:%S\")\n", "\n", "# generate 1 million events\n", - "total_row_count=1000000\n", + "total_row_count=100000\n", "headers = {\n", " 'Content-Type': 'application/json'\n", "}\n", @@ -167,7 +167,7 @@ " \"target\": { \"type\": \"kafka\", \"endpoint\": kafka_host, \"topic\": topic_name },\n", " \"config_file\": \"social/social_posts.json\", \n", " \"total_events\":total_row_count,\n", - " \"concurrency\":500,\n", + " \"concurrency\":100,\n", " \"time_type\": gen_start_time\n", "}\n", "datagen.post(\"/start\", json.dumps(datagen_request), headers=headers)" @@ -179,7 +179,7 @@ "metadata": {}, "source": [ "The next cell monitors the data generation until it completes publishing messages into the Kafka topic.\n", - "It will take a few minutes to generate 1 million messages. you can see the progress in the output field `total_records`." + "It will take a minute or so to generate 100,000 messages. you can see the progress in the output field `total_records`." ] }, { @@ -332,13 +332,13 @@ "```\n", "[\n", " {\n", - " \"count\": 1000000\n", + " \"count\": 100000\n", " }\n", "]\n", - "Total load time = 8.326123\n", + "Total load time = 2.137851\n", "```\n", "\n", - "Let's stop that ingestion to free up the worker slots and start the next one. \n", + "Stop that ingestion to free up the worker slots and start the next one. \n", "Stopping the ingestion involves waiting for the currently running tasks to finish building and publishing their segments, this could take up to a minute when the coordinator picks up the new segment and hands it off to the historical:" ] }, @@ -415,15 +415,15 @@ "```\n", "[\n", " {\n", - " \"count\": 1000000\n", + " \"count\": 100000\n", " }\n", "]\n", - "Total load time = 6.341758\n", + "Total load time = 2.096256\n", "```\n", "\n", - "I see some improvement which means that some of the work is being parallelized, but it is likely that I/O is is the long pole in the tent as I only have one drive. This would be different in a real cluster, but hey, 120,000+ msgs/second with 1 and 157,000+ with 2 isn't bad for my local setup.\n", + "At this volume of data there is little improvement. Try running with more messages to see the effects of parallelism.\n", "\n", - "Let's stop this ingestion and try with 4:" + "Stop this ingestion and try with 4:" ] }, { @@ -495,20 +495,9 @@ "id": "c9488652-df3e-44d9-a671-372df333d01a", "metadata": {}, "source": [ - "My result with 4 tasks:\n", - "```\n", - "[\n", - " {\n", - " \"count\": 1000000\n", - " }\n", - "]\n", - "Total load time = 5.541399\n", - "```\n", - "While it got a little faster, the returns are clearly diminishing and remember that testing performance with this laptop setup doesn't make much sense because all processes are sharing my laptop resources, so parallelism is limited and resource isolation is non-existent. As an example of this variability, in another run, this ingestion turned out to be 1 second slower than the 2 task run.\n", - "\n", "We are focusing on the effects that the number of tasks have on segment production so let's [look at the datasources](http://localhost:8888/unified-console.html#segments) in the Druid console. Notice that the number of segments produced is the number of tasks that we running to ingest. \n", "\n", - "![](assets/datasources-streaming-diif-tasks.png)\n", + "![](assets/datasources-streaming-diff-tasks.png)\n", "\n", "In general, streaming tasks will generate at least one segment file per task duration period. They could generate more. If the __time column of the data received spans multiple segment granularity time chunks, there will be at least one segment output for each time chunk touched by the data. There could be more than one segment per time chunk if the number of rows received exceeds the `maxRowsPerSegment`. So there are a few sources of data fragmentation when running streaming ingestion that are inherent to its scalable high troughput design. \n", "\n", @@ -562,9 +551,9 @@ "\n", "Compaction can also apply secondary partitioning to reorganize the data within each time chunk for better segment pruning at query time. \n", "\n", - "Since the table only has 1 million records and the default `targetRowsPerSegment` during compaction is 5 million, you'll need to lower that value to see the effects of secondary partitioning with this table.\n", + "Since the table only has 100,000 records and the default `targetRowsPerSegment` during compaction is 5 million, you'll need to lower that value to see the effects of secondary partitioning with this table.\n", "\n", - "Here's the compaction task, notice that the `partitionsSpec` in `tuningConfig` specifies `range` partitioning with `username` as the partitioning column. It also uses targetRowsPerSegment at 250k so that we'll see the partitioning. This is just for illustration, keeping it at 5 million or so is a good initial target in a real scenario." + "Here's the compaction task, notice that the `partitionsSpec` in `tuningConfig` specifies `range` partitioning with `username` as the partitioning column. It also uses targetRowsPerSegment at 25k so that we'll see the partitioning. This is just for illustration, keeping it at 5 million or so is a good initial target in a real scenario." ] }, { @@ -592,7 +581,7 @@ " \"partitionDimensions\": [\n", " \"username\"\n", " ],\n", - " \"targetRowsPerSegment\": 250000\n", + " \"targetRowsPerSegment\": 25000\n", " }\n", " } \n", "}\n", @@ -608,7 +597,7 @@ "id": "d50f3c20-e3f6-4f2f-aae8-670327d80eea", "metadata": {}, "source": [ - "[Look at the segments](http://localhost:8888/unified-console.html#segments/datasource=social_media_4task) again. You'll see that it is back to 4 segments but this time they have more information in the Shard Spec column. The compaction process sorted the data by `username` and split it up into ranges of the column's values attempting to keep each segment file as close to the 250k we requested. Given the low number of user names in the data and their slight skew, we ended up with uneven segment sizes. Try to use columns with enough cardinality to avoid this issue or add other columns to the partitioning spec that will help add cardinality. The columns used for secondary partitioning should be the most common filter criteria in your queries; other than time filters that is. This will help improve query performance because the Broker will prune the segments needed and only submit the query to the Historicals with the relevant segments. \n", + "[Look at the segments](http://localhost:8888/unified-console.html#segments/datasource=social_media_4task) again. You'll see that it is back to 4 segments but this time they have more information in the Shard Spec column. The compaction process sorted the data by `username` and split it up into ranges of the column's values attempting to keep each segment file as close to the 25k we requested. Given the low number of user names in the data and their slight skew, we may end up with uneven segment sizes. Try to use columns with enough cardinality to avoid this issue or add other columns to the partitioning spec that will help add cardinality. The columns used for secondary partitioning should be the most common filter criteria in your queries; other than time filters that is. This will help improve query performance because the Broker will prune the segments needed and only submit the query to the Historicals with the relevant segments. \n", "\n", "![](assets/segments-range-partitioned.png)" ] @@ -667,14 +656,6 @@ " * [Streaming Ingestion]()\n", " * [Compaction and Auto Compaction]()" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a53b985e-f6a8-4dcc-81c0-e81234bfb94d", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/notebooks/02-ingestion/assets/datasources-streaming-diff-tasks.png b/notebooks/02-ingestion/assets/datasources-streaming-diff-tasks.png new file mode 100644 index 00000000..546c7374 Binary files /dev/null and b/notebooks/02-ingestion/assets/datasources-streaming-diff-tasks.png differ diff --git a/notebooks/02-ingestion/assets/datasources-streaming-diif-tasks.png b/notebooks/02-ingestion/assets/datasources-streaming-diif-tasks.png deleted file mode 100644 index 53a06ee4..00000000 Binary files a/notebooks/02-ingestion/assets/datasources-streaming-diif-tasks.png and /dev/null differ diff --git a/notebooks/02-ingestion/assets/segments-range-partitioned.png b/notebooks/02-ingestion/assets/segments-range-partitioned.png index 170e81e9..19a008f8 100644 Binary files a/notebooks/02-ingestion/assets/segments-range-partitioned.png and b/notebooks/02-ingestion/assets/segments-range-partitioned.png differ