Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(async-flow): startGeneration: a new synthetic LogEntry #10293

Merged
merged 1 commit into from
Oct 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/async-flow/src/async-flow.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { prepareEndowmentTools } from './endowments.js';
import { LogEntryShape, FlowStateShape } from './type-guards.js';

/**
* @import {WeakMapStore} from '@agoric/store'
* @import {WeakMapStore, MapStore} from '@agoric/store'
* @import {Zone} from '@agoric/base-zone'
* @import {FlowState, GuestAsyncFunc, HostAsyncFuncWrapper, HostOf, PreparationOptions} from '../src/types.js'
* @import {ReplayMembrane} from '../src/replay-membrane.js'
Expand Down Expand Up @@ -70,7 +70,7 @@ export const prepareAsyncFlowTools = (outerZone, outerOptions = {}) => {
keyShape: M.remotable('flow'), // flowState !== 'Done'
});

/** @type WeakMapStore<AsyncFlow, ReplayMembrane> */
/** @type {WeakMapStore<AsyncFlow, ReplayMembrane>} */
const membraneMap = makeScalarWeakMapStore('membraneFor', {
keyShape: M.remotable('flow'),
valueShape: M.remotable('membrane'),
Expand Down
143 changes: 121 additions & 22 deletions packages/async-flow/src/log-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,76 @@ import { makeEphemera } from './ephemera.js';
const LogStoreI = M.interface('LogStore', {
reset: M.call().returns(),
dispose: M.call().returns(),
getUnfilteredIndex: M.call().returns(M.number()),
getIndex: M.call().returns(M.number()),
getLength: M.call().returns(M.number()),
isReplaying: M.call().returns(M.boolean()),
peekEntry: M.call().returns(LogEntryShape),
nextEntry: M.call().returns(LogEntryShape),
nextUnfilteredEntry: M.call().returns(LogEntryShape),
pushEntry: M.call(LogEntryShape).returns(M.number()),
dumpUnfiltered: M.call().returns(M.arrayOf(LogEntryShape)),
dump: M.call().returns(M.arrayOf(LogEntryShape)),
promiseReplayDone: M.call().returns(M.promise()),
});

/**
* Get a generation number to use in `startGeneration` log entries.
*
* @param {Zone} zone
* @returns {number}
*/
export const nextGeneration = zone => {
/** @type {MapStore<'generation', number>} */
const logStoreMetadata = zone.mapStore('LogStoreMetadata');
const generationKey = 'generation';

if (!logStoreMetadata.has(generationKey)) {
const firstGen = 0;
logStoreMetadata.init(generationKey, firstGen);
return firstGen;
}

const nextGen = logStoreMetadata.get(generationKey) + 1;
logStoreMetadata.set(generationKey, nextGen);
return nextGen;
};

/**
* A growable, replayable, sequence of `LogEntry`s.
*
* @param {Zone} zone
*/
export const prepareLogStore = zone => {
/**
* Ensure that any new or reset LogStore instance that (for a given
* incarnation) pushes at least one entry will insert these entries first,
* even if the log is reset, replayed, and pushed to in the same incarnation.
* @type {LogEntry[]}
*/
const initialPush = harden([['startGeneration', nextGeneration(zone)]]);

/**
* A predicate to indicate whether the entry is normally visible to the
* LogStore user, or is a more internal entry that only is visible via the
* Unfiltered methods.
* @param {LogEntry} entry
*/
const entryIsVisible = entry => entry[0] !== 'startGeneration';

/**
* @type {Ephemera<LogStore, {
* index: number
* replayDoneKit: PromiseKit<undefined>
* index: number;
* unfilteredIndex: number;
* initialPush: LogEntry[] | undefined;
* replayDoneKit: PromiseKit<undefined>;
* }>}
*/
const tmp = makeEphemera(log => {
const result = {
index: 0,
unfilteredIndex: 0,
initialPush,
replayDoneKit: makePromiseKit(),
};
if (log.getLength() === 0) {
Expand Down Expand Up @@ -82,6 +127,12 @@ export const prepareLogStore = zone => {
tmp.resetFor(self);
mapStore.clear();
},
getUnfilteredIndex() {
const { self } = this;
const eph = tmp.for(self);

return eph.unfilteredIndex;
},
getIndex() {
const { self } = this;
const eph = tmp.for(self);
Expand All @@ -99,52 +150,96 @@ export const prepareLogStore = zone => {
const { mapStore } = state;
const eph = tmp.for(self);

return eph.index < mapStore.getSize();
return eph.unfilteredIndex < mapStore.getSize();
},
/**
* @returns {LogEntry}
*/
peekEntry() {
erights marked this conversation as resolved.
Show resolved Hide resolved
const { state, self } = this;
const { mapStore } = state;
const eph = tmp.for(self);

self.isReplaying() ||
Fail`No longer replaying: ${q(eph.index)} vs ${q(
Fail`No longer replaying: ${q(eph.unfilteredIndex)} vs ${q(
mapStore.getSize(),
)}`;
const result = mapStore.get(eph.index);
const result = mapStore.get(eph.unfilteredIndex);
return result;
},
nextEntry() {
/**
* @returns {LogEntry}
*/
nextUnfilteredEntry() {
const { self } = this;
const eph = tmp.for(self);

const result = self.peekEntry();
eph.index += 1;
eph.unfilteredIndex += 1;
if (entryIsVisible(result)) {
eph.index += 1;
}
if (!self.isReplaying()) {
eph.replayDoneKit.resolve(undefined);
}
return result;
},
pushEntry(entry) {
/**
* @returns {LogEntry}
*/
nextEntry() {
const { self } = this;
let result = self.nextUnfilteredEntry();
while (!entryIsVisible(result)) {
self.isReplaying() || Fail`Unexpected entry at log tail: ${result}`;
result = self.nextUnfilteredEntry();
}
return result;
},
/**
* @param {LogEntry} latestEntry
*/
pushEntry(latestEntry) {
const { state, self } = this;

const { mapStore } = state;
const eph = tmp.for(self);

!self.isReplaying() ||
Fail`still replaying: ${q(eph.index)} vs ${q(mapStore.getSize())}`;
eph.index === mapStore.getSize() ||
Fail`internal: index confusion ${q(eph.index)} vs ${q(
mapStore.getSize(),
)}`;
mapStore.init(eph.index, entry);
eph.index += 1;
eph.index === mapStore.getSize() ||
Fail`internal: index confusion ${q(eph.index)} vs ${q(
mapStore.getSize(),
)}`;
// console.log('LOG ENTRY ', eph.index - 1, entry);
return eph.index;
Fail`still replaying: ${q(eph.unfilteredIndex)} vs ${q(mapStore.getSize())}`;

const pushOne = entry => {
eph.unfilteredIndex === mapStore.getSize() ||
Fail`internal: unfilteredIndex confusion ${q(eph.unfilteredIndex)} vs ${q(
mapStore.getSize(),
)}`;
mapStore.init(eph.unfilteredIndex, entry);
eph.unfilteredIndex += 1;
if (entryIsVisible(entry)) {
eph.index += 1;
}
eph.unfilteredIndex === mapStore.getSize() ||
Fail`internal: unfilteredIndex confusion ${q(eph.unfilteredIndex)} vs ${q(
mapStore.getSize(),
)}`;
};

if (eph.initialPush) {
const initialEntries = eph.initialPush;
eph.initialPush = undefined;
for (const initialEntry of initialEntries) {
pushOne(initialEntry);
}
}
pushOne(latestEntry);

// console.log('LOG ENTRY ', eph.unfilteredIndex - 1, entry);
return eph.unfilteredIndex;
},
dump() {
/**
* @returns {LogEntry[]}
*/
dumpUnfiltered() {
const { state } = this;
const { mapStore } = state;
const len = mapStore.getSize();
Expand All @@ -154,6 +249,10 @@ export const prepareLogStore = zone => {
}
return harden(result);
},
dump() {
const { self } = this;
return harden(self.dumpUnfiltered().filter(entryIsVisible));
},
promiseReplayDone() {
const { self } = this;
const eph = tmp.for(self);
Expand Down
19 changes: 14 additions & 5 deletions packages/async-flow/src/replay-membrane.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ export const makeReplayMembraneForTesting = ({

const Panic = (template, ...args) => panic(makeError(X(template, ...args)));

const startGeneration = generation => {
Number.isSafeInteger(generation) ||
Fail`generation expected integer; got ${generation}`;
generation >= 0 ||
Fail`generation expected non-negative; got ${generation}`;
};

// ////////////// Host or Interpreter to Guest ///////////////////////////////

/**
Expand Down Expand Up @@ -278,6 +285,7 @@ export const makeReplayMembraneForTesting = ({
throw Panic`internal: eventual send synchronously failed ${hostProblem}`;
}
try {
/** @type {LogEntry} */
const entry = harden(['doReturn', callIndex, vow]);
log.pushEntry(entry);
const guestPromise = makeGuestForHostVow(vow, guestReturnedP);
Expand Down Expand Up @@ -602,21 +610,22 @@ export const makeReplayMembraneForTesting = ({
// /////////////////////////////// Interpreter ///////////////////////////////

/**
* These are the only ones that are driven from the interpreter loop
* These are the only ones that are driven from the top level interpreter loop
*/
const topDispatch = harden({
startGeneration,
doFulfill,
doReject,
// doCall, // unimplemented in the current plan
});

/**
* These are the only ones that are driven from the interpreter loop
* These are the only ones that are driven from the nested interpreter loop
*/
const nestDispatch = harden({
// doCall, // unimplemented in the current plan
doReturn,
doThrow,
// doCall, // unimplemented in the current plan
});

const interpretOne = (dispatch, [op, ...args]) => {
Expand Down Expand Up @@ -646,7 +655,7 @@ export const makeReplayMembraneForTesting = ({
const nestInterpreter = callIndex => {
callStack.push(callIndex);
while (log.isReplaying() && !stopped) {
const entry = log.nextEntry();
const entry = log.nextUnfilteredEntry();
const optOutcome = interpretOne(nestDispatch, entry);
if (unnestFlag) {
optOutcome ||
Expand Down Expand Up @@ -687,7 +696,7 @@ export const makeReplayMembraneForTesting = ({
if (!(op in topDispatch)) {
return;
}
void log.nextEntry();
void log.nextUnfilteredEntry();
interpretOne(topDispatch, entry);
}
};
Expand Down
1 change: 1 addition & 0 deletions packages/async-flow/src/type-guards.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const FlowStateShape = M.or(
export const PropertyKeyShape = M.or(M.string(), M.symbol());

export const LogEntryShape = M.or(
['startGeneration', M.number()],
// ////////////////////////////// From Host to Guest /////////////////////////
['doFulfill', VowShape, M.any()],
['doReject', VowShape, M.any()],
Expand Down
28 changes: 9 additions & 19 deletions packages/async-flow/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,14 @@ export type Ephemera<S extends WeakKey = WeakKey, V extends unknown = any> = {
* yet support.
*/
export type LogEntry =
| [
// ///////////////// From Host to Guest /////////////////////////
op: 'doFulfill',
vow: HostVow,
fulfillment: Host,
]
| [op: 'startGeneration', generation: number]
// ///////////////// From Host to Guest /////////////////////////
| [op: 'doFulfill', vow: HostVow, fulfillment: Host]
| [op: 'doReject', vow: HostVow, reason: Host]
| [op: 'doReturn', callIndex: number, result: Host]
| [op: 'doThrow', callIndex: number, problem: Host]
// ///////////////////// From Guest to Host /////////////////////////
| [
// ///////////////////// From Guest to Host /////////////////////////
op: 'checkCall',
target: Host,
optVerb: PropertyKey | undefined,
Expand Down Expand Up @@ -161,12 +158,9 @@ export type LogEntry =
* vows and remotables.
*/
export type FutureLogEntry =
| [
// ///////////////// From Host to Guest ///////////////////////
op: 'doFulfill',
vow: HostVow,
fulfillment: Host,
]
| [op: 'startGeneration', generation: number]
// ///////////////// From Host to Guest ///////////////////////
| [op: 'doFulfill', vow: HostVow, fulfillment: Host]
| [op: 'doReject', vow: HostVow, reason: Host]
| [
op: 'doCall',
Expand All @@ -191,12 +185,8 @@ export type FutureLogEntry =
]
| [op: 'doReturn', callIndex: number, result: Host]
| [op: 'doThrow', callIndex: number, problem: Host]
| [
// ///////////////////// From Guest to Host /////////////////////////
op: 'checkFulfill',
vow: HostVow,
fulfillment: Host,
]
// ///////////////////// From Guest to Host /////////////////////////
| [op: 'checkFulfill', vow: HostVow, fulfillment: Host]
| [op: 'checkReject', vow: HostVow, reason: Host]
| [
op: 'checkCall',
Expand Down
Loading
Loading