Merge pull request #576 from gnieh/cookbook/jsonlines

Add cookbook for dealing with JSON Lines data

satabin authored Mar 13, 2024
2 parents a13966e + 123d60f commit 7943b8d
Showing 6 changed files with 237 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -328,7 +328,7 @@ jobs:
run: npx -y pagefind --site msite/target/docs/site

- name: Publish site
-        if: github.event_name != 'pull_request' && startsWith(github.ref, 'refs/tags/v')
+        if: github.event_name != 'pull_request' && github.ref == 'refs/heads/main'
uses: peaceiris/[email protected]
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
19 changes: 0 additions & 19 deletions build.sbt
@@ -46,9 +46,6 @@ ThisBuild / crossScalaVersions := Seq(scala212, scala213, scala3)
ThisBuild / scalaVersion := scala213
ThisBuild / tlJdkRelease := Some(11)

- ThisBuild / tlSitePublishBranch := None
- ThisBuild / tlSitePublishTags := true

val commonSettings = List(
versionScheme := Some("early-semver"),
libraryDependencies ++= List(
@@ -521,21 +518,6 @@ val chatLink: IconLink = IconLink.external("https://discord.gg/XF3CXcMzqD", Heli
val mastodonLink: IconLink =
IconLink.external("https://fosstodon.org/@lucassatabin", HeliumIcon.mastodon)

- val sonatypeApiUrl = setting {
-   CrossVersion(
-     crossVersion.value,
-     scalaVersion.value,
-     scalaBinaryVersion.value
-   ).map { cross =>
-     val host = sonatypeCredentialHost.value
-     val repo = if (isSnapshot.value) "snapshots" else "releases"
-     val org = organization.value.replace('.', '/')
-     val mod = cross("fs2-data-docs")
-     val ver = version.value
-     url(s"https://$host/service/local/repositories/$repo/archive/$org/$mod/$ver/$mod-$ver-javadoc.jar/!/index.html")
-   }
- }

lazy val site = project
.in(file("msite"))
.enablePlugins(TypelevelSitePlugin)
@@ -583,7 +565,6 @@ lazy val site = project
),
scalacOptions += "-Ymacro-annotations",
mdocIn := file("site"),
- tlSiteApiUrl := sonatypeApiUrl.value,
laikaConfig := tlSiteApiUrl.value
.fold(LaikaConfig.defaults)(url =>
LaikaConfig.defaults
4 changes: 4 additions & 0 deletions site/cookbooks/data/jsonl/nested.jsonl
@@ -0,0 +1,4 @@
{"name": "Gilbert", "wins": [["straight", "7♣"], ["one pair", "10♥"]]}
{"name": "Alexa", "wins": [["two pair", "4♠"], ["two pair", "9♠"]]}
{"name": "May", "wins": []}
{"name": "Deloise", "wins": [["three of a kind", "5♣"]]}
2 changes: 1 addition & 1 deletion site/cookbooks/jq.md
Expand Up @@ -122,7 +122,7 @@ It compiles for all three supported platforms:
@:choice(jvm)

```shell
- $ LC_ALL=C.UTF-8 sbt exampleJqJVM/assembly
+ $ sbt exampleJqJVM/assembly
$ java -jar examples/jqlike/.jvm/target/scala-2.13/jq-like.jar -q '.[] | { "full_name": .name, "language": .language }' -f site/cookbooks/data/json/sample.json
```

142 changes: 142 additions & 0 deletions site/cookbooks/jsonlines.md
@@ -0,0 +1,142 @@
# Handling JSON Lines data

The [JSON Lines format][jsonlines] describes a way to represent JSON records, one per line. This makes it possible to process records one at a time, reading them as they come.
In this cookbook, we will demonstrate how such data can be read and produced using `fs2-data`.
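
For instance, an illustrative two-record document looks like this (each line is a complete JSON value):

```json
{"name": "Gilbert", "wins": [["straight", "7♣"]]}
{"name": "May", "wins": []}
```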

## Reading JSON Lines data

The `fs2-data` [JSON module][json] can natively read concatenated JSON values from an input stream. This means that we can naively pipe the raw data through the parser and get a token stream out of it.

However, reading the data this way does not check that the input actually respects the JSON Lines format. The format only has a few rules:
1. Input must be UTF-8 encoded.
2. Each line is a valid JSON value.
3. Lines are separated by `\n`.
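
Setting `fs2` aside for a moment, the framing part of these rules (#2 and #3) can be sketched in plain Scala. The `Framing` helper below is hypothetical, for illustration only, and deliberately ignores JSON validity and UTF-8 decoding (rule #1):

```scala
// A minimal sketch of JSON Lines framing (rules #2 and #3 only):
// one value per line, lines separated by '\n'.
object Framing {
  // Split an input document into candidate records, one per line.
  def split(doc: String): Vector[String] =
    doc.split('\n').toVector.filter(_.nonEmpty)

  // Join records back together, one per line.
  def join(records: Vector[String]): String =
    records.mkString("\n")
}
```

A record containing a raw newline would be split in two by this framing, which is exactly why the full pipeline below parses each line as a standalone JSON value.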

We can leverage the operators provided by `fs2` and `fs2-data` to enforce these constraints when reading data.

```scala mdoc
import cats.effect.unsafe.implicits.global

import cats.effect.IO
import fs2.Stream
import fs2.data.json.JsonException
import fs2.data.json.circe._
import fs2.io.file.{Files, Path}
import io.circe.Json

def readJsonLines(input: Stream[IO, Byte]): Stream[IO, Json] =
input
// rule #1: input must be UTF-8 encoded
.through(fs2.text.utf8.decode)
// rule #3: new line delimiter is '\n'
.through(fs2.text.lines)
.flatMap { line =>
// rule #2: values must be encoded on single lines
Stream
.emit(line)
.covary[IO]
.through(fs2.data.json.ast.parse)
.handleErrorWith { t =>
Stream.raiseError[IO](JsonException(s"'$line' is not a valid JSON value", inner = t))
}
}

```

Using this function, we can read a [JSON Lines data file][data-jsonl] and wrap the elements in an array.

```scala mdoc
val array =
Files[IO]
.readAll(Path("site/cookbooks/data/jsonl/nested.jsonl"))
.through(readJsonLines)
.through(fs2.data.json.ast.tokenize)
.through(fs2.data.json.wrap.asTopLevelArray)
.through(fs2.data.json.render.pretty())

array
.compile
.string
.unsafeRunSync()
```

This reading function will fail if the input data is not JSON Lines encoded, for instance on the pretty-printed array we just built, in which a single JSON value spans several lines.

```scala mdoc
array
.through(fs2.text.utf8.encode)
.through(readJsonLines)
.compile
.drain
.attempt
.unsafeRunSync()
```

## Producing JSON Lines data

Similarly, using `fs2` and `fs2-data` operators, we can build a pipe that emits each record on its own line.

```scala mdoc
def writeJsonLines(input: Stream[IO, Json]): Stream[IO, Byte] =
input
.chunkLimit(1)
.unchunks
.flatMap { data =>
// rule #2: values must be encoded on single lines
Stream.emit(data).through(fs2.data.json.ast.tokenize).through(fs2.data.json.render.compact)
}
// rule #3: new line delimiter is '\n'
.intersperse("\n")
// rule #1: input must be UTF-8 encoded
.through(fs2.text.utf8.encode)
```

Using this function, we can generate JSON Lines encoded data out of a [sample JSON array][data-json]:

```scala mdoc
import fs2.data.json.jsonpath.literals._

Files[IO]
.readAll(Path("site/cookbooks/data/json/sample.json"))
.through(fs2.text.utf8.decode)
.through(fs2.data.json.tokens)
.through(fs2.data.json.jsonpath.filter.values(jsonpath"$$[*]"))
.take(5)
.through(writeJsonLines)
.through(fs2.text.utf8.decode)
.compile
.string
.unsafeRunSync()
```

## Running the full example

The full code can be found in the repository as a [Scala CLI][scala-cli] [script][jsonlines-script].
This example uses [decline][decline] to parse the CLI options.

@:select(platform)

@:choice(jvm)
```shell
$ scala-cli site/cookbooks/scripts/jsonlines.scala -- read site/cookbooks/data/jsonl/nested.jsonl
```

@:choice(js)
```shell
$ scala-cli --js site/cookbooks/scripts/jsonlines.scala -- read site/cookbooks/data/jsonl/nested.jsonl
```

@:choice(native)
```shell
$ scala-cli --native site/cookbooks/scripts/jsonlines.scala -- read site/cookbooks/data/jsonl/nested.jsonl
```

@:@

[jsonlines]: https://jsonlines.org/
[json]: /documentation/json/index.md
[data-jsonl]: /cookbooks/data/jsonl/nested.jsonl
[data-json]: /cookbooks/data/json/sample.json
[scala-cli]: https://scala-cli.virtuslab.org/
[jsonlines-script]: /cookbooks/scripts/jsonlines.scala
[decline]: https://ben.kirw.in/decline/
89 changes: 89 additions & 0 deletions site/cookbooks/scripts/jsonlines.scala
@@ -0,0 +1,89 @@
//> using toolkit typelevel:default
//> using dep org.gnieh::fs2-data-json-circe::1.10.0

import cats.effect.{ExitCode, IO, IOApp}
import com.monovore.decline.{Command, Opts}
import com.monovore.decline.effect.CommandIOApp
import fs2.Stream
import fs2.data.json.JsonException
import fs2.data.json.circe._
import fs2.data.json.jsonpath.literals._
import fs2.io.file.{Files, Path}
import io.circe.Json

sealed trait Opt
object Opt {
case class Read(input: Path) extends Opt
case class Produce(input: Path) extends Opt
}

object JsonLines
extends CommandIOApp(name = "fs2-jsonlines",
header = "A simple example CLI tool to read and write JSON Lines data") {

def readJsonLines(input: Stream[IO, Byte]): Stream[IO, Json] =
input
// rule #1: input must be UTF-8 encoded
.through(fs2.text.utf8.decode)
// rule #3: new line delimiter is '\n'
.through(fs2.text.lines)
.flatMap { line =>
// rule #2: values must be encoded on single lines
Stream
.emit(line)
.covary[IO]
.through(fs2.data.json.ast.parse)
.handleErrorWith { t =>
Stream.raiseError[IO](JsonException(s"'$line' is not a valid JSON value", inner = t))
}
}

def writeJsonLines(input: Stream[IO, Json]): Stream[IO, Byte] =
input
.flatMap { data =>
// rule #2: values must be encoded on single lines
Stream.emit(data).through(fs2.data.json.ast.tokenize).through(fs2.data.json.render.compact)
}
// rule #3: new line delimiter is '\n'
.intersperse("\n")
// rule #1: input must be UTF-8 encoded
.through(fs2.text.utf8.encode)

val read: Opts[Opt] = Opts.subcommand(
Command(name = "read", header = "Read a JSON Lines formatted file, and print the data as a pretty-printed JSON array") {
Opts.argument[String]("file.jsonl").map(jp => Opt.Read(Path(jp)))
})

val write: Opts[Opt] =
Opts.subcommand(Command(name = "produce", header = "Read a JSON array, and print the data in JSON Lines format") {
Opts.argument[String]("file.json").map(jp => Opt.Produce(Path(jp)))
})

override def main: Opts[IO[ExitCode]] =
read.orElse(write).map {
case Opt.Read(input) =>
Files[IO]
.readAll(input)
.through(readJsonLines)
.through(fs2.data.json.ast.tokenize)
.through(fs2.data.json.wrap.asTopLevelArray)
.through(fs2.data.json.render.pretty())
.through(fs2.text.utf8.encode)
.through(fs2.io.stdout)
.compile
.drain
.as(ExitCode.Success)
case Opt.Produce(input) =>
Files[IO]
.readAll(input)
.through(fs2.text.utf8.decode)
.through(fs2.data.json.tokens)
.through(fs2.data.json.jsonpath.filter.values(jsonpath"$$[*]"))
.through(writeJsonLines)
.through(fs2.io.stdout)
.compile
.drain
.as(ExitCode.Success)
}

}
