Adding support for partitioned S3 source #4

Open · wants to merge 2 commits into base: master
17 changes: 17 additions & 0 deletions README.md
@@ -59,6 +59,23 @@ shouldSortFiles|true|whether to sort files based on timestamp while listing them
useInstanceProfileCredentials|false|Whether to use EC2 instance profile credentials for connecting to Amazon SQS
maxFilesPerTrigger|no default value|maximum number of files to process in a microbatch
maxFileAge|7d|Maximum age of a file that can be found in this directory
basePath|no default value|Base path in case of partitioned S3 data, e.g. for `s3://bucket/basedDir/part1=10/part2=20/file.json` the basePath is `s3://bucket/basedDir/`

## Using a Partitioned S3 Bucket

If your S3 bucket is partitioned, your schema must contain both data columns and partition
columns. Moreover, partition columns need to have `isPartitioned` set to `true` in their metadata.
@itsvikramagr (Collaborator) commented on Aug 20, 2020:
Is this a constraint that this module is adding, or is it normally expected that partition columns will have `isPartitioned` in their metadata?


Example:
```
import org.apache.spark.sql.types._

val metaData = (new MetadataBuilder).putBoolean("isPartitioned", true).build()

val partitionedSchema = new StructType().add(StructField(
  "col1", IntegerType, true, metaData))
```

Also, `basePath` needs to be specified in the options when the S3 bucket is partitioned.
Specifying partition columns without specifying the `basePath` will throw an error.
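
For instance, a minimal sketch of reading a partitioned bucket could look like the following
(the SQS URL and the bucket path are placeholders):

```
val partitionedData = spark
  .readStream
  .format("s3-sqs")
  .schema(partitionedSchema)                    // schema with isPartitioned metadata, as above
  .option("sqsUrl", "https://DUMMY_URL")        // placeholder SQS queue URL
  .option("fileFormat", "json")
  .option("region", "us-east-1")
  .option("basePath", "s3://bucket/basedDir/")  // prefix under which the partition directories live
  .load()
```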

## Example

@@ -19,6 +19,8 @@ package org.apache.spark.sql.streaming.sqs

import java.net.URI

import scala.util.Try

import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
@@ -34,6 +36,8 @@ class SqsSource(sparkSession: SparkSession,
options: Map[String, String],
override val schema: StructType) extends Source with Logging {

import SqsSource._

private val sourceOptions = new SqsSourceOptions(options)

private val hadoopConf = sparkSession.sessionState.newHadoopConf()
@@ -50,6 +54,22 @@

private val shouldSortFiles = sourceOptions.shouldSortFiles

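// Partition columns are the schema fields whose metadata sets isPartitioned to true;
// a non-boolean isPartitioned value is rejected with an IllegalArgumentException.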
private val partitionColumnNames = {
schema.fields.filter(field => field.metadata.contains(IS_PARTITIONED) &&
Try(field.metadata.getBoolean(IS_PARTITIONED)).toOption.getOrElse(throw new
IllegalArgumentException(s"$IS_PARTITIONED for column ${field.name} must be true or " +
s"false"))
).map(_.name)
}

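// When partition columns are present, basePath is mandatory and is added to the options
// forwarded to the underlying DataSource.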
private val optionsWithBasePath = if (!partitionColumnNames.isEmpty) {
val basePartitionsPath = sourceOptions.basePath.getOrElse(throw new IllegalArgumentException(
s"$BASE_PATH is mandatory if schema contains partitionColumns"))
options + (BASE_PATH -> basePartitionsPath)
} else {
options
}

private val sqsClient = new SqsClient(sourceOptions, hadoopConf)

metadataLog.allFiles().foreach { entry =>
@@ -75,8 +95,9 @@ sparkSession,
sparkSession,
paths = files.map(f => new Path(new URI(f.path)).toString),
userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumnNames,
className = fileFormatClassName,
options = options)
options = optionsWithBasePath)
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
checkFilesExist = false), isStreaming = true))
}
@@ -138,3 +159,9 @@ class SqsSource(sparkSession: SparkSession,

}

object SqsSource {

val IS_PARTITIONED = "isPartitioned"
val BASE_PATH = "basePath"
}

@@ -37,6 +37,8 @@ class SqsSourceOptions(parameters: CaseInsensitiveMap[String]) extends Logging {
}
}

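/**
 * Base path of the partitioned S3 data; required when the schema declares partition columns.
 */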
val basePath: Option[String] = parameters.get("basePath")

/**
* Maximum age of a file that can be found in this directory, before it is ignored. For the
* first batch all files will be considered valid.
112 changes: 112 additions & 0 deletions src/test/scala/org/apache/spark/sql/streaming/sqs/SqsSourceSuite.scala
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.streaming.sqs

import java.util.Locale

import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest}
import org.apache.spark.sql.types._

class SqsSourceSuite extends StreamTest {

import org.apache.spark.sql.streaming.sqs.SqsSource._

test("partitioned data source - base path not specified") {

var query: StreamingQuery = null

val expectedMsg = s"$BASE_PATH is mandatory if schema contains partitionColumns"

try {
val errorMessage = intercept[StreamingQueryException] {

val metaData = (new MetadataBuilder).putBoolean(IS_PARTITIONED, true).build()

val partitionedSchema = new StructType().add(StructField(
"col1", IntegerType, true, metaData))

val reader = spark
.readStream
.format("s3-sqs")
.option("sqsUrl", "https://DUMMY_URL")
.option("fileFormat", "json")
.option("region", "us-east-1")
.schema(partitionedSchema)
.load()

query = reader.writeStream
.queryName("testQuery")
.format("memory")
.start()

query.processAllAvailable()
}.getMessage
assert(errorMessage.toLowerCase(Locale.ROOT).contains(expectedMsg.toLowerCase(Locale.ROOT)))
} finally {
if (query != null) {
// terminating streaming query if necessary
query.stop()
}
}

}

test("isPartitioned doesn't contain true or false") {

var query: StreamingQuery = null

val columnName = "col1"

val expectedMsg = s"$IS_PARTITIONED for column $columnName must be true or false"

try {
val errorMessage = intercept[StreamingQueryException] {

val metaData = (new MetadataBuilder).putString(IS_PARTITIONED, "x").build()

val partitionedSchema = new StructType().add(StructField(
"col1", IntegerType, true, metaData))

val reader = spark
.readStream
.format("s3-sqs")
.option("sqsUrl", "https://DUMMY_URL")
.option("fileFormat", "json")
.option("region", "us-east-1")
.schema(partitionedSchema)
.load()

query = reader.writeStream
.format("memory")
.queryName("testQuery")
.start()

query.processAllAvailable()
}.getMessage
assert(errorMessage.toLowerCase(Locale.ROOT).contains(expectedMsg.toLowerCase(Locale.ROOT)))
} finally {
if (query != null) {
// terminating streaming query if necessary
query.stop()
}
}

}

}