Skip to content

Commit

Permalink
feat: update effects APIs (#68)
Browse files Browse the repository at this point in the history
* feat: update effects APIs

* fix: allow one effect per kind and replace effect if same kind is added

---------

Co-authored-by: Bryce Tham <[email protected]>
  • Loading branch information
brycetham and brycetham authored Jan 10, 2024
1 parent 8f644fd commit 64ac8b6
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 140 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
"dependencies": {
"@webex/ts-events": "^1.1.0",
"@webex/web-capabilities": "^1.1.0",
"@webex/web-media-effects": "^2.7.0",
"@webex/web-media-effects": "^2.15.6",
"events": "^3.3.0",
"js-logger": "^1.6.1",
"typed-emitter": "^2.1.0",
Expand Down
1 change: 1 addition & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export enum WebrtcCoreErrorType {
DEVICE_PERMISSION_DENIED = 'DEVICE_PERMISSION_DENIED',
CREATE_STREAM_FAILED = 'CREATE_STREAM_FAILED',
ADD_EFFECT_FAILED = 'ADD_EFFECT_FAILED',
}

/**
Expand Down
155 changes: 92 additions & 63 deletions src/media/local-stream.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import { BaseEffect } from '@webex/web-media-effects';
import { createBrowserMock } from '../mocks/create-browser-mock';
import MediaStreamStub from '../mocks/media-stream-stub';
import MediaStreamTrackStub from '../mocks/media-stream-track-stub';
import { mocked } from '../mocks/mock';
import { WebrtcCoreError } from '../errors';
import { createMockedStream } from '../util/test-utils';
import { LocalStream } from './local-stream';
import { LocalStream, LocalStreamEventNames, TrackEffect } from './local-stream';

/**
* A dummy LocalStream implementation so we can instantiate it for testing.
Expand Down Expand Up @@ -50,72 +46,105 @@ describe('LocalStream', () => {
expect(spy).toHaveBeenCalledWith();
});
});
});

describe('LocalTrack addEffect', () => {
createBrowserMock(MediaStreamStub, 'MediaStream');

// eslint-disable-next-line jsdoc/require-jsdoc
const createMockedTrackEffect = () => {
const effectTrack = mocked(new MediaStreamTrackStub());
const effect = {
dispose: jest.fn().mockResolvedValue(undefined),
load: jest.fn().mockResolvedValue(effectTrack),
on: jest.fn(),
};

return { effectTrack, effect };
};

// TODO: addTrack and removeTrack do not work the current implementation of createMockedStream, so
// we have to use the stubs here directly for now
const mockTrack = mocked(new MediaStreamTrackStub()) as unknown as MediaStreamTrack;
const mockStream = mocked(new MediaStreamStub([mockTrack])) as unknown as MediaStream;
let localStream: LocalStream;
beforeEach(() => {
localStream = new TestLocalStream(mockStream);
});
describe('addEffect', () => {
let effect: TrackEffect;
let loadSpy: jest.SpyInstance;
let emitSpy: jest.SpyInstance;

beforeEach(() => {
effect = {
id: 'test-id',
kind: 'test-kind',
isEnabled: false,
dispose: jest.fn().mockResolvedValue(undefined),
load: jest.fn().mockResolvedValue(undefined),
on: jest.fn(),
} as unknown as TrackEffect;

loadSpy = jest.spyOn(effect, 'load');
emitSpy = jest.spyOn(localStream[LocalStreamEventNames.EffectAdded], 'emit');
});

it('loads and uses the effect when there is no loading effect', async () => {
expect.hasAssertions();
it('should load and add an effect', async () => {
expect.hasAssertions();

const { effectTrack, effect } = createMockedTrackEffect();
const addEffectPromise = localStream.addEffect(effect);

const addEffectPromise = localStream.addEffect('test-effect', effect as unknown as BaseEffect);
await expect(addEffectPromise).resolves.toBeUndefined();
expect(loadSpy).toHaveBeenCalledWith(mockStream.getTracks()[0]);
expect(localStream.getEffects()).toStrictEqual([effect]);
expect(emitSpy).toHaveBeenCalledWith(effect);
});

await expect(addEffectPromise).resolves.toBeUndefined();
expect(localStream.outputStream.getTracks()[0]).toBe(effectTrack);
});
it('should load and add multiple effects with different IDs and kinds', async () => {
expect.hasAssertions();

const firstEffect = effect;
const secondEffect = {
...effect,
id: 'another-id',
kind: 'another-kind',
} as unknown as TrackEffect;
await localStream.addEffect(firstEffect);
await localStream.addEffect(secondEffect);

expect(loadSpy).toHaveBeenCalledTimes(2);
expect(localStream.getEffects()).toStrictEqual([firstEffect, secondEffect]);
expect(emitSpy).toHaveBeenCalledTimes(2);
});

it('does not use the effect when the loading effect is cleared during load', async () => {
expect.hasAssertions();
it('should not load an effect with the same ID twice', async () => {
expect.hasAssertions();

const { effect } = createMockedTrackEffect();
await localStream.addEffect(effect);
const secondAddEffectPromise = localStream.addEffect(effect);

// Add effect and immediately dispose all effects to clear loading effects
const addEffectPromise = localStream.addEffect('test-effect', effect as unknown as BaseEffect);
await localStream.disposeEffects();
await expect(secondAddEffectPromise).resolves.toBeUndefined(); // no-op
expect(loadSpy).toHaveBeenCalledTimes(1);
expect(localStream.getEffects()).toStrictEqual([effect]);
expect(emitSpy).toHaveBeenCalledTimes(1);
});

await expect(addEffectPromise).rejects.toThrow('not required after loading');
expect(localStream.outputStream).toBe(mockStream);
});
it('should throw an error if an effect of the same kind is added while loading', async () => {
expect.hasAssertions();

it('loads and uses the latest effect when the loading effect changes during load', async () => {
expect.hasAssertions();
const { effect: firstEffect } = createMockedTrackEffect();
const { effectTrack, effect: secondEffect } = createMockedTrackEffect();

const firstAddEffectPromise = localStream.addEffect(
'test-effect',
firstEffect as unknown as BaseEffect
);
const secondAddEffectPromise = localStream.addEffect(
'test-effect',
secondEffect as unknown as BaseEffect
);
await expect(firstAddEffectPromise).rejects.toThrow('not required after loading');
await expect(secondAddEffectPromise).resolves.toBeUndefined();

expect(localStream.outputStream.getTracks()[0]).toBe(effectTrack);
const firstEffect = effect;
const secondEffect = { ...effect, id: 'another-id' } as unknown as TrackEffect; // same kind
const firstAddEffectPromise = localStream.addEffect(firstEffect);
const secondAddEffectPromise = localStream.addEffect(secondEffect);

await expect(firstAddEffectPromise).rejects.toBeInstanceOf(WebrtcCoreError);
await expect(secondAddEffectPromise).resolves.toBeUndefined();
expect(loadSpy).toHaveBeenCalledTimes(2);
expect(localStream.getEffects()).toStrictEqual([secondEffect]);
expect(emitSpy).toHaveBeenCalledTimes(1);
});

it('should replace the effect if an effect of the same kind is added after loading', async () => {
expect.hasAssertions();

const firstEffect = effect;
const secondEffect = { ...effect, id: 'another-id' } as unknown as TrackEffect; // same kind
await localStream.addEffect(firstEffect);
const secondAddEffectPromise = localStream.addEffect(secondEffect);

await expect(secondAddEffectPromise).resolves.toBeUndefined();
expect(loadSpy).toHaveBeenCalledTimes(2);
expect(localStream.getEffects()).toStrictEqual([secondEffect]);
expect(emitSpy).toHaveBeenCalledTimes(2);
});

it('should throw an error if effects are cleared while loading', async () => {
expect.hasAssertions();

const addEffectPromise = localStream.addEffect(effect);
await localStream.disposeEffects();

await expect(addEffectPromise).rejects.toBeInstanceOf(WebrtcCoreError);
expect(loadSpy).toHaveBeenCalledTimes(1);
expect(localStream.getEffects()).toStrictEqual([]);
expect(emitSpy).toHaveBeenCalledTimes(0);
});
});
});
133 changes: 100 additions & 33 deletions src/media/local-stream.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import { AddEvents, TypedEvent, WithEventsDummyType } from '@webex/ts-events';
import { BaseEffect, EffectEvent } from '@webex/web-media-effects';
import { WebrtcCoreError, WebrtcCoreErrorType } from '../errors';
import { logger } from '../util/logger';
import { Stream, StreamEventNames } from './stream';

export type TrackEffect = BaseEffect;

export enum LocalStreamEventNames {
ConstraintsChange = 'constraints-change',
OutputTrackChange = 'output-track-change',
EffectAdded = 'effect-added',
}

interface LocalStreamEvents {
[LocalStreamEventNames.ConstraintsChange]: TypedEvent<() => void>;
[LocalStreamEventNames.OutputTrackChange]: TypedEvent<(track: MediaStreamTrack) => void>;
[LocalStreamEventNames.EffectAdded]: TypedEvent<(effect: TrackEffect) => void>;
}

export type TrackEffect = BaseEffect;

type EffectItem = { name: string; effect: TrackEffect };

/**
* A stream which originates on the local device.
*/
Expand All @@ -24,7 +26,9 @@ abstract class _LocalStream extends Stream {

[LocalStreamEventNames.OutputTrackChange] = new TypedEvent<(track: MediaStreamTrack) => void>();

private effects: EffectItem[] = [];
[LocalStreamEventNames.EffectAdded] = new TypedEvent<(effect: TrackEffect) => void>();

private effects: TrackEffect[] = [];

private loadingEffects: Map<string, TrackEffect> = new Map();

Expand Down Expand Up @@ -141,55 +145,118 @@ abstract class _LocalStream extends Stream {
/**
* Adds an effect to a local stream.
*
* @param name - The name of the effect.
* @param effect - The effect to add.
*/
async addEffect(name: string, effect: TrackEffect): Promise<void> {
// Load the effect
this.loadingEffects.set(name, effect);
const outputTrack = await effect.load(this.outputTrack);
async addEffect(effect: TrackEffect): Promise<void> {
// Check if the effect has already been added.
if (this.effects.some((e) => e.id === effect.id)) {
return;
}

// Load the effect. Because loading is asynchronous, keep track of the loading effects.
this.loadingEffects.set(effect.kind, effect);
await effect.load(this.outputTrack);

// Check that the loaded effect is the latest one and dispose if not
if (effect !== this.loadingEffects.get(name)) {
// After loading, check whether or not we still want to use this effect. If another effect of
// the same kind was added while this effect was loading, we only want to use the latest effect,
// so dispose this one. If the effects list was cleared while this effect was loading, also
// dispose it.
if (effect !== this.loadingEffects.get(effect.kind)) {
await effect.dispose();
throw new Error(`Effect "${name}" not required after loading`);
throw new WebrtcCoreError(
WebrtcCoreErrorType.ADD_EFFECT_FAILED,
`Another effect with kind ${effect.kind} was added while effect ${effect.id} was loading, or the effects list was cleared.`
);
}
this.loadingEffects.delete(effect.kind);

// Use the effect
this.loadingEffects.delete(name);
this.effects.push({ name, effect });
this.changeOutputTrack(outputTrack);

// When the effect's track is updated, update the next effect or output stream.
// TODO: using EffectEvent.TrackUpdated will cause the entire web-media-effects lib to be built
// and makes the size of the webrtc-core build much larger, so we use type assertion here as a
// temporary workaround.
effect.on('track-updated' as EffectEvent, (track: MediaStreamTrack) => {
const effectIndex = this.effects.findIndex((e) => e.name === name);
/**
* Handle when the effect's output track has been changed. This will update the input of the
* next effect in the effects list of the output of the stream.
*
* @param track - The new output track of the effect.
*/
const handleEffectTrackUpdated = (track: MediaStreamTrack) => {
const effectIndex = this.effects.findIndex((e) => e.id === effect.id);
if (effectIndex === this.effects.length - 1) {
this.changeOutputTrack(track);
} else if (effectIndex >= 0) {
this.effects[effectIndex + 1]?.replaceInputTrack(track);
} else {
this.effects[effectIndex + 1]?.effect.replaceInputTrack(track);
logger.error(`Effect with ID ${effect.id} not found in effects list.`);
}
};

/**
* Handle when the effect has been disposed. This will remove all event listeners from the
* effect.
*/
const handleEffectDisposed = () => {
effect.off('track-updated' as EffectEvent, handleEffectTrackUpdated);
effect.off('disposed' as EffectEvent, handleEffectDisposed);
};

// TODO: using EffectEvent.TrackUpdated or EffectEvent.Disposed will cause the entire
// web-media-effects lib to be rebuilt and inflates the size of the webrtc-core build, so
// we use type assertion here as a temporary workaround.
effect.on('track-updated' as EffectEvent, handleEffectTrackUpdated);
effect.on('disposed' as EffectEvent, handleEffectDisposed);

// Add the effect to the effects list. If an effect of the same kind has already been added,
// dispose the existing effect and replace it with the new effect. If the existing effect was
// enabled, also enable the new effect.
const existingEffectIndex = this.effects.findIndex((e) => e.kind === effect.kind);
if (existingEffectIndex >= 0) {
const [existingEffect] = this.effects.splice(existingEffectIndex, 1, effect);
if (existingEffect.isEnabled) {
// If the existing effect is not the first effect in the effects list, then the input of the
// new effect should be the output of the previous effect in the effects list. We know the
// output track of the previous effect must exist because it must have been loaded (and all
// loaded effects have an output track).
const inputTrack =
existingEffectIndex === 0
? this.inputTrack
: (this.effects[existingEffectIndex - 1].getOutputTrack() as MediaStreamTrack);
await effect.replaceInputTrack(inputTrack);
// Enabling the new effect will trigger the track-updated event, which will handle the new
// effect's updated output track.
await effect.enable();
}
});
await existingEffect.dispose();
} else {
this.effects.push(effect);
}

// Emit an event with the effect so others can listen to the effect events.
this[LocalStreamEventNames.EffectAdded].emit(effect);
}

/**
* Get an effect from the effects list by ID.
*
* @param id - The id of the effect you want to get.
* @returns The effect or undefined.
*/
getEffectById(id: string): TrackEffect | undefined {
return this.effects.find((effect) => effect.id === id);
}

/**
* Get an effect from the effects list.
* Get an effect from the effects list by kind.
*
* @param name - The name of the effect you want to get.
* @param kind - The kind of the effect you want to get.
* @returns The effect or undefined.
*/
getEffect(name: string): TrackEffect | undefined {
return this.effects.find((e) => e.name === name)?.effect;
getEffectByKind(kind: string): TrackEffect | undefined {
return this.effects.find((effect) => effect.kind === kind);
}

/**
* Get all the effects from the effects list.
*
* @returns A list of effect items, each containing the name and the effect itself.
* @returns A list of effects.
*/
getAllEffects(): EffectItem[] {
getEffects(): TrackEffect[] {
return this.effects;
}

Expand All @@ -202,7 +269,7 @@ abstract class _LocalStream extends Stream {
// Dispose of any effects currently in use
if (this.effects.length > 0) {
this.changeOutputTrack(this.inputTrack);
await Promise.all(this.effects.map((item: EffectItem) => item.effect.dispose()));
await Promise.all(this.effects.map((effect) => effect.dispose()));
this.effects = [];
}
}
Expand Down
Loading

0 comments on commit 64ac8b6

Please sign in to comment.