diff --git a/README.md b/README.md index 315ebb5..fef0722 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,7 @@ Executes a lambda given the `options` object, which is a dictionary where the ke | `envdestroy`|optional, destroy added environment on closing, default to false| | `verboseLevel`|optional, default 3. Level 2 dismiss handler() text, level 1 dismiss lambda-local text and level 0 dismiss also the result.| | `callback`|optional, lambda third parameter [callback][1]. When left out a Promise is returned| +| `onInvocationEnd`|optional. called once the invocation ended. useful when awslambda.streamifyResponse is used to distinguish between end of response stream and end of invocation. | | `clientContext`|optional, used to populated clientContext property of lambda second parameter (context) #### `lambdaLocal.setLogger(logger)` diff --git a/src/lambdalocal.ts b/src/lambdalocal.ts index 749bcb0..109ed67 100644 --- a/src/lambdalocal.ts +++ b/src/lambdalocal.ts @@ -187,6 +187,7 @@ function _executeSync(opts) { timeoutMs = opts.timeoutMs || 3000, verboseLevel = opts.verboseLevel, callback = opts.callback, + onInvocationEnd = opts.onInvocationEnd, clientContext = null; if (opts.clientContext) { @@ -295,7 +296,8 @@ function _executeSync(opts) { }); } }, - clientContext: clientContext + clientContext: clientContext, + onInvocationEnd: onInvocationEnd, }); if(callback) context.callback = callback; diff --git a/src/lib/context.ts b/src/lib/context.ts index 3f8b7e6..fa684ee 100644 --- a/src/lib/context.ts +++ b/src/lib/context.ts @@ -8,6 +8,7 @@ import utils = require('./utils.js'); import mute = require('./mute.js'); +import { StreamingBody } from './streaming.js'; function Context() { this.logger = null; @@ -39,6 +40,7 @@ function Context() { this.logStreamName = 'Stream name'; this.identity = null; this.clientContext = null; + this.onInvocationEnd = null; /* * callback function called after done @@ -112,6 +114,7 @@ Context.prototype._initialize = function(options) { this.unmute = mute(); } this.clientContext = options.clientContext; + this.onInvocationEnd = options.onInvocationEnd; return; }; @@ -149,7 +152,12 @@ Context.prototype.generate_context = function(){ logStreamName: this.logStreamName, identity: this.identity, clientContext: this.clientContext, - _stopped: false + _stopped: false, + + // INTERNAL + __lambdaLocal: { + onInvocationEnd: this.onInvocationEnd + }, }; return ctx; } @@ -207,6 +215,12 @@ Context.prototype.done = function(err, message) { } } this.finalCallback(); //Destroy env... + + const isStream = typeof message === "object" && message?.body instanceof StreamingBody + if (!isStream) { + this.onInvocationEnd?.(); + } + /* The finalCallback method will be instantly called if 'this.callbackWaitsForEmptyEventLoop' is False Otherwise, lambda-local will wait for an empty loop then call it. diff --git a/src/lib/streaming.ts b/src/lib/streaming.ts index a7c5eae..f301b05 100644 --- a/src/lib/streaming.ts +++ b/src/lib/streaming.ts @@ -19,13 +19,15 @@ function streamifyResponse(handler) { if (!body.headersSent) { body.sendHeader(metadata) } + context.__lambdaLocal.onInvocationEnd?.(); } catch (error) { reject(error); + context.__lambdaLocal.onInvocationEnd?.(error); } }); } -class StreamingBody extends PassThrough { +export class StreamingBody extends PassThrough { constructor(private readonly resolve: (metadata) => void) { super(); } diff --git a/test/functs/test-func-streaming.js b/test/functs/test-func-streaming.js index 28e2eae..1328b59 100644 --- a/test/functs/test-func-streaming.js +++ b/test/functs/test-func-streaming.js @@ -18,6 +18,8 @@ exports.handler = awslambda.streamifyResponse( responseStream.write("bar"); responseStream.end(); }, 100); + + await new Promise(resolve => setTimeout(resolve, 200)); } ); diff --git a/test/test.js b/test/test.js index c4103d0..ca20250 100644 --- a/test/test.js +++ b/test/test.js @@ -364,6 +364,16 @@ describe("- Testing lambdalocal.js", function () { assert.equal(data.result, "testvar"); }); }); + it('should call onInvocationEnd', function () { + var lambdalocal = require(lambdalocal_path); + lambdalocal.setLogger(winston); + let invocationEnded = 0 + opts.onInvocationEnd = () => invocationEnded++ + return lambdalocal.execute(opts).then(function (data) { + assert.equal(data.result, "testvar"); + assert.equal(invocationEnded, 1) + }); + }); it('should be stateless', function () { var lambdalocal = require(lambdalocal_path); lambdalocal.setLogger(winston); @@ -425,13 +435,17 @@ describe("- Testing lambdalocal.js", function () { it('should return a readable stream as `body`', function () { var lambdalocal = require(lambdalocal_path); lambdalocal.setLogger(winston); + let invocationEnded = 0 return lambdalocal.execute({ event: require(path.join(__dirname, "./events/test-event.js")), lambdaPath: path.join(__dirname, "./functs/test-func-streaming.js"), lambdaHandler: functionName, callbackWaitsForEmptyEventLoop: false, timeoutMs: timeoutMs, - verboseLevel: 1 + verboseLevel: 1, + onInvocationEnd() { + invocationEnded++ + } }).then(function (data) { assert.deepEqual( data.headers, @@ -448,7 +462,13 @@ describe("- Testing lambdalocal.js", function () { data.body.on("end", () => { assert.deepEqual(chunks, ["foo", "bar"]) assert.closeTo(times[1] - times[0], 100, 50) - resolve() + + assert.equal(invocationEnded, 0) + + setTimeout(() => { + assert.equal(invocationEnded, 1) + resolve() + }, 200) }); }) })