feat(dataflow): Dataflow to Apache Iceberg with dynamic destinations #9645

Open · wants to merge 4 commits into base: main
Changes from 2 commits
2 changes: 1 addition & 1 deletion dataflow/snippets/pom.xml
@@ -37,7 +37,7 @@
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <apache_beam.version>2.58.0</apache_beam.version>
+ <apache_beam.version>2.60.0</apache_beam.version>
Review comment: a little bit old version. cc @ahmedabu98

Reply: +1, but noting that Dataflow will be upgrading Iceberg to the latest version anyway.

Reply (Contributor Author): I bumped the version.

<slf4j.version>2.0.12</slf4j.version>
<parquet.version>1.14.0</parquet.version>
<iceberg.version>1.4.2</iceberg.version>
dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java (new file)
@@ -0,0 +1,99 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.dataflow;

// [START dataflow_apache_iceberg_dynamic_destinations]
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;

public class ApacheIcebergDynamicDestinations {

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addInt64Field("id")
      .addStringField("name")
      .addStringField("airport")
      .build();

  // The data to write to the table, formatted as JSON strings. The value of the
  // "airport" field selects the destination table for each row.
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\", \"airport\": \"ORD\" }",
      "{\"id\":1, \"name\":\"Bob\", \"airport\": \"SYD\" }",
      "{\"id\":2, \"name\":\"Charles\", \"airport\": \"ORD\" }"
  );

  // [END dataflow_apache_iceberg_dynamic_destinations]
  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);
  }

  // Returns the pipeline's terminal state so that callers, such as the
  // integration test below, can assert that the pipeline completed.
  public static PipelineResult.State main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG
    // For more information, see
    // https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = createPipeline(options);
    return pipeline.run().waitUntilFinish();
  }

  // [START dataflow_apache_iceberg_dynamic_destinations]
  public static Pipeline createPipeline(Options options) {
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg catalog that the sink writes to.
    Map<String, Object> catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", "hadoop")
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        // Route the incoming records based on the value of the "airport" field.
        .put("table", "flights-{airport}")
        // Specify which fields to keep from the input data.
        .put("keep", Arrays.asList("name", "id"))
        .build();

    // Build the pipeline.
    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    return pipeline;
  }
}
// [END dataflow_apache_iceberg_dynamic_destinations]
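For a quick sanity check outside the test suite, the tables that the sink creates dynamically can be opened with the Iceberg Java API, the same API the integration test below uses. The following is a minimal sketch, not part of this PR: the warehouse path and the "local" catalog name are assumptions that mirror the sample's options, and InspectTables is a hypothetical helper class.

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

public class InspectTables {
  public static void main(String[] args) throws IOException {
    // Assumption: the same Hadoop catalog and warehouse the pipeline wrote to.
    String warehouseLocation = "file:/tmp/test-warehouse";

    Catalog catalog = CatalogUtil.loadCatalog(
        CatalogUtil.ICEBERG_CATALOG_HADOOP,
        "local", // assumed catalog name, matching --catalogName
        ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation),
        new Configuration());

    // The sink expands "flights-{airport}" to one table per airport value.
    for (String name : new String[] {"flights-ORD", "flights-SYD"}) {
      Table table = catalog.loadTable(TableIdentifier.of(name));
      try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
        System.out.println(name + ":");
        for (Record r : records) {
          System.out.println("  " + r); // e.g. "Record(0, Alice)"
        }
      }
    }
  }
}

Each distinct "airport" value yields one table, so the rows for Alice and Charles land in flights-ORD and Bob's row lands in flights-SYD.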
dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java
@@ -16,15 +16,15 @@

package com.example.dataflow;

+ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.ImmutableMap;
- import java.io.ByteArrayOutputStream;
import java.io.IOException;
- import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.UUID;
+ import org.apache.beam.sdk.PipelineResult;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogProperties;
@@ -52,25 +52,19 @@
import org.junit.Test;

public class ApacheIcebergIT {
- private ByteArrayOutputStream bout;
- private final PrintStream originalOut = System.out;

- private static final String CATALOG_NAME = "local";
- private static final String TABLE_NAME = "table1";
- private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(TABLE_NAME);
-
- // The output file that the Dataflow pipeline writes.
- private static final String OUTPUT_FILE_NAME_PREFIX = UUID.randomUUID().toString();
- private static final String OUTPUT_FILE_NAME = OUTPUT_FILE_NAME_PREFIX + "-00000-of-00001.txt";

private Configuration hadoopConf = new Configuration();
private java.nio.file.Path warehouseDirectory;
private String warehouseLocation;
private Catalog catalog;
- private Table table;
+ private static final String CATALOG_NAME = "local";

+ String outputFileNamePrefix = UUID.randomUUID().toString();
+ String outputFileName = outputFileNamePrefix + "-00000-of-00001.txt";

- private void createIcebergTable(Catalog catalog, TableIdentifier tableId) {
+ private Table createIcebergTable(String name) {
+
+   TableIdentifier tableId = TableIdentifier.of(name);

// This schema represents an Iceberg table schema. It needs to match the
// org.apache.beam.sdk.schemas.Schema that is defined in ApacheIcebergWrite. However, these
@@ -79,10 +79,10 @@ private void createIcebergTable(Catalog catalog, TableIdentifier tableId) {
NestedField.required(1, "id", Types.LongType.get()),
NestedField.optional(2, "name", Types.StringType.get()));

-   table = catalog.createTable(tableId, schema);
+   return catalog.createTable(tableId, schema);
}

- private void writeTableRecord()
+ private void writeTableRecord(Table table)
throws IOException {
GenericRecord record = GenericRecord.create(table.schema());
record.setField("id", 0L);
@@ -109,72 +109,95 @@ private void writeTableRecord()
.commit();
}

+ private boolean tableContainsRecord(Table table, String data) {
+   CloseableIterable<Record> records = IcebergGenerics.read(table).build();
+   for (Record r : records) {
+     if (r.toString().contains(data)) {
+       return true;
+     }
+   }
+   return false;
+ }

@Before
public void setUp() throws IOException {
-   bout = new ByteArrayOutputStream();
-   System.setOut(new PrintStream(bout));

// Create an Apache Iceberg catalog with a table.
warehouseDirectory = Files.createTempDirectory("test-warehouse");
warehouseLocation = "file:" + warehouseDirectory.toString();
System.out.println(warehouseLocation);
catalog =
CatalogUtil.loadCatalog(
CatalogUtil.ICEBERG_CATALOG_HADOOP,
CATALOG_NAME,
ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation),
hadoopConf);
-   createIcebergTable(catalog, TABLE_IDENTIFIER);

}

@After
public void tearDown() throws IOException {
-   Files.deleteIfExists(Paths.get(OUTPUT_FILE_NAME));
-   System.setOut(originalOut);
+   Files.deleteIfExists(Paths.get(outputFileName));
}

@Test
public void testApacheIcebergWrite() {
+   String tableName = "write_table";
+   final Table table = createIcebergTable("write_table");

// Run the Dataflow pipeline.
ApacheIcebergWrite.main(
new String[] {
"--runner=DirectRunner",
"--warehouseLocation=" + warehouseLocation,
"--catalogName=" + CATALOG_NAME,
"--tableName=" + TABLE_NAME
"--tableName=" + tableName
});

// Verify that the pipeline wrote records to the table.
-   Table table = catalog.loadTable(TABLE_IDENTIFIER);
-   CloseableIterable<Record> records = IcebergGenerics.read(table)
-       .build();
-   for (Record r : records) {
-     System.out.println(r);
-   }
+   assertTrue(tableContainsRecord(table, "0, Alice"));
+   assertTrue(tableContainsRecord(table, "1, Bob"));
+   assertTrue(tableContainsRecord(table, "2, Charles"));
}

+ @Test
+ public void testApacheIcebergDynamicDestinations() {
+   final Table tableORD = createIcebergTable("flights-ORD");
+   final Table tableSYD = createIcebergTable("flights-SYD");
+
+   // Run the Dataflow pipeline.
+   PipelineResult.State state = ApacheIcebergDynamicDestinations.main(
+       new String[] {
+         "--runner=DirectRunner",
+         "--warehouseLocation=" + warehouseLocation,
+         "--catalogName=" + CATALOG_NAME
+       });
+   assertEquals(PipelineResult.State.DONE, state);
+
-   String got = bout.toString();
-   assertTrue(got.contains("0, Alice"));
-   assertTrue(got.contains("1, Bob"));
-   assertTrue(got.contains("2, Charles"));
+   // Verify that the pipeline wrote records to the correct tables.
+   assertTrue(tableContainsRecord(tableORD, "0, Alice"));
+   assertTrue(tableContainsRecord(tableORD, "2, Charles"));
+   assertTrue(tableContainsRecord(tableSYD, "1, Bob"));
+ }

@Test
public void testApacheIcebergRead() throws IOException {
+   String tableName = "read_table";
+   final Table table = createIcebergTable(tableName);

// Seed the Apache Iceberg table with data.
-   writeTableRecord();
+   writeTableRecord(table);

// Run the Dataflow pipeline.
ApacheIcebergRead.main(
new String[] {
"--runner=DirectRunner",
"--warehouseLocation=" + warehouseLocation,
"--catalogName=" + CATALOG_NAME,
"--tableName=" + TABLE_NAME,
"--outputPath=" + OUTPUT_FILE_NAME_PREFIX
"--tableName=" + tableName,
"--outputPath=" + outputFileNamePrefix
});

-   // Verify the pipeline wrote the table data to a local file.
-   String output = Files.readString(Paths.get(OUTPUT_FILE_NAME));
+   // Verify the pipeline wrote the table data to a text file.
+   String output = Files.readString(Paths.get(outputFileName));
assertTrue(output.contains("0:Person-0"));
}
}
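A note on the assertions above: tableContainsRecord matches on Record.toString(), which renders these rows as strings like "Record(0, Alice)". If that string form ever feels brittle, the fields can be compared directly. The sketch below assumes the two-field (id, name) schema created by createIcebergTable; tableContainsIdName is a hypothetical helper, not part of this PR, and unlike the PR's helper it also closes the CloseableIterable.

// Hypothetical alternative to tableContainsRecord: compare fields instead of
// relying on Record.toString(). Usage: assertTrue(tableContainsIdName(table, 0L, "Alice"));
private boolean tableContainsIdName(Table table, long id, String name) throws IOException {
  try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
    for (Record r : records) {
      // getField returns the boxed column value; the table schema stores id as a long.
      if (Long.valueOf(id).equals(r.getField("id")) && name.equals(r.getField("name"))) {
        return true;
      }
    }
  }
  return false;
}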