fixed failing test in notebook by reducing the data generated.
sergioferragut committed Feb 27, 2024
1 parent ed1f4c8 commit 42e61e3
Showing 4 changed files with 17 additions and 36 deletions.
53 changes: 17 additions & 36 deletions notebooks/02-ingestion/07-partitioning-while-streaming.ipynb
@@ -153,12 +153,12 @@
"source": [
"from datetime import datetime, timedelta\n",
"\n",
"#simulate events with timestamps starting at the top of the hour\n",
"gen_now = datetime.now().replace(microsecond=0, second=0, minute=0)\n",
"#simulate events with timestamps starting at two days ago\n",
"gen_now = datetime.now() - timedelta(days=2)\n",
"gen_start_time = gen_now.strftime(\"%Y-%m-%d %H:%M:%S\")\n",
"\n",
"# generate 1 million events\n",
"total_row_count=1000000\n",
"total_row_count=100000\n",
"headers = {\n",
" 'Content-Type': 'application/json'\n",
"}\n",
@@ -167,7 +167,7 @@
" \"target\": { \"type\": \"kafka\", \"endpoint\": kafka_host, \"topic\": topic_name },\n",
" \"config_file\": \"social/social_posts.json\", \n",
" \"total_events\":total_row_count,\n",
" \"concurrency\":500,\n",
" \"concurrency\":100,\n",
" \"time_type\": gen_start_time\n",
"}\n",
"datagen.post(\"/start\", json.dumps(datagen_request), headers=headers)"
Expand All @@ -179,7 +179,7 @@
"metadata": {},
"source": [
"The next cell monitors the data generation until it completes publishing messages into the Kafka topic.\n",
"It will take a few minutes to generate 1 million messages. you can see the progress in the output field `total_records`."
"It will take a minute or so to generate 100,000 messages. you can see the progress in the output field `total_records`."
]
},
{
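The monitoring step amounts to polling the generator's status until `total_records` reaches the requested count. A minimal sketch of that idea, assuming a reachable generator host, a `/status` endpoint, and a `total_records` field in the response (all three are assumptions for illustration, not the notebook's exact client calls):

```python
import time
import requests

# Hypothetical polling loop: wait until the data generator reports that all
# requested events have been published to the Kafka topic.
datagen_url = "http://datagen:9999"   # assumed generator endpoint
total_row_count = 100000              # matches the request sent above

while True:
    status = requests.get(f"{datagen_url}/status").json()
    published = status.get("total_records", 0)
    print(f"published {published} of {total_row_count} events")
    if published >= total_row_count:
        break
    time.sleep(5)
```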
@@ -332,13 +332,13 @@
"```\n",
"[\n",
" {\n",
" \"count\": 1000000\n",
" \"count\": 100000\n",
" }\n",
"]\n",
"Total load time = 8.326123\n",
"Total load time = 2.137851\n",
"```\n",
"\n",
"Let's stop that ingestion to free up the worker slots and start the next one. \n",
"Stop that ingestion to free up the worker slots and start the next one. \n",
"Stopping the ingestion involves waiting for the currently running tasks to finish building and publishing their segments, this could take up to a minute when the coordinator picks up the new segment and hands it off to the historical:"
]
},
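Stopping a streaming ingestion comes down to terminating its supervisor, which lets the in-flight tasks publish their segments before shutting down. A minimal sketch using the Overlord API through the Router; the port and the supervisor id are assumptions, so use the id shown in the console's Supervisors view:

```python
import requests

router = "http://localhost:8888"   # assumed local Router endpoint
supervisor_id = "social_media"     # hypothetical supervisor id; check the Supervisors view

# Graceful shutdown: running tasks finish building and publishing their segments,
# then the supervisor and its tasks stop, freeing the worker slots.
resp = requests.post(f"{router}/druid/indexer/v1/supervisor/{supervisor_id}/terminate")
resp.raise_for_status()
print(resp.json())
```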
@@ -415,15 +415,15 @@
"```\n",
"[\n",
" {\n",
" \"count\": 1000000\n",
" \"count\": 100000\n",
" }\n",
"]\n",
"Total load time = 6.341758\n",
"Total load time = 2.096256\n",
"```\n",
"\n",
"I see some improvement which means that some of the work is being parallelized, but it is likely that I/O is is the long pole in the tent as I only have one drive. This would be different in a real cluster, but hey, 120,000+ msgs/second with 1 and 157,000+ with 2 isn't bad for my local setup.\n",
"At this volume of data there is little improvement. Try running with more messages to see the effects of parallelism.\n",
"\n",
"Let's stop this ingestion and try with 4:"
"Stop this ingestion and try with 4:"
]
},
{
@@ -495,20 +495,9 @@
"id": "c9488652-df3e-44d9-a671-372df333d01a",
"metadata": {},
"source": [
"My result with 4 tasks:\n",
"```\n",
"[\n",
" {\n",
" \"count\": 1000000\n",
" }\n",
"]\n",
"Total load time = 5.541399\n",
"```\n",
"While it got a little faster, the returns are clearly diminishing and remember that testing performance with this laptop setup doesn't make much sense because all processes are sharing my laptop resources, so parallelism is limited and resource isolation is non-existent. As an example of this variability, in another run, this ingestion turned out to be 1 second slower than the 2 task run.\n",
"\n",
"We are focusing on the effects that the number of tasks have on segment production so let's [look at the datasources](http://localhost:8888/unified-console.html#segments) in the Druid console. Notice that the number of segments produced is the number of tasks that we running to ingest. \n",
"\n",
"![](assets/datasources-streaming-diif-tasks.png)\n",
"![](assets/datasources-streaming-diff-tasks.png)\n",
"\n",
"In general, streaming tasks will generate at least one segment file per task duration period. They could generate more. If the __time column of the data received spans multiple segment granularity time chunks, there will be at least one segment output for each time chunk touched by the data. There could be more than one segment per time chunk if the number of rows received exceeds the `maxRowsPerSegment`. So there are a few sources of data fragmentation when running streaming ingestion that are inherent to its scalable high troughput design. \n",
"\n",
@@ -562,9 +551,9 @@
"\n",
"Compaction can also apply secondary partitioning to reorganize the data within each time chunk for better segment pruning at query time. \n",
"\n",
"Since the table only has 1 million records and the default `targetRowsPerSegment` during compaction is 5 million, you'll need to lower that value to see the effects of secondary partitioning with this table.\n",
"Since the table only has 100,000 records and the default `targetRowsPerSegment` during compaction is 5 million, you'll need to lower that value to see the effects of secondary partitioning with this table.\n",
"\n",
"Here's the compaction task, notice that the `partitionsSpec` in `tuningConfig` specifies `range` partitioning with `username` as the partitioning column. It also uses targetRowsPerSegment at 250k so that we'll see the partitioning. This is just for illustration, keeping it at 5 million or so is a good initial target in a real scenario."
"Here's the compaction task, notice that the `partitionsSpec` in `tuningConfig` specifies `range` partitioning with `username` as the partitioning column. It also uses targetRowsPerSegment at 25k so that we'll see the partitioning. This is just for illustration, keeping it at 5 million or so is a good initial target in a real scenario."
]
},
{
@@ -592,7 +581,7 @@
" \"partitionDimensions\": [\n",
" \"username\"\n",
" ],\n",
" \"targetRowsPerSegment\": 250000\n",
" \"targetRowsPerSegment\": 25000\n",
" }\n",
" } \n",
"}\n",
@@ -608,7 +597,7 @@
"id": "d50f3c20-e3f6-4f2f-aae8-670327d80eea",
"metadata": {},
"source": [
"[Look at the segments](http://localhost:8888/unified-console.html#segments/datasource=social_media_4task) again. You'll see that it is back to 4 segments but this time they have more information in the Shard Spec column. The compaction process sorted the data by `username` and split it up into ranges of the column's values attempting to keep each segment file as close to the 250k we requested. Given the low number of user names in the data and their slight skew, we ended up with uneven segment sizes. Try to use columns with enough cardinality to avoid this issue or add other columns to the partitioning spec that will help add cardinality. The columns used for secondary partitioning should be the most common filter criteria in your queries; other than time filters that is. This will help improve query performance because the Broker will prune the segments needed and only submit the query to the Historicals with the relevant segments. \n",
"[Look at the segments](http://localhost:8888/unified-console.html#segments/datasource=social_media_4task) again. You'll see that it is back to 4 segments but this time they have more information in the Shard Spec column. The compaction process sorted the data by `username` and split it up into ranges of the column's values attempting to keep each segment file as close to the 25k we requested. Given the low number of user names in the data and their slight skew, we may end up with uneven segment sizes. Try to use columns with enough cardinality to avoid this issue or add other columns to the partitioning spec that will help add cardinality. The columns used for secondary partitioning should be the most common filter criteria in your queries; other than time filters that is. This will help improve query performance because the Broker will prune the segments needed and only submit the query to the Historicals with the relevant segments. \n",
"\n",
"![](assets/segments-range-partitioned.png)"
]
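To see the pruning benefit, filter on the partitioning column: the Broker only needs the segments whose `username` range covers the filter value. A sketch, with a hypothetical username literal and the datasource name assumed from this notebook:

```python
import requests

sql = """
SELECT COUNT(*) AS post_count
FROM "social_media_4task"
WHERE "username" = 'willow_84'   -- hypothetical value; pick one that exists in your data
"""

# With range partitioning on username, only the segments covering this value are scanned.
resp = requests.post("http://localhost:8888/druid/v2/sql", json={"query": sql})
print(resp.json())
```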
@@ -667,14 +656,6 @@
" * [Streaming Ingestion]()\n",
" * [Compaction and Auto Compaction]()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a53b985e-f6a8-4dcc-81c0-e81234bfb94d",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
Binary file not shown.
Binary file modified notebooks/02-ingestion/assets/segments-range-partitioned.png
