Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streaming BigQuery Loader modules #7

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 16 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# terraform-google-bigquery-loader-pubsub-ce

A Terraform module which deploys the requisite micro-services for loading BigQuery on Google running on top of Compute Engine. If you want to use a custom image for this deployment you will need to ensure it is based on top of Ubuntu 20.04.
A Terraform module which deploys the BigQuery Loader application on Google running on top of Compute Engine. If you want to use a custom image for this deployment you will need to ensure it is based on top of Ubuntu 20.04.

## Telemetry

Expand All @@ -20,13 +20,7 @@ For details on what information is collected please see this module: https://git

## Usage

This module will deploy three seperate instance groups:

1. `mutator`: Attempts to create the events table if it doesn't exist and then listens for new `types` to update the table with as custom events and entities are tracked
2. `repeater`: Events that were sent with custom `events` and `entities` that have not yet been added to the events table will be re-tried later by the repeater
3. `streamloader`: Core application which pulls data from an Enriched events topic and loads into BigQuery

The mutator is deployed as a `singleton` instance but both the `repeater` and `streamloader` can be scaled horizontally if higher throughput is needed.
The BigQuery Loader reads data from a Snowplow Enriched output PubSub topic and writes in realtime to BigQuery events table.

```hcl
# NOTE: Needs to be fed by the enrich module with valid Snowplow Events
Expand All @@ -49,12 +43,6 @@ resource "google_bigquery_dataset" "pipeline_db" {
location = var.region
}

resource "google_storage_bucket" "dead_letter_bucket" {
name = "bq-loader-dead-letter"
location = var.region
force_destroy = true
}

module "bigquery_loader_pubsub" {
source = "snowplow-devops/bigquery-loader-pubsub-ce/google"

Expand All @@ -69,7 +57,6 @@ module "bigquery_loader_pubsub" {

input_topic_name = module.enriched_topic.name
bad_rows_topic_name = module.bad_rows_topic.name
gcs_dead_letter_bucket_name = google_storage_bucket.dead_letter_bucket.name
bigquery_dataset_id = google_bigquery_dataset.pipeline_db.dataset_id

ssh_key_pairs = []
Expand Down Expand Up @@ -115,19 +102,14 @@ module "bigquery_loader_pubsub" {
| [google_bigquery_dataset_iam_member.dataset_bigquery_data_editor_binding](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_dataset_iam_member) | resource |
| [google_compute_firewall.egress](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/compute_firewall) | resource |
| [google_compute_firewall.ingress_ssh](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/compute_firewall) | resource |
| [google_compute_firewall.ingress_health_check](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/compute_firewall) | resource |
| [google_project_iam_member.sa_bigquery_data_editor](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource |
| [google_project_iam_member.sa_logging_log_writer](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource |
| [google_project_iam_member.sa_pubsub_publisher](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource |
| [google_project_iam_member.sa_pubsub_subscriber](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource |
| [google_project_iam_member.sa_pubsub_viewer](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource |
| [google_project_iam_member.sa_storage_object_viewer](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource |
| [google_pubsub_subscription.failed_inserts](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_subscription) | resource |
| [google_pubsub_subscription.input](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_subscription) | resource |
| [google_pubsub_subscription.types](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_subscription) | resource |
| [google_pubsub_topic.failed_inserts](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_topic) | resource |
| [google_pubsub_topic.types](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_topic) | resource |
| [google_service_account.sa](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/service_account) | resource |
| [google_storage_bucket_iam_binding.dead_letter_storage_object_admin_binding](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/storage_bucket_iam_binding) | resource |

## Inputs

Expand All @@ -147,32 +129,39 @@ module "bigquery_loader_pubsub" {
| <a name="input_bigquery_partition_column"></a> [bigquery\_partition\_column](#input\_bigquery\_partition\_column) | The partition column to use in the dataset | `string` | `"collector_tstamp"` | no |
| <a name="input_bigquery_require_partition_filter"></a> [bigquery\_require\_partition\_filter](#input\_bigquery\_require\_partition\_filter) | Whether to require a filter on the partition column in all queries | `bool` | `true` | no |
| <a name="input_bigquery_table_id"></a> [bigquery\_table\_id](#input\_bigquery\_table\_id) | The ID of the table within a dataset to load data into (will be created if it doesn't exist) | `string` | `"events"` | no |
| <a name="input_service_account_json_b64"></a> [bigquery\_service\_account\_json\_b64](#input\_bigquery\_service\_account\_json\_b64) | Custom credentials (as base64 encoded service account key) instead of default service account assigned to the loader's compute group | `string` | `""` | no |
| <a name="input_custom_iglu_resolvers"></a> [custom\_iglu\_resolvers](#input\_custom\_iglu\_resolvers) | The custom Iglu Resolvers that will be used by the loader to resolve and validate events | <pre>list(object({<br> name = string<br> priority = number<br> uri = string<br> api_key = string<br> vendor_prefixes = list(string)<br> }))</pre> | `[]` | no |
| <a name="input_default_iglu_resolvers"></a> [default\_iglu\_resolvers](#input\_default\_iglu\_resolvers) | The default Iglu Resolvers that will be used by the loader to resolve and validate events | <pre>list(object({<br> name = string<br> priority = number<br> uri = string<br> api_key = string<br> vendor_prefixes = list(string)<br> }))</pre> | <pre>[<br> {<br> "api_key": "",<br> "name": "Iglu Central",<br> "priority": 10,<br> "uri": "http://iglucentral.com",<br> "vendor_prefixes": []<br> },<br> {<br> "api_key": "",<br> "name": "Iglu Central - Mirror 01",<br> "priority": 20,<br> "uri": "http://mirror01.iglucentral.com",<br> "vendor_prefixes": []<br> }<br>]</pre> | no |
| <a name="input_gcp_logs_enabled"></a> [gcp\_logs\_enabled](#input\_gcp\_logs\_enabled) | Whether application logs should be reported to GCP Logging | `bool` | `true` | no |
| <a name="input_iglu_cache_size"></a> [iglu\_cache\_size](#input\_iglu\_cache\_size) | The size of cache used by Iglu Resolvers | `number` | `500` | no |
| <a name="input_iglu_cache_ttl_seconds"></a> [iglu\_cache\_ttl\_seconds](#input\_iglu\_cache\_ttl\_seconds) | Duration in seconds, how long should entries be kept in Iglu Resolvers cache before they expire | `number` | `600` | no |
| <a name="input_java_opts"></a> [java\_opts](#input\_java\_opts) | Custom JAVA Options | `string` | `"-XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75"` | no |
| <a name="input_labels"></a> [labels](#input\_labels) | The labels to append to this resource | `map(string)` | `{}` | no |
| <a name="input_machine_type_mutator"></a> [machine\_type\_mutator](#input\_machine\_type\_mutator) | The machine type to use | `string` | `"e2-small"` | no |
| <a name="input_machine_type_repeater"></a> [machine\_type\_repeater](#input\_machine\_type\_repeater) | The machine type to use | `string` | `"e2-small"` | no |
| <a name="input_machine_type_streamloader"></a> [machine\_type\_streamloader](#input\_machine\_type\_streamloader) | The machine type to use | `string` | `"e2-small"` | no |
| <a name="input_machine_type"></a> [machine\_type](#input\_machine\_type) | The machine type to use | `string` | `"e2-small"` | no |
| <a name="input_network_project_id"></a> [network\_project\_id](#input\_network\_project\_id) | The project ID of the shared VPC in which the stack is being deployed | `string` | `""` | no |
| <a name="input_ssh_block_project_keys"></a> [ssh\_block\_project\_keys](#input\_ssh\_block\_project\_keys) | Whether to block project wide SSH keys | `bool` | `true` | no |
| <a name="input_ssh_ip_allowlist"></a> [ssh\_ip\_allowlist](#input\_ssh\_ip\_allowlist) | The list of CIDR ranges to allow SSH traffic from | `list(any)` | <pre>[<br> "0.0.0.0/0"<br>]</pre> | no |
| <a name="input_ssh_key_pairs"></a> [ssh\_key\_pairs](#input\_ssh\_key\_pairs) | The list of SSH key-pairs to add to the servers | <pre>list(object({<br> user_name = string<br> public_key = string<br> }))</pre> | `[]` | no |
| <a name="input_subnetwork"></a> [subnetwork](#input\_subnetwork) | The name of the sub-network to deploy within; if populated will override the 'network' setting | `string` | `""` | no |
| <a name="input_target_size_repeater"></a> [target\_size\_repeater](#input\_target\_size\_repeater) | The number of servers to deploy | `number` | `1` | no |
| <a name="input_target_size_streamloader"></a> [target\_size\_streamloader](#input\_target\_size\_streamloader) | The number of servers to deploy | `number` | `1` | no |
| <a name="input_target_size"></a> [target\_size](#input\_target\_size) | The number of servers to deploy | `number` | `1` | no |
| <a name="input_telemetry_enabled"></a> [telemetry\_enabled](#input\_telemetry\_enabled) | Whether or not to send telemetry information back to Snowplow Analytics Ltd | `bool` | `true` | no |
| <a name="input_ubuntu_20_04_source_image"></a> [ubuntu\_20\_04\_source\_image](#input\_ubuntu\_20\_04\_source\_image) | The source image to use which must be based of of Ubuntu 20.04; by default the latest community version is used | `string` | `""` | no |
| <a name="input_user_provided_id"></a> [user\_provided\_id](#input\_user\_provided\_id) | An optional unique identifier to identify the telemetry events emitted by this stack | `string` | `""` | no |
| <a name="input_webhook_collector"></a> [webhook\_collector](#input\_webhook\_collector) | Collector address used to gather monitoring alerts | `string` | `""` | no |
| <a name="input_skip_schemas"></a> [skip\_schemas](#input\_skip\_schemas) | The list of schema keys which should be skipped (not loaded) to the warehouse | `list(string)` | `[]` | no |
| <a name="input_legacy_columns"></a> [legacy\_columns](#input\_legacy\_columns) | Schemas for which to use the legacy column style used by the v1 BigQuery Loader. For these columns, there is a column per _minor_ version of each schema. | `list(string)` | `[]` | no |
| <a name="input_healthcheck_enabled"></a> [healthcheck\_enabled](#input\_healthcheck\_enabled) | Whether or not to enable health check probe for GCP instance group | `bool` | `true` | no |

## Outputs

| Name | Description |
|------|-------------|
| <a name="output_health_check_id"></a> [health\_check\_id](#output\_health\_check\_id) | Identifier for the health check on the instance group |
| <a name="output_health_check_self_link"></a> [health\_check\_self\_link](#output\_health\_check\_self\_link) | The URL for the health check on the instance group |
| <a name="output_instance_group_url"></a> [instance\_group\_url](#output\_instance\_group\_url) | The full URL of the instance group created by the manager |
| <a name="output_manager_id"></a> [manager\_id](#output\_manager\_id) | Identifier for the instance group manager |
| <a name="output_manager_self_link"></a> [manager\_self\_link](#output\_manager\_self\_link) | The URL for the instance group manager |
| <a name="output_named_port_http"></a> [named\_port\_http](#output\_named\_port\_http) | The name of the port exposed by the instance group |
| <a name="output_named_port_value"></a> [named\_port\_value](#output\_named\_port\_value) | The named port value (e.g. 8080) |

# Copyright and license

Expand Down
Loading
Loading