A connector tool to extract data from SQL databases and import into GCS using Apache Beam.
This tool is runnable locally, or on any other backend supported by Apache Beam, e.g. Cloud Dataflow.
DEVELOPMENT STATUS: Alpha. Usable in production already.
DBeam is a tool that reads all the data from a single SQL database table, converts the data into Avro, and stores it in an appointed location, usually in GCS. By default it runs as a single-threaded Apache Beam pipeline.
DBeam requires the database credentials, the name of the database table to read, and the output location to store the extracted data in. DBeam first runs a single SELECT against the target table with LIMIT 1 to infer the table schema. Once the schema is created, the job is launched, which simply streams the table contents via JDBC into the target location as Avro.
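The schema-inference step can be pictured as a one-row probe query plus a mapping from each column's JDBC type to an Avro type. The sketch below is hypothetical: the helper names and the exact type mapping (for example, representing DATE and TIMESTAMP as long epoch milliseconds, and everything unknown as string) are assumptions for illustration, not DBeam's actual code.

```java
import java.sql.Types;

final class SchemaInferenceSketch {
  // Hypothetical: the single-row probe whose ResultSetMetaData would describe the columns.
  static String schemaProbeQuery(String table) {
    return "SELECT * FROM " + table + " LIMIT 1";
  }

  // Hypothetical mapping from java.sql.Types codes to Avro primitive type names.
  static String avroTypeFor(int jdbcType) {
    switch (jdbcType) {
      case Types.BIT:
      case Types.BOOLEAN:
        return "boolean";
      case Types.TINYINT:
      case Types.SMALLINT:
      case Types.INTEGER:
        return "int";
      case Types.BIGINT:
        return "long";
      case Types.REAL:
        return "float";
      case Types.FLOAT: // JDBC FLOAT is double precision
      case Types.DOUBLE:
        return "double";
      case Types.BINARY:
      case Types.VARBINARY:
      case Types.LONGVARBINARY:
      case Types.BLOB:
        return "bytes";
      case Types.DATE:
      case Types.TIMESTAMP:
        return "long"; // assumption: epoch milliseconds
      default:
        return "string"; // assumption: everything else is rendered as text
    }
  }

  public static void main(String[] args) {
    System.out.println(schemaProbeQuery("my_table"));
    System.out.println(avroTypeFor(Types.BIGINT));
  }
}
```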
- Supports both PostgreSQL and MySQL JDBC connectors
- Supports Google CloudSQL managed databases
- Currently outputs only to Avro format
- Reads the database password from an external password file (--passwordFile) or an external KMS-encrypted password file (--passwordFileKmsEncrypted)
- Can filter only records of the current day with the --partitionColumn parameter
- Checks and fails on too-old partition dates. Snapshot dumps are not filtered by a given date/partition, so when running for a too-old partition, the job fails to avoid new data in old partitions (can be disabled with --skipPartitionCheck)
- Implemented as an Apache Beam SDK pipeline, supporting any of its runners (tested with DirectRunner and DataflowRunner)
- --connectionUrl: the JDBC connection URL to perform the dump
- --table: the database table to query and perform the dump
- --sqlFile: a path to a file containing a SQL query (used instead of a generated query based on the table parameter)
- --output: the path to store the output
- --username: the database user name
- --password: the database password
- --passwordFile: a path to a local file containing the database password
- --limit: limit the output number of rows, indefinite by default
- --avroSchemaNamespace: the namespace of the generated Avro schema, "dbeam_generated" by default
- --exportTimeout: maximum time the export can take; after this timeout the job is cancelled. Default is P7D (long timeout).
- --partitionColumn: the name of a date/timestamp column to filter data based on the current partition
- --partition: the date of the current partition, parsed using ISODateTimeFormat.localDateOptionalTimeParser
- --partitionPeriod: the period in which DBeam runs, used to filter based on the current partition and also to check whether executions are being run for a too-old partition
- --skipPartitionCheck: when the partition column is not specified, the job by default fails if the partition parameter is too old; use this flag to avoid this behavior
- --minPartitionPeriod: the minimum partition required for the job not to fail (when the partition column is not specified), by default now() - 2*partitionPeriod
- --queryParallelism: max number of queries to generate to extract in parallel. Generates one query if nothing is specified. The split column (--splitColumn) must be defined.
- --splitColumn: a long/integer type column which is used to determine bounds for generating parallel queries. Must be used with --queryParallelism defined.
- --avroSchemaFilePath: a path to a file containing an input Avro schema file (optional)
If an input Avro schema file is provided, DBeam reads it and uses some of its properties when creating the output Avro schema:
- record.doc
- record.namespace
- record.field.doc
This is a pre-alpha feature currently under development and experimentation.
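The partition options listed above (--partition, --partitionPeriod, --minPartitionPeriod, --skipPartitionCheck, --partitionColumn) can be illustrated with a small sketch. It uses java.time instead of the Joda-Time parser named above, and the helper names, the half-open filter shape for --partitionColumn, and the daily period in the example are assumptions; only the now() - 2*partitionPeriod default comes from the option descriptions.

```java
import java.time.LocalDate;
import java.time.Period;

final class PartitionCheckSketch {
  // Documented default for --minPartitionPeriod: now() - 2 * partitionPeriod.
  static LocalDate defaultMinPartition(LocalDate today, Period partitionPeriod) {
    return today.minus(partitionPeriod.multipliedBy(2));
  }

  // Hypothetical check: fail on a too-old partition unless --skipPartitionCheck is set.
  static void checkPartition(LocalDate partition, LocalDate minPartition, boolean skipPartitionCheck) {
    if (!skipPartitionCheck && partition.isBefore(minPartition)) {
      throw new IllegalStateException(
          "Too old partition date " + partition + "; minimum is " + minPartition);
    }
  }

  // Hypothetical --partitionColumn filter: rows in [partition, partition + partitionPeriod).
  static String partitionFilter(String column, LocalDate partition, Period partitionPeriod) {
    return String.format("%s >= '%s' AND %s < '%s'",
        column, partition, column, partition.plus(partitionPeriod));
  }

  public static void main(String[] args) {
    LocalDate partition = LocalDate.parse("2019-01-10");
    Period daily = Period.ofDays(1);
    // Skipped here so the example runs regardless of today's date.
    checkPartition(partition, defaultMinPartition(LocalDate.now(), daily), true);
    System.out.println(partitionFilter("created_at", partition, daily));
  }
}
```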
Read queries used by DBeam to extract data generally don't place any locks, and hence multiple read queries can run in parallel. When running in parallel mode with --queryParallelism specified, DBeam looks for the --splitColumn argument to find the max and min values in that column. The max and min are then used as range bounds for generating queryParallelism queries, which are then run in parallel to read data.
Since the splitColumn is used to calculate the query bounds, and DBeam needs to calculate intermediate bounds for each query, the type of the column must be long/int. It is assumed that the values in the splitColumn are sufficiently evenly distributed and sequential: if the range between the min and max of the split column is divided equally into queryParallelism parts, each part should contain approximately the same number of records. Skew in this data would result in straggling queries, and hence wouldn't provide much improvement. Having the records sequential helps the queries run faster and reduces random disk seeks.
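The range splitting described above can be sketched as dividing [min, max] of the split column into equal-width, contiguous ranges, one query per range. This is a minimal sketch assuming equal-width ranges and a plain WHERE clause appended to a base query; the class and method names are hypothetical and this is not DBeam's actual query generator.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitQueriesSketch {
  // Hypothetical helper: splits [min, max] of splitColumn into at most `parallelism`
  // contiguous ranges and renders one query per range. Ranges near the limits of
  // long are not handled, since max - min + 1 could overflow.
  static List<String> splitQueries(String baseQuery, String splitColumn,
                                   long min, long max, int parallelism) {
    if (parallelism < 1) {
      throw new IllegalArgumentException("parallelism must be >= 1");
    }
    if (min > max) {
      throw new IllegalArgumentException("min must be <= max");
    }
    long range = max - min + 1;
    long step = (range + parallelism - 1) / parallelism; // ceiling division, at least 1
    List<String> queries = new ArrayList<>();
    for (long lower = min; ; lower += step) {
      if (max - lower < step) {
        // Last range: inclusive upper bound so the max value itself is read.
        queries.add(String.format("%s WHERE %s >= %d AND %s <= %d",
            baseQuery, splitColumn, lower, splitColumn, max));
        return queries;
      }
      // Intermediate range: exclusive upper bound so no row is read twice.
      queries.add(String.format("%s WHERE %s >= %d AND %s < %d",
          baseQuery, splitColumn, lower, splitColumn, lower + step));
    }
  }

  public static void main(String[] args) {
    splitQueries("SELECT * FROM my_table", "id", 1, 100, 4).forEach(System.out::println);
  }
}
```

With a skewed split column, the ranges stay equal in width but not in row count, which is exactly the straggler effect described above.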
Recommended usage:
Beam runs each query generated by DBeam on 1 dedicated vCPU (when running with the Dataflow Runner), so for best performance it is recommended that the total number of vCPUs available for a given job equal the queryParallelism specified. Hence, if the workerMachineType for Dataflow is n1-standard-w and numWorkers is n, then queryParallelism q should be a multiple of n*w, and the job would be fastest if q = n * w.
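The sizing rule above is simple arithmetic: an n1-standard-w machine has w vCPUs, so n workers give n * w vCPUs in total. The sketch below encodes that rule; the class and method names are hypothetical, and only the n1-standard-w naming used in the text is handled.

```java
final class ParallelismSizingSketch {
  private static final String N1_STANDARD = "n1-standard-";

  // n1-standard-<w> machine types have <w> vCPUs per worker.
  static int vcpusPerWorker(String machineType) {
    if (!machineType.startsWith(N1_STANDARD)) {
      throw new IllegalArgumentException("Only n1-standard-<w> is handled here: " + machineType);
    }
    return Integer.parseInt(machineType.substring(N1_STANDARD.length()));
  }

  // Fastest setting per the guidance above: q = n * w.
  static int recommendedQueryParallelism(String machineType, int numWorkers) {
    return numWorkers * vcpusPerWorker(machineType);
  }

  // q should be a (positive) multiple of n * w.
  static boolean isValidQueryParallelism(int q, String machineType, int numWorkers) {
    return q > 0 && q % recommendedQueryParallelism(machineType, numWorkers) == 0;
  }

  public static void main(String[] args) {
    System.out.println(recommendedQueryParallelism("n1-standard-4", 4));
  }
}
```

For example, 4 workers of type n1-standard-4 give 16 vCPUs, matching the queryParallelism of 16 mentioned below.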
For an export of a table running from a dedicated PostgreSQL replica, we have seen the best performance in terms of both vCPU time and wall time with a queryParallelism of 16. Increasing queryParallelism further increases the vCPU time without offering much gain in the wall time of the complete export. It is probably good to use a queryParallelism of less than 16 when experimenting.
Building and testing can be achieved with mvn:
mvn validate
In order to create a jar with all dependencies under ./dbeam-core/target/dbeam-core-shaded.jar, run the following:
mvn clean package -Ppack
Using java from the command line:
java -cp ./dbeam-core/target/dbeam-core-shaded.jar \
com.spotify.dbeam.jobs.JdbcAvroJob \
--output=gs://my-testing-bucket-name/ \
--username=my_database_username \
--password=secret \
--connectionUrl=jdbc:postgresql://some.database.uri.example.org:5432/my_database \
--table=my_table
For CloudSQL:
java -cp ./dbeam-core/target/dbeam-core-shaded.jar \
com.spotify.dbeam.jobs.JdbcAvroJob \
--output=gs://my-testing-bucket-name/ \
--username=my_database_username \
--password=secret \
--connectionUrl="jdbc:postgresql://google/database?socketFactory=com.google.cloud.sql.postgres.SocketFactory&socketFactoryArg=project:region:cloudsql-instance" \
--table=my_table
- Replace postgres with mysql if you are using MySQL.
- More details can be found at CloudSQL JDBC SocketFactory
To run a cheap data extraction, as a way to validate, one can run:
java -cp ./dbeam-core/target/dbeam-core-shaded.jar \
com.spotify.dbeam.jobs.JdbcAvroJob \
--output=gs://my-testing-bucket-name/ \
--username=my_database_username \
--password=secret \
--connectionUrl=jdbc:postgresql://some.database.uri.example.org:5432/my_database \
--table=my_table \
--limit=10 \
--skipPartitionCheck
The database password can be configured by simply passing --password=writepasswordhere, --passwordFile=/path/to/file/containing/password or --passwordFile=gs://gcs-bucket/path/to/file/containing/password.
A more robust configuration is to point to a Google KMS encrypted file.
DBeam will try to decrypt using KMS if the file ends with .encrypted (e.g. --passwordFileKmsEncrypted=gs://gcs-bucket/path/to/db-password.encrypted).
The file should contain base64-encoded encrypted content.
It can be generated using gcloud like the following:
echo "super_secret_password" \
| gcloud kms encrypt \
--location "global" \
--keyring "dbeam" \
--key "default" \
--project "mygcpproject" \
--plaintext-file - \
--ciphertext-file - \
| base64 \
| gsutil cp - gs://gcs-bucket/path/to/db-password.encrypted
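Reading such a file back involves two layers: the .encrypted suffix check described above, and stripping the base64 encoding before the ciphertext goes to KMS. The sketch below shows only those two local steps; the KMS decrypt call is deliberately not shown, and the class and method names are hypothetical. It uses the MIME decoder because the base64 CLI output may be wrapped across lines and ends with a newline, and the MIME decoder ignores line separators.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class EncryptedPasswordFileSketch {
  // Per the description above, KMS decryption is attempted when the file name ends with ".encrypted".
  static boolean isKmsEncrypted(String path) {
    return path.endsWith(".encrypted");
  }

  // Decodes the base64 file contents back into raw ciphertext bytes,
  // ignoring any line breaks added by the base64 CLI.
  static byte[] ciphertextFromFileContents(String fileContents) {
    return Base64.getMimeDecoder().decode(fileContents);
  }

  public static void main(String[] args) {
    String contents = Base64.getMimeEncoder()
        .encodeToString("not-really-ciphertext".getBytes(StandardCharsets.UTF_8)) + "\n";
    byte[] ciphertext = ciphertextFromFileContents(contents);
    // These bytes would then be sent to Cloud KMS for decryption (not shown here).
    System.out.println(isKmsEncrypted("gs://gcs-bucket/path/to/db-password.encrypted"));
    System.out.println(ciphertext.length);
  }
}
```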
KMS project, location, keyring, and key can be configured via Java system properties; the defaults are:
java \
-DKMS_KEYRING=dbeam \
-DKMS_KEY=default \
-DKMS_LOCATION=global \
-DKMS_PROJECT=default_gcp_project \
-cp ./dbeam-core/target/dbeam-core-shaded.jar \
com.spotify.dbeam.jobs.JdbcAvroJob \
...
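The -D properties above can be read with System.getProperty and a fallback value, then combined into a Cloud KMS CryptoKey resource name (projects/…/locations/…/keyRings/…/cryptoKeys/…). This is a hedged sketch of that resolution, not DBeam's code: the class is hypothetical, and defaultProject stands in for the default GCP project, which the sketch expects the caller to supply.

```java
final class KmsKeySettingsSketch {
  // Reads the Java system properties listed above, falling back to the documented defaults.
  static String keyring() {
    return System.getProperty("KMS_KEYRING", "dbeam");
  }

  static String key() {
    return System.getProperty("KMS_KEY", "default");
  }

  static String location() {
    return System.getProperty("KMS_LOCATION", "global");
  }

  // Hypothetical: the caller supplies the default GCP project used when KMS_PROJECT is unset.
  static String project(String defaultProject) {
    return System.getProperty("KMS_PROJECT", defaultProject);
  }

  // Builds the Cloud KMS CryptoKey resource name from the resolved settings.
  static String cryptoKeyName(String defaultProject) {
    return String.format("projects/%s/locations/%s/keyRings/%s/cryptoKeys/%s",
        project(defaultProject), location(), keyring(), key());
  }

  public static void main(String[] args) {
    System.out.println(cryptoKeyName("mygcpproject"));
  }
}
```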
To include the DBeam library in a mvn project, add the following dependency in pom.xml:
<dependency>
<groupId>com.spotify</groupId>
<artifactId>dbeam-core</artifactId>
<version>${dbeam.version}</version>
</dependency>
To include the DBeam library in an SBT project, add the following dependency in build.sbt:
libraryDependencies ++= Seq(
"com.spotify" % "dbeam-core" % dbeamVersion
)
Make sure you have mvn installed. As an editor, IntelliJ IDEA is recommended.
To test and verify changes during development, run:
mvn validate
Or:
mvn validate -Pcoverage
This project adheres to the Open Code of Conduct. By participating, you are expected to honor this code.
Every push to master will deploy a snapshot version to Sonatype. You can check the deployment in the following links:
Copyright 2016-2019 Spotify AB.
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0