Skip to content

Commit

Permalink
fix promises returned from push not resolving when buffer is not full
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Kim authored and brainkim committed May 8, 2024
1 parent 6e64566 commit 9248257
Show file tree
Hide file tree
Showing 3 changed files with 603 additions and 582 deletions.
37 changes: 28 additions & 9 deletions packages/repeater/src/__tests__/repeater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -789,33 +789,52 @@ describe("Repeater", () => {
push = push1;
}, new FixedBuffer(3));
const next1 = r.next(-1);
const push1 = push(1);
await expect(push(1)).resolves.toBeUndefined();
const push2 = push(2);
const push3 = push(3);
const push4 = push(4);
const push5 = push(5);
await expect(next1).resolves.toEqual({ value: 1, done: false });
await expect(push2).resolves.toBeUndefined();
await expect(push3).resolves.toBeUndefined();
await expect(push4).resolves.toBeUndefined();
await expect(
Promise.race([push5, delayPromise(1, -1000)]),
).resolves.toEqual(-1000);
await expect(r.next(-2)).resolves.toEqual({ value: 2, done: false });
await expect(push1).resolves.toEqual(-2);
await expect(r.next(-3)).resolves.toEqual({ value: 3, done: false });
await expect(r.next(-4)).resolves.toEqual({ value: 4, done: false });
await expect(r.next(-5)).resolves.toEqual({ value: 5, done: false });
await expect(push2).resolves.toBeUndefined();
await expect(push3).resolves.toBeUndefined();
await expect(push4).resolves.toBeUndefined();
await expect(push5).resolves.toBe(-3);
const push6 = push(6);
const push7 = push(7);
const push8 = push(8);
const push9 = push(9);
await expect(r.next(-6)).resolves.toEqual({ value: 6, done: false });
await expect(r.next(-7)).resolves.toEqual({ value: 7, done: false });
await expect(r.next(-8)).resolves.toEqual({ value: 8, done: false });
await expect(r.next(-9)).resolves.toEqual({ value: 9, done: false });
await expect(push6).resolves.toBeUndefined();
await expect(push7).resolves.toBeUndefined();
await expect(push8).resolves.toBeUndefined();
await expect(
Promise.race([push9, delayPromise(1, -1000)]),
).resolves.toEqual(-1000);
await expect(r.next(-6)).resolves.toEqual({ value: 6, done: false });
await expect(r.next(-7)).resolves.toEqual({ value: 7, done: false });
await expect(r.next(-8)).resolves.toEqual({ value: 8, done: false });
await expect(push9).resolves.toBe(-7);
const next9 = r.next(-8);
const next10 = r.next(-9);
const next11 = r.next(-10);
const next12 = r.next(-11);

const push10 = push(10);
const push11 = push(11);
const push12 = push(12);
await expect(next9).resolves.toEqual({ value: 9, done: false });
await expect(next10).resolves.toEqual({ value: 10, done: false });
await expect(next11).resolves.toEqual({ value: 11, done: false });
await expect(next12).resolves.toEqual({ value: 12, done: false });
await expect(push10).resolves.toBe(-10);
await expect(push11).resolves.toBe(-11);
await expect(push12).resolves.toBeUndefined();
});

test("push throws when push queue is full", async () => {
Expand Down
2 changes: 2 additions & 0 deletions packages/repeater/src/repeater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ function push<T, TReturn, TNext>(
next.resolve(createIteration<T, TReturn, TNext>(r, valueP));
if (r.nexts.length) {
nextP = Promise.resolve(r.nexts[0].value);
} else if (typeof r.buffer !== "undefined" && !r.buffer.full) {
nextP = Promise.resolve(undefined);
} else {
nextP = new Promise((resolve) => (r.onnext = resolve));
}
Expand Down
Loading

0 comments on commit 9248257

Please sign in to comment.