Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/automated tests #41

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules
.vscode
.idea
.idea
/tests/data-tests/
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM semtech/mu-javascript-template:1.7.0
FROM semtech/mu-javascript-template:1.8.0
LABEL maintainer="[email protected]"
ENV PERSIST_STATE "false"
ENV LDES_ENDPOINT_VIEW "http://ldes-time-fragmenter:3000/example/1"
Expand Down
3 changes: 1 addition & 2 deletions app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import {
REPLACE_VERSIONS,
RUNONCE
} from "./config";
import { ConfigurableLDESOptions } from "./consumer";
import LdesPipeline from "./ldes-pipeline";
import LdesPipeline, { ConfigurableLDESOptions } from "./ldes-pipeline";
import { NamedNode } from "n3";
let taskIsRunning = false;

Expand Down
2 changes: 1 addition & 1 deletion config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ export const SPARQL_BATCH_SIZE = env.get("SPARQL_BATCH_SIZE").default(0).asInt()
export const ENABLE_SPARQL_BATCHING = SPARQL_BATCH_SIZE > 0;
export const SPARQL_ENDPOINT_HEADER_PREFIX = "SPARQL_ENDPOINT_HEADER_";
export const SKIP_ERRORS = env.get("SKIP_ERRORS").asBool();
export const PERSIST_STATE = env.get("PERSIST_STATE").asBool()
export const PERSIST_STATE = env.get("PERSIST_STATE").asBool();
27 changes: 11 additions & 16 deletions ldes-pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import { pipeline } from 'stream/promises';
import { pipeline } from "stream/promises";
import { newEngine, LDESClient, State } from "@treecg/actor-init-ldes-client";
import * as RDF from "rdf-js";
import { NamedNode } from "rdf-js";
import { extractEndpointHeadersFromEnv } from "./utils";
import { extractEndpointHeadersFromEnv } from "./rdf-utils";
import { LDES_ENDPOINT_HEADER_PREFIX, PERSIST_STATE } from "./config";
import { fetchState, updateState } from "./sparql-queries";
import { DataFactory } from "n3";
const { quad, variable } = DataFactory;
import MemberProcessor from './member-processor';
import MemberProcessor from "./member-processor";

export interface ConfigurableLDESOptions {
pollingInterval?: number;
mimeType?: string;
dereferenceMembers?: boolean;
requestsPerMinute?: number;
};
}

interface LDESOptions {
representation: string
Expand Down Expand Up @@ -57,28 +55,25 @@ export default class LdesPipeline {
this.datasetIri = datasetIri;
}

async consumeStream() {
async consumeStream () {
const lastState = PERSIST_STATE ? await fetchState(this.datasetIri) : undefined;
try {
const ldesStream = this.client.createReadStream(
this.endpoint,
// @ts-ignore
this.ldesOptions,
lastState as State | undefined
);
const memberProcessor = new MemberProcessor();
await pipeline(
ldesStream,
memberProcessor
)
if (PERSIST_STATE)
await updateState(this.datasetIri, ldesStream.exportState());
console.log('finished processing stream');
}
catch (e) {
console.log('processing stream failed');
);
if (PERSIST_STATE) { await updateState(this.datasetIri, ldesStream.exportState()); }
console.log("finished processing stream");
} catch (e) {
console.log("processing stream failed");
console.error(e);
}
}


}
33 changes: 16 additions & 17 deletions member-processor.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import { Writable } from 'stream';
import { Writable } from "stream";
import * as RDF from "rdf-js";
import { TreeProperties, extractVersionTimestamp, extractBaseResourceUri, convertBlankNodes } from "./utils";
import { TreeProperties, extractVersionTimestamp, extractBaseResourceUri, convertBlankNodes } from "./rdf-utils";
import { LDES_VERSION_OF_PATH, LDES_TIMESTAMP_PATH, REPLACE_VERSIONS } from "./config";
import { executeDeleteInsertQuery, getLatestTimestamp } from "./sparql-queries";
import { DataFactory } from "n3";
const { quad, variable } = DataFactory;


export type Member = {
id: RDF.Term;
quads: RDF.Quad[];
};

type MemberWithCallBack = {
type MemberWithCallBack = {
member: Member;
callback: (e?: Error) => void;
}
Expand All @@ -28,7 +27,7 @@ export default class MemberProcessor extends Writable {
private latestVersionMap : Map<string, Date> = new Map();
membersToProcess: MemberWithCallBack[] = [];

constructor() {
constructor () {
super({ objectMode: true, highWaterMark: 1000 });
this.treeProperties = {
versionOfPath: LDES_VERSION_OF_PATH,
Expand All @@ -37,8 +36,8 @@ export default class MemberProcessor extends Writable {
this.processingLoop();
}

_write(member: Member, _encoding : string , callback: () => void) {
this.membersToProcess.push({member, callback});
_write (member: Member, _encoding : string, callback: () => void) {
this.membersToProcess.push({ member, callback });
return true;
}

Expand All @@ -48,16 +47,17 @@ export default class MemberProcessor extends Writable {
if (next) {
try {
await this.processMember(next.member);
await next.callback();
}
catch (e) {
next.callback();
} catch (e) {
console.error(e);
await next.callback(e);
// @ts-ignore
next.callback(e);
// @ts-ignore
this.destroy(e);
}
}
await timeout(10);
} while (! this.closed);
} while (!this.closed);
}

async processMember (member: Member) {
Expand All @@ -75,12 +75,12 @@ export default class MemberProcessor extends Writable {
if (versionTimestamp) {
this.latestVersionMap.set(baseResourceUri.value, versionTimestamp);
}
// eslint-disable-next-line brace-style
}
// Case: the retreived version is newer then the last version found in the store.
// Case: the retrieved version is newer then the last version found in the store.
else if (latestTimestamp && versionTimestamp && versionTimestamp > latestTimestamp) {

// Here, we only want the latest version of the resource in the store.
if(REPLACE_VERSIONS) {
if (REPLACE_VERSIONS) {
quadsToRemove.push(
quad(variable("s"), this.treeProperties.versionOfPath, baseResourceUri)
);
Expand All @@ -92,8 +92,7 @@ export default class MemberProcessor extends Writable {
}
quadsToAdd = member.quads;
}
}
else {
} else {
console.warn(`
No baseResourceUri found for the member. This might potentialy be an odd LDES-feed.
If this member contained blank nodes, multiple instances of the same blank nodes will be created.
Expand Down
Loading