Use rdf-connect/ldes-client #45

Status: Open. Wants to merge 18 commits into base branch `master`; showing changes from 17 commits.
1 change: 0 additions & 1 deletion Dockerfile
```diff
@@ -2,7 +2,6 @@ FROM semtech/mu-javascript-template:1.7.0
 LABEL maintainer="[email protected]"
 ENV PERSIST_STATE "false"
 ENV LDES_ENDPOINT_VIEW "http://ldes-time-fragmenter:3000/example/1"
-ENV LDES_STREAM "http://example.org/example-stream"
 ENV REPLACE_VERSIONS "true"
 ENV DEBUG_AUTH_HEADERS "false"
 ENV LOG_SPARQL_ALL "false"
```
66 changes: 44 additions & 22 deletions README.md
````diff
@@ -11,32 +11,54 @@
 Add the following snippet to your docker-compose.yml to include the consumer service:
 
 ```
 consumer:
-    image: redpencil/ldes-consumer
+    image: redpencil/ldes-consumer
+    volumes:
+      - ./data/ldes-consumer:/data
 ```
 
 ## Configuration
 
 The service can be configured with the following environment variables:
 
-- `BLANK_NODE_NAMESPACE` [string]: namespace to use for skolemizing blank nodes (default 'http://mu.semte.ch/blank#')
-- `CRON_PATTERN` [string]: the cron pattern which the cronjob should use. (default: `* 0 * * * *`)
-- `DEBUG_AUTH_HEADERS`: Debugging of [mu-authorization](https://github.com/mu-semtech/mu-authorization) access-control related headers (default `false`)
-- `LDES_DEREFERENCE_MEMBERS`: whether to dereference members, because the collection pages do not contain all information (default: false)
-- `LDES_ENDPOINT_HEADER_<key>` [string]: A header key-value combination which should be send as part of the headers to the LDES ENDPOINT. E.g. `LDES_ENDPOINT_HEADER_X-API-KEY: <api_key>`.
-- `LDES_ENDPOINT_VIEW` [string]: the ldes endpoint containing the view (the first page) of the stream.
-- `LDES_POLLING_INTERVAL`: Number of milliseconds before refetching uncacheable fragments (default: 60000)
-- `LDES_REQUESTS_PER_MINUTE`: how many requests per minutes may be sent to the same host (optional, any positive number)
-- `LDES_STREAM` [string]: the uri which should be used as a subject to store the latest page and timestamp consumed in the database. (default: `http://mu.semte.ch/example-stream`)
-- `LDES_TIMESTAMP_PATH` [string]: the predicate to be used to find the timestamp of an object, default: `prov:generatedAtTime`)
-- `LDES_VERSION_OF_PATH` [string]: the predicate to be used to find the link to the non version object, default: `dcterms:isVersionOf`)
-- `LOG_SPARQL_ALL` [boolean]: log executed SPARQL queries (default: `false`)
-- `MU_APPLICATION_GRAPH` [string]: The graph where the data should be ingested. (default: see [semantic.works default graph](https://github.com/mu-semtech/mu-javascript-template/blob/d3281b8dff24502919a75147f7737b83d4dd724f/Dockerfile#L8))
-- `MU_SPARQL_ENDPOINT` [string]: SPARQL endpoint to connect to, defaults to 'http://database:8890/sparql'
-- `REPLACE_VERSIONS` [boolean]: boolean which indicates whether to remove old versions of a resource when adding a new version or not (default: `true`)
-- `RUNONCE` [boolean]: set to true to run the consumer only once (e.g. when running the service as a Kubernetes CronJob). (default: `false`)
-- `SPARQL_AUTH_PASSWORD` [string]: provide a password to be used in a digest auth to be sent to the SPARQL endpoint.
-- `SPARQL_AUTH_USER` [string]: (optional) provide a username to be used in a digest auth to be sent to the SPARQL endpoint.
-- `SPARQL_BATCH_SIZE` [integer]: amount of triples sent per query. To work around triplestore query-length limitations (default: `0` - disabled).
-- `SPARQL_ENDPOINT_HEADER_<key>` [string]: A header key-value combination which should be send as part of the headers to the SPARQL ENDPOINT.
-- `PERSIST_STATE` [boolean]: whether to persist (and restore) state of the ldes client on runs. (default: `false`)
+| Environment variable | Default | Description |
+|----------------------|---------|-------------|
+| `INGEST_MODE` | `ALL` | How the LDES feed should be ingested. Valid options are `ALL` and `MATERIALIZE`. `ALL` will ingest all versioned members as-is and store them in the triplestore. `MATERIALIZE` will store the [materializations of the members](https://semiceu.github.io/LinkedDataEventStreams/#version-materializations). |
+| `REPLACE_VERSIONS` | `true` | Whether to remove old versions of a resource when adding a new version or not. |
+| `PERSIST_STATE` | `false` | Whether to persist the state of the LDES client. The state is stored as a file in `/data/hostname($LDES_ENDPOINT_VIEW)-state.json`; make sure to mount the data folder so the state survives container rebuilds! |
+| `LDES_ENDPOINT_VIEW` | N/A (required) | The view of the LDES endpoint that will be ingested. If not set, the service will not start. |
+| `LDES_POLLING_INTERVAL` | `60000` | Number of milliseconds before refetching uncacheable fragments. |
+| `LDES_REQUESTS_PER_MINUTE` | `0` (unlimited) | How many requests per minute may be sent to the same host. This is optional, but any passed in value must be a positive number. |
+| `LDES_ENDPOINT_HEADERS` | `{}` (no headers will be added) | Extra headers that will be added to the requests sent to the LDES endpoint. Recommended syntax:<pre>environment:<br>  LDES_ENDPOINT_HEADERS: ><br>    { "HEADER-NAME": "header-value" } # The leading whitespace is important!</pre> |
+| `LDES_VERSION_OF_PATH` | `undefined` (will use LDES feed metadata) | The predicate to be used to find the link to the non-version object. If no value is provided and the LDES feed does not provide the metadata, the service will throw an error after starting. |
+| `LDES_TIMESTAMP_PATH` | `undefined` (will use LDES feed metadata) | The predicate to be used to find the timestamp of an object. If no value is provided and the LDES feed does not provide the metadata, the service will throw an error after starting. |
+| `SPARQL_ENDPOINT_HEADER_<key>` | N/A | A header key-value combination which should be sent as part of the headers to the SPARQL endpoint. |
+| `SPARQL_BATCH_SIZE` | `0` (disabled) | The amount of triples sent per query, used to work around triplestore query-length limitations. Value must be a non-negative integer. If set to 0, no batching will be applied. |
+| `SPARQL_AUTH_USER` | N/A | Optional value to provide a username to be used in a digest auth to be sent to the SPARQL endpoint. |
+| `SPARQL_AUTH_PASSWORD` | N/A | Optional value to provide a password to be used in a digest auth to be sent to the SPARQL endpoint. |
+| `BLANK_NODE_NAMESPACE` | `http://mu.semte.ch/blank#` | Namespace to use for skolemizing blank nodes. |
+| `CRON_PATTERN` | `* 0 * * * *` | The cron pattern which the cronjob should use. |
+| `RUN_ONCE` | `false` | Set to true to run the consumer only once (e.g. when running the service as a Kubernetes CronJob). |
+| `MU_APPLICATION_GRAPH` | See [semantic.works default graph](https://github.com/mu-semtech/mu-javascript-template/blob/d3281b8dff24502919a75147f7737b83d4dd724f/Dockerfile#L8) | The graph where the data should be ingested. |
+| `MU_SPARQL_ENDPOINT` | `http://database:8890/sparql` | SPARQL endpoint to connect to. |
+| `LOG_SPARQL_ALL` | `false` | Log executed SPARQL queries. |
+| `DEBUG_AUTH_HEADERS` | `false` | Debugging of [mu-authorization](https://github.com/mu-semtech/mu-authorization) access-control related headers. |
+
+
+> [!WARNING]
+> The following environment variables are **deprecated** and slated to be removed at a later time, but still supported:
+
+| Environment variable | Default | Description |
+|----------------------|---------|-------------|
+| `RUNONCE` | `false` | Set to true to run the consumer only once (e.g. when running the service as a Kubernetes CronJob). Replaced by the `RUN_ONCE` environment variable. |
+
+> [!CAUTION]
+> The following environment variables are **no longer supported**:
+
+| Environment variable | Reason | Description |
+|----------------------|--------|-------------|
+| `LDES_DEREFERENCE_MEMBERS` | The underlying library does not make dereferencing optional. | Whether to dereference members, because the collection pages do not contain all information. |
+| `LDES_STREAM` | The underlying library stores the LDES stream state in a file and we no longer store this info in the triplestore. | The URI which should be used as a subject to store the latest page and timestamp consumed in the database. |
+| `LDES_TIMESTAMP_PATH` | Materialization and versioning support is provided by the underlying library, which expects to find this information attached to the LDES feed. | The predicate to be used to find the timestamp of an object. |
+| `LDES_VERSION_OF_PATH` | Materialization and versioning support is provided by the underlying library, which expects to find this information attached to the LDES feed. | The predicate to be used to find the link to the non-version object. |
+| `LDES_ENDPOINT_HEADER_<key>` | Newer versions of Node.js do not support environment variables with dashes in their name. Variables like `LDES_ENDPOINT_HEADER_X-API-KEY` are no longer supported. | A header key-value combination which should be sent as part of the headers to the LDES endpoint. E.g. `LDES_ENDPOINT_HEADER_X-API-KEY: <api_key>`. |
````

> **Review comment** (Contributor), on the `SPARQL_ENDPOINT_HEADER_<key>` row: guessing this will also need to move to a JSON structure, similar to `LDES_ENDPOINT_HEADERS`.
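The move from per-header `LDES_ENDPOINT_HEADER_<key>` variables to a single JSON-valued `LDES_ENDPOINT_HEADERS` variable can be sketched as below. This is an illustrative helper, not code from the PR; the function name is hypothetical, but the parse-or-fail behaviour mirrors what the new `cfg.ts` does at startup.

```typescript
// Hypothetical sketch: parse a JSON-valued LDES_ENDPOINT_HEADERS environment
// variable into a headers object, failing loudly on malformed JSON (the new
// cfg.ts takes the same parse-or-throw approach).
function parseEndpointHeaders(raw: string | undefined): Record<string, string> {
  if (!raw) return {}; // no headers configured
  try {
    return JSON.parse(raw); // e.g. '{ "X-API-KEY": "secret" }'
  } catch (e) {
    throw new Error(`Failed to parse LDES_ENDPOINT_HEADERS. Faulty content: ${raw}`);
  }
}

const headers = parseEndpointHeaders('{ "X-API-KEY": "secret" }');
console.log(headers["X-API-KEY"]); // "secret"
```

Because the value is a single string, header names may contain dashes without running into the Node.js restriction that motivated the change.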
164 changes: 107 additions & 57 deletions app.ts
```diff
@@ -1,69 +1,119 @@
-import { CronJob } from "cron";
+import fs from 'fs';
+import { enhanced_fetch, intoConfig, LDESInfo, replicateLDES } from 'ldes-client';
 import {
-  CRON_PATTERN,
-  LDES_DEREFERENCE_MEMBERS,
-  LDES_ENDPOINT_HEADER_PREFIX,
+  INGEST_MODE,
+  REPLACE_VERSIONS,
+  PERSIST_STATE,
   LDES_ENDPOINT_VIEW,
   LDES_POLLING_INTERVAL,
   LDES_REQUESTS_PER_MINUTE,
-  LDES_STREAM,
-  LDES_TIMESTAMP_PATH,
+  RUN_ONCE,
+  NODE_ENV,
+  logConfig,
   LDES_VERSION_OF_PATH,
-  REPLACE_VERSIONS,
-  RUNONCE
-} from "./config";
-import { ConfigurableLDESOptions } from "./consumer";
-import LdesPipeline from "./ldes-pipeline";
-import { NamedNode } from "n3";
-let taskIsRunning = false;
+  LDES_TIMESTAMP_PATH,
+} from "./cfg";
+import { memberProcessor } from './lib/member-processor';
+import { custom_fetch } from './lib/fetch/custom-fetch';
+import { getLoggerFor } from './lib/logger';
+import { DataFactory } from "n3";
 
-const consumerJob = new CronJob(CRON_PATTERN, async () => {
-  if (taskIsRunning) {
-    console.log("Another task is still running");
-    return;
-  }
+const { namedNode } = DataFactory;
+
+logConfig();
+
+if (NODE_ENV === "production") {
+  main();
+} else {
+  const timeout = 10_000; // Make this configurable?
+  console.log(`Starting LDES consumer in ${timeout}ms, connect to your debugger now :)`);
+  setTimeout(main, timeout);
+}
+
+async function main() {
+  let stateFilePath;
   try {
-    taskIsRunning = true;
-    const endpoint = LDES_ENDPOINT_VIEW;
-    if (endpoint) {
-      const ldesOptions: ConfigurableLDESOptions = {
-        dereferenceMembers: LDES_DEREFERENCE_MEMBERS,
-        pollingInterval: LDES_POLLING_INTERVAL
-      };
-      if (LDES_REQUESTS_PER_MINUTE) {
-        ldesOptions.requestsPerMinute = LDES_REQUESTS_PER_MINUTE;
-      }
-      const datasetIri = new NamedNode(LDES_STREAM);
-      const consumer = new LdesPipeline({ datasetIri, endpoint, ldesOptions });
-      console.log("Started processing " + endpoint);
-      await consumer.consumeStream();
-      console.log("Finished processing " + endpoint);
-      if (RUNONCE) {
-        console.log("Job is complete.");
-        process.exit();
-      }
+    const url = new URL(LDES_ENDPOINT_VIEW);
+    stateFilePath = `/data/${url.host}-state.json`;
+  } catch (e: any) {
+    throw new Error("Provided endpoint couldn't be parsed as URL, double check your settings.");
+  }
+
+  let shapeFile;
+  if (fs.existsSync('/config/shape.ttl')) {
+    shapeFile = '/config/shape.ttl';
+  }
+  const client = replicateLDES(
+    intoConfig({
+      url: LDES_ENDPOINT_VIEW,
+      urlIsView: true,
+      polling: !RUN_ONCE,
+      pollInterval: LDES_POLLING_INTERVAL,
+      stateFile: PERSIST_STATE ? stateFilePath : undefined,
+      materialize: INGEST_MODE === 'MATERIALIZE',
+      lastVersionOnly: REPLACE_VERSIONS, // Won't emit members if they're known to be older than what is already in the state file
+      loose: true, // Make this configurable? IPDC needs this to be true
+      shapeFile,
+      fetch: enhanced_fetch({
+        safe: true, // In case of an exception being thrown by fetch, this will just retry the call in a while (true) loop until it stops throwing? Not great.
+        /* In comment are the default values, perhaps we want to make these configurable
+        concurrent: 10, // Amount of concurrent requests to a single domain
+        retry: {
+          codes: [408, 425, 429, 500, 502, 503, 504], // Which faulty HTTP status codes will trigger retry
+          base: 500, // Seems to be unused in the client code
+          maxRetries: 5,
+        }*/
+      }, custom_fetch),
+    }),
+    "none",
+  );
+
+  client.on("error", (error: any) => {
+    logger.info("Received an error from the LDES client!");
+    logger.error(error);
+    logger.error(error.stack);
+  })
+
+  const getLDESInfo = async (): Promise<LDESInfo> => {
+    return new Promise(
+      (resolve, reject) => {
+        try {
+          client.on('description', (info: LDESInfo) => {
+            resolve(info);
+          });
+        } catch (e) {
+          reject(e);
+        }
+      }
+    )
+  };
+
+  const logger = getLoggerFor('main');
+  logger.info('Starting stream...');
+  const ldesStream = client.stream({ highWaterMark: 10 });
+  try {
+    logger.info('Waiting for LDES info...');
+    const { isVersionOfPath: versionOfPath, timestampPath } = await getLDESInfo();
+    if (versionOfPath !== undefined && timestampPath !== undefined) {
+      logger.info(`Received LDES info: ${JSON.stringify({ versionOfPath, timestampPath })}`);
+    } else if (LDES_VERSION_OF_PATH !== undefined && LDES_TIMESTAMP_PATH !== undefined) {
+      logger.info(`LDES feed info contained no versionOfPath & timestampPath, using provided values: ${JSON.stringify({ LDES_VERSION_OF_PATH, LDES_TIMESTAMP_PATH })}`);
     } else {
-      throw new Error("No endpoint provided");
+      throw new Error('LDES feed info contained no versionOfPath & timestampPath and no LDES_VERSION_OF_PATH & LDES_TIMESTAMP_PATH were provided to service, exiting.');
     }
+
+    await ldesStream.pipeTo(
+      memberProcessor(
+        versionOfPath ?? namedNode(LDES_VERSION_OF_PATH as string),
+        timestampPath ?? namedNode(LDES_TIMESTAMP_PATH as string),
+      )
+    );
+
+    logger.info('Finished processing stream');
   } catch (e) {
-    console.error(e);
+    logger.error('Processing stream failed');
+    logger.error(e);
   } finally {
-    taskIsRunning = false;
+    ldesStream.cancel();
   }
-});
-
-console.log("config", {
-  CRON_PATTERN,
-  LDES_DEREFERENCE_MEMBERS,
-  LDES_ENDPOINT_HEADER_PREFIX,
-  LDES_ENDPOINT_VIEW,
-  LDES_POLLING_INTERVAL,
-  LDES_REQUESTS_PER_MINUTE,
-  LDES_STREAM,
-  LDES_TIMESTAMP_PATH,
-  LDES_VERSION_OF_PATH,
-  REPLACE_VERSIONS,
-  RUNONCE
-});
-
-consumerJob.start();
+}
```
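The precedence rule in the new `app.ts` (paths advertised by the LDES feed win, environment variables are the fallback, and missing both is fatal) can be isolated as a small pure function. This is a simplified standalone sketch using plain strings instead of RDF.js named nodes; the function name is illustrative, not part of the PR.

```typescript
// Simplified version of the versionOfPath/timestampPath resolution in app.ts:
// prefer what the LDES feed's description advertises, fall back to the
// configured environment value, and fail fast when neither is present.
function resolvePath(fromFeed: string | undefined, fromEnv: string | undefined): string {
  if (fromFeed !== undefined) return fromFeed; // feed metadata wins
  if (fromEnv !== undefined) return fromEnv;   // configured fallback
  throw new Error("No path found in feed metadata or configuration");
}

console.log(resolvePath(undefined, "http://purl.org/dc/terms/isVersionOf"));
// "http://purl.org/dc/terms/isVersionOf"
```

Extracting the rule this way also makes the failure mode obvious: the service only throws when both sources are missing, which matches the README's note that the service errors after starting in that case.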
69 changes: 69 additions & 0 deletions cfg.ts
```typescript
import env from "env-var";
import { getLoggerFor } from "./lib/logger";

const logger = getLoggerFor("config");

export const LDES_ENDPOINT_VIEW = env.get("LDES_ENDPOINT_VIEW").required().asString();
export const LDES_POLLING_INTERVAL = env.get("LDES_POLLING_INTERVAL").default(60000).asIntPositive();
export const LDES_REQUESTS_PER_MINUTE = env.get("LDES_REQUESTS_PER_MINUTE").default(0).asIntPositive();
export const LDES_ENDPOINT_HEADERS_STRING = env.get("LDES_ENDPOINT_HEADERS").default("{}").asString();
export const LDES_VERSION_OF_PATH = env.get("LDES_VERSION_OF_PATH").asString();
export const LDES_TIMESTAMP_PATH = env.get("LDES_TIMESTAMP_PATH").asString();
export let LDES_ENDPOINT_HEADERS = {};

try {
  LDES_ENDPOINT_HEADERS = JSON.parse(LDES_ENDPOINT_HEADERS_STRING);
} catch (e: any) {
  logger.error(`Failed to parse contents of LDES_ENDPOINT_HEADERS. Faulty content: ${LDES_ENDPOINT_HEADERS_STRING}`);
  logger.error(e);
  throw e;
}

export const INGEST_MODE = env.get("INGEST_MODE").default("ALL").asEnum(["ALL", "MATERIALIZE"]);
export const REPLACE_VERSIONS = env.get("REPLACE_VERSIONS").default("true").asBool();
export const PERSIST_STATE = env.get("PERSIST_STATE").default("false").asBool();

export const SPARQL_ENDPOINT_HEADER_PREFIX = "SPARQL_ENDPOINT_HEADER_";

export const SPARQL_BATCH_SIZE = env.get("SPARQL_BATCH_SIZE").default(0).asIntPositive();
export const ENABLE_SPARQL_BATCHING = SPARQL_BATCH_SIZE > 0;

export const SPARQL_AUTH_USER = env.get("SPARQL_AUTH_USER").asString();
export const SPARQL_AUTH_PASSWORD = env.get("SPARQL_AUTH_PASSWORD").asString();

export const BLANK_NODE_NAMESPACE = env.get("BLANK_NODE_NAMESPACE").default("http://mu.semte.ch/blank#").asString();
export const CRON_PATTERN = env.get("CRON_PATTERN").default("* 0 * * * *").asString();

// Accept the deprecated RUNONCE variable as an alias for RUN_ONCE.
const RUN_ONCE_VAR = env.get("RUN_ONCE").default("false").asBool();
const RUNONCE_VAR = env.get("RUNONCE").default("false").asBool();
export const RUN_ONCE = RUN_ONCE_VAR || RUNONCE_VAR;

export const MU_APPLICATION_GRAPH = env.get("MU_APPLICATION_GRAPH").required().asString(); // Provided by template
export const MU_SPARQL_ENDPOINT = env.get("MU_SPARQL_ENDPOINT").required().asString(); // Provided by template
export const DEBUG_AUTH_HEADERS = env.get("DEBUG_AUTH_HEADERS").default("false").asBool();

export const NODE_ENV = env.get("NODE_ENV").default("production").asEnum(["development", "production"]);

export function logConfig() {
  // Should this use the logger instead?
  console.log("Config", {
    INGEST_MODE,
    REPLACE_VERSIONS,
    PERSIST_STATE,
    LDES_ENDPOINT_VIEW,
    LDES_POLLING_INTERVAL,
    LDES_REQUESTS_PER_MINUTE,
    SPARQL_BATCH_SIZE,
    SPARQL_AUTH_USER,
    SPARQL_AUTH_PASSWORD,
    BLANK_NODE_NAMESPACE,
    CRON_PATTERN,
    RUN_ONCE,
    MU_APPLICATION_GRAPH,
    MU_SPARQL_ENDPOINT,
    DEBUG_AUTH_HEADERS,
    LDES_VERSION_OF_PATH,
    LDES_TIMESTAMP_PATH,
  });
}
```
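`SPARQL_BATCH_SIZE` and `ENABLE_SPARQL_BATCHING` imply the ingest side splits triples into fixed-size chunks before sending INSERT queries. That code is not part of this diff, so the helper below is only an illustrative sketch of the chunking semantics the README describes, where a size of 0 disables batching; the function name is hypothetical.

```typescript
// Illustrative chunking helper: split an array of triples into batches of at
// most `size` elements. A size of 0 disables batching (everything goes in one
// batch), mirroring the SPARQL_BATCH_SIZE=0 semantics in the README.
function batchTriples<T>(triples: T[], size: number): T[][] {
  if (size <= 0) return [triples]; // batching disabled
  const batches: T[][] = [];
  for (let i = 0; i < triples.length; i += size) {
    batches.push(triples.slice(i, i + size));
  }
  return batches;
}

console.log(batchTriples([1, 2, 3, 4, 5], 2)); // [[1, 2], [3, 4], [5]]
```

Each batch would then be serialized into its own INSERT DATA query, keeping every query under the triplestore's query-length limit.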
24 changes: 0 additions & 24 deletions config.ts

This file was deleted.
