Skip to content

Commit

Permalink
Implement write feature
Browse files Browse the repository at this point in the history
closes #3
  • Loading branch information
jagedn authored Dec 9, 2024
1 parent c87b67b commit 0a8b064
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 19 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ record MyRecord( int id, String name){}
splitParquet( [record: MyRecord] )
```

`toParquet( path )`
`toParquet( path, [record: MyRecord] )`

: Write each item in a source channel to a Parquet file.

A `record` class is required to let the plugin know the structure of the file.

## Development

Refer to the [nf-hello](https://github.com/nextflow-io/nf-hello) README for instructions on how to build, test, and publish Nextflow plugins.
2 changes: 1 addition & 1 deletion plugins/nf-parquet/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ dependencies {
compileOnly 'org.pf4j:pf4j:3.4.1'

// add plugin dependencies here
implementation 'com.jerolba:carpet-record:0.1.0'
implementation 'com.jerolba:carpet-record:0.2.0'

// test configuration
testImplementation "io.nextflow:nextflow:$nextflowVersion"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package nextflow.parquet

import nextflow.plugin.extension.Factory
import nextflow.plugin.extension.Function

import java.nio.file.Path

Expand All @@ -18,7 +16,6 @@ import nextflow.plugin.extension.PluginExtensionPoint

import com.jerolba.carpet.CarpetReader
import com.jerolba.carpet.CarpetWriter
import org.apache.parquet.hadoop.ParquetFileWriter

/**
* Implements extensions for reading and writing Parquet files.
Expand Down Expand Up @@ -54,6 +51,28 @@ class ParquetExtension extends PluginExtensionPoint {
return target
}

/**
* Write each item in a source channel to a Parquet file.
*
* @param source
* @param path
*/
/**
 * Writes every item from {@code source} to a Parquet file at {@code path},
 * passing the items through unchanged to the returned channel.
 *
 * @param source the channel whose items are persisted
 * @param path   destination Parquet file path
 * @param params options map; requires a {@code record} entry holding the record class
 * @return a channel that emits the same items as {@code source}
 */
@Operator
DataflowWriteChannel toParquet(DataflowReadChannel source, String path, Map params=[:]) {
    final result = CH.createBy(source)
    final parquetWriter = new ParquetWriter(path, params)
    final events = [
        onNext: { item ->
            parquetWriter.write(item as Record)
            result << item
        },
        onComplete: {
            parquetWriter.close()
            result << Channel.STOP
        }
    ]
    DataflowHelper.subscribeImpl(source, events)
    return result
}

class ParquetSplitter {
private DataflowWriteChannel target
private Class<Record> clazz
Expand Down Expand Up @@ -92,19 +111,24 @@ class ParquetExtension extends PluginExtensionPoint {

}

/**
* Write each item in a source channel to a Parquet file.
*
* @param source
* @param path
*/
/**
 * Streams {@link Record} items into a Parquet file using the Carpet library.
 *
 * The record type must be provided via the {@code record} entry of the
 * params map and must be a Java record, since Carpet derives the Parquet
 * schema from the record components.
 */
class ParquetWriter implements Closeable {

    // record type that defines the Parquet schema
    private Class<Record> clazz

    // underlying Carpet writer; owns the output stream once constructed
    CarpetWriter writer

    ParquetWriter(String output, Map params) {
        // NOTE: `instanceof Class<Record>` is erased at runtime and only checks
        // for Class — verify explicitly that the class is really a record
        if( !(params.record instanceof Class) || !Record.isAssignableFrom(params.record as Class) ) {
            throw new IllegalArgumentException("A Record.class is required. Class provided $params.record")
        }
        this.clazz = params.record as Class<Record>
        final outputStream = new FileOutputStream(output)
        try {
            writer = new CarpetWriter<>(outputStream, this.clazz)
        }
        catch (Exception e) {
            // don't leak the file handle if the writer can't be created
            outputStream.close()
            throw e
        }
    }

    void write(Record record) {
        writer.write(record)
    }

    @Override
    void close() {
        writer.close()
    }
}
}
2 changes: 1 addition & 1 deletion plugins/nf-parquet/src/resources/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Manifest-Version: 1.0
Plugin-Id: nf-parquet
Plugin-Version: 0.1.0
Plugin-Version: 0.2.0
Plugin-Class: nextflow.parquet.ParquetPlugin
Plugin-Provider: nextflow
Plugin-Requires: >=24.04.2
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package nextflow.parquet

/**
 * Minimal record used by the plugin tests to describe a Parquet schema:
 * one numeric {@code id} column and one string {@code name} column.
 */
record DemoRecord(long id, String name) {

}
24 changes: 24 additions & 0 deletions plugins/nf-parquet/src/test/nextflow/parquet/PluginTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,28 @@ class PluginTest extends Dsl2Spec{
result.val == Channel.STOP
}

def 'should write a projection to a file'(){
    when:
    // write three DemoRecord items through the toParquet operator and
    // check they pass through the channel unchanged
    def pathOutput = Files.createTempFile("", ".parquet")
    def SCRIPT = """
        include {splitParquet; toParquet} from 'plugin/nf-parquet'
        import nextflow.parquet.DemoRecord
        channel.of(1,2,3)
        .map( { new DemoRecord(it, "The \$it record") } )
        .toParquet("$pathOutput", [record:DemoRecord])
        .view()
        """.toString()
    and:
    def result = new MockScriptRunner([:]).setScript(SCRIPT).execute()
    then:
    result.val.id == 1
    result.val.id == 2
    result.val.id == 3
    result.val == Channel.STOP
    // assert explicitly that something was actually written to the file
    pathOutput.toFile().length() > 0
    cleanup:
    Files.deleteIfExists(pathOutput)
}

}
10 changes: 10 additions & 0 deletions validation/write_custom.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
include { splitParquet; toParquet } from 'plugin/nf-parquet'

import myrecords.*

// Build three sample records, persist them to a Parquet file, and
// echo each record to stdout as it passes through.
channel
    .of(1, 2, 3)
    .map( { new CustomRecord(it, "the $it record", it, it, it) } )
    .toParquet("work/demo.parquet", [record: CustomRecord])
    .view()

0 comments on commit 0a8b064

Please sign in to comment.