Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Embed extra operators to Stream #227

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/extra/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,13 @@ export default function buffer<T>(s: Stream<any>): (ins: Stream<T>) => Stream<Ar
return new Stream<Array<T>>(new BufferOperator<T>(s, ins));
};
}

declare module '../index' {
interface Stream<T> {
buffer<T>(s: Stream<any>): Stream<Array<T>>;
}
}

Stream.prototype.buffer = function (s: any): any {
return this.compose(buffer(s));
};
10 changes: 10 additions & 0 deletions src/extra/dropRepeats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,13 @@ export default function dropRepeats<T>(isEqual: ((x: T, y: T) => boolean) | unde
return new Stream<T>(new DropRepeatsOperator<T>(ins, isEqual));
};
}

declare module '../index' {
interface Stream<T> {
dropRepeats<T>(isEqual?: ((x: T, y: T) => boolean) | undefined): Stream<T>;
}
}

Stream.prototype.dropRepeats = function (isEqual: any): any {
return this.compose(dropRepeats(isEqual));
};
10 changes: 10 additions & 0 deletions src/extra/flattenConcurrently.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,13 @@ export class FlattenConcOperator<T> implements Operator<Stream<T>, T> {
export default function flattenConcurrently<T>(ins: Stream<Stream<T>>): Stream<T> {
return new Stream<T>(new FlattenConcOperator(ins));
}

declare module '../index' {
interface Stream<T> {
flattenConcurrently<R>(this: Stream<Stream<R>>): T;
}
}

Stream.prototype.flattenConcurrently = function (): any {
return this.compose(flattenConcurrently);
};
10 changes: 10 additions & 0 deletions src/extra/flattenSequentially.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,13 @@ export class FlattenSeqOperator<T> implements Operator<Stream<T>, T> {
export default function flattenSequentially<T>(ins: Stream<Stream<T>>): Stream<T> {
return new Stream<T>(new FlattenSeqOperator(ins));
}

declare module '../index' {
interface Stream<T> {
flattenSequentially<R>(this: Stream<Stream<R>>): T;
}
}

Stream.prototype.flattenSequentially = function (): any {
return this.compose(flattenSequentially);
};
2 changes: 1 addition & 1 deletion tests/extra/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ describe('buffer (extra)', () => {
it('should flush and complete when source is completed', (done) => {
const source = xs.of(1, 2).compose(delay(10));
const separator = xs.empty().compose(delay(50));
const buffered = source.compose(buffer(separator));
const buffered = source.buffer(separator);
const expected = [[1, 2]];

let separatorComplete = false;
Expand Down
2 changes: 1 addition & 1 deletion tests/extra/dropRepeats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe('dropRepeats (extra)', () => {

it('should drop consecutive \'duplicate\' strings, with a custom isEqual', (done: any) => {
const stream = xs.of('a', 'b', 'a', 'A', 'B', 'b')
.compose(dropRepeats((x: string, y: string) => x.toLowerCase() === y.toLowerCase()));
.dropRepeats((x: string, y: string) => x.toLowerCase() === y.toLowerCase());
const expected = ['a', 'b', 'a', 'B'];

stream.addListener({
Expand Down
2 changes: 1 addition & 1 deletion tests/extra/flattenConcurrently.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ describe('flattenConcurrently (extra)', () => {
it('should expand each periodic event with 3 sync events', (done: any) => {
const stream = xs.periodic(100).take(3)
.map(i => xs.of(1 + i, 2 + i, 3 + i))
.compose(flattenConcurrently);
.flattenConcurrently();
const expected = [1, 2, 3, 2, 3, 4, 3, 4, 5];

stream.addListener({
Expand Down
2 changes: 1 addition & 1 deletion tests/extra/flattenSequentially.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ describe('flattenSequentially (extra)', () => {
it('should expand each periodic event with 3 sync events', (done: any) => {
const stream = xs.periodic(100).take(3)
.map(i => xs.of(1 + i, 2 + i, 3 + i))
.compose(flattenSequentially);
.flattenSequentially();
const expected = [1, 2, 3, 2, 3, 4, 3, 4, 5];
const listener = {
next: (x: number) => {
Expand Down