From ab632bdc6f1ce3b42c22124e49cd3669e1594e14 Mon Sep 17 00:00:00 2001 From: AaronJan Date: Tue, 27 Sep 2022 10:25:48 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20=E7=BB=99receiver=E6=96=B0=E5=A2=9E?= =?UTF-8?q?cleanup=E5=9B=9E=E8=B0=83=E7=94=A8=E4=BA=8E=E6=B8=85=E7=90=86?= =?UTF-8?q?=E5=AF=B9=E8=B1=A1=E5=AD=98=E5=82=A8=E4=B8=AD=E7=9A=84=E4=B8=B4?= =?UTF-8?q?=E6=97=B6=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bufferSupport/receiver.ts | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/src/bufferSupport/receiver.ts b/src/bufferSupport/receiver.ts index 108a95c..6251a04 100644 --- a/src/bufferSupport/receiver.ts +++ b/src/bufferSupport/receiver.ts @@ -28,6 +28,19 @@ export interface IReceiveParsedPayload { headers?: any; isBuffer?: boolean; } + +export interface ILogger { + debug(...data: any[]): void + info(...data: any[]): void + warn(...data: any[]): void + error(...data: any[]): void + log(...data: any[]): void +} + +export interface IReceiverContext { + logger?: ILogger +} + export interface IReplyPayload { storeType: string; isBuffer?: boolean; @@ -41,8 +54,9 @@ export function initReceiver( ossThreshold: number = 0 ): { receive: ( - event: Buffer | string | IReceiveParsedPayload - ) => Promise<{ headers?: any; body: any; storeType?: string }>; + event: Buffer | string | IReceiveParsedPayload, + context?: IReceiverContext + ) => Promise<{ headers?: any; body: any; storeType?: string, cleanup: () => Promise }>; reply: replyFunc; } { const fcConfig = loadConfigWithEnvs(ossType); @@ -50,8 +64,10 @@ export function initReceiver( const storageClient = getClientByType(ossType, storageOptions); const receive = async ( - event: Buffer | string | IReceiveParsedPayload - ): Promise<{ headers?: any; body: string | Buffer; storeType?: string }> => { + event: Buffer | string | IReceiveParsedPayload, + context: IReceiverContext = {} + ): Promise<{ headers?: any; body: string | Buffer; storeType?: string, cleanup: () => Promise }> => { + const logger = context.logger ?? console let storeType: string; let ossKey: string | undefined; let body: any; @@ -101,16 +117,16 @@ export function initReceiver( } const content: Buffer = resp.content; - storageClient.del(ossKey as string).catch(console.error); + const cleanup = () => storageClient.del(ossKey as string).catch(logger.error) return omitBy( - { headers, body: isBuffer ? content : content.toString(), storeType }, + { headers, body: isBuffer ? content : content.toString(), storeType, cleanup }, (v: any) => v === undefined ); } return omitBy( - { headers, body: isBuffer ? Buffer.from(body, 'base64') : body }, + { headers, body: isBuffer ? Buffer.from(body, 'base64') : body, cleanup: () => Promise.resolve(void 0) }, (v: any) => v === undefined ); }; From 3a1950e977908ba514cf8c3126fd2d72b1271bd8 Mon Sep 17 00:00:00 2001 From: AaronJan Date: Tue, 27 Sep 2022 14:50:44 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat:=20=E4=BF=9D=E7=95=99=E5=AF=B9?= =?UTF-8?q?=E7=8E=B0=E6=9C=89API=E7=9A=84=E5=85=BC=E5=AE=B9=EF=BC=8C?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0receiveManually=E6=96=B9=E6=B3=95=EF=BC=8C?= =?UTF-8?q?=E5=B9=B6=E6=9B=B4=E6=96=B0=E4=BD=BF=E7=94=A8=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 22 ++++++++++++++++++++++ __tests__/receiver-spec.ts | 5 ++++- src/bufferSupport/receiver.ts | 25 ++++++++++++++++--------- src/receiver.ts | 4 +++- 4 files changed, 45 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 63318ff..27e21e7 100644 --- a/README.md +++ b/README.md @@ -136,6 +136,7 @@ const { receive, reply } = require('fc-toolkit').initReveiver( async function handler (event, context, callback) { try { + // `receive` 在从 OSS 收到 payload 后会自动将其删除 const body = await receive(event) // handle the body here.. const returnValue = await doSomethingYouNeed(body) @@ -146,6 +147,27 @@ async function handler (event, context, callback) { } ``` +或者也可以使用 `receiveManually` 来获取包含 header 在内的更多响应内容,以及手动控制 OSS 临时对象的清理: + +```js +const { receiveManually, reply } = require('fc-toolkit').initReveiver( + false, + 'aws' +) + +async function handler (event, context, callback) { + try { + const resp = await receiveManually(event) + const returnValue = await doSomethingYouNeed(resp.body) + await reply(callback)(returnValue) + // 手动调用清理回调 + await resp.cleanup() + } catch (e) { + callback(e) + } +} +``` + ## 使用Buffer ### initInvoker diff --git a/__tests__/receiver-spec.ts b/__tests__/receiver-spec.ts index cf5c574..ce1d6b8 100644 --- a/__tests__/receiver-spec.ts +++ b/__tests__/receiver-spec.ts @@ -69,6 +69,7 @@ describe('receiver test cases', () => { storeType: 'direct', isBuffer: true, }); + await resp.cleanup(); expect(Buffer.isBuffer(resp.body)).toBeTruthy(); expect(resp.body.toString()).toBe('test message'); }); @@ -97,6 +98,7 @@ describe('receiver test cases', () => { storeType: 'oss', ossKey: testKey, }); + await resp.cleanup(); expect(resp.body).toBe(testContent); }); @@ -109,6 +111,7 @@ describe('receiver test cases', () => { ossKey: testKey, isBuffer: true, }); + await resp.cleanup(); expect(Buffer.isBuffer(resp.body)).toBeTruthy(); expect(resp.body.toString()).toBe(testContent); }); @@ -163,7 +166,7 @@ describe('receiver test cases', () => { storeType: 'oss', ossKey: testKey, }); - expect(resp).toEqual(testBody); + expect(resp).toEqual(JSON.stringify(testBody)); }); }); }); diff --git a/src/bufferSupport/receiver.ts b/src/bufferSupport/receiver.ts index 6251a04..08bf78d 100644 --- a/src/bufferSupport/receiver.ts +++ b/src/bufferSupport/receiver.ts @@ -30,15 +30,15 @@ export interface IReceiveParsedPayload { } export interface ILogger { - debug(...data: any[]): void - info(...data: any[]): void - warn(...data: any[]): void - error(...data: any[]): void - log(...data: any[]): void + debug(...data: any[]): void; + info(...data: any[]): void; + warn(...data: any[]): void; + error(...data: any[]): void; + log(...data: any[]): void; } export interface IReceiverContext { - logger?: ILogger + logger?: ILogger; } export interface IReplyPayload { @@ -48,6 +48,13 @@ export interface IReplyPayload { meta?: any; } +export interface IReceiveResponse { + headers?: any; + body: any; + storeType?: string; + cleanup: () => Promise; +} + export function initReceiver( noOSS: boolean = false, ossType: StorageEngine = StorageEngine.ALIYUN_OSS, @@ -56,7 +63,7 @@ export function initReceiver( receive: ( event: Buffer | string | IReceiveParsedPayload, context?: IReceiverContext - ) => Promise<{ headers?: any; body: any; storeType?: string, cleanup: () => Promise }>; + ) => Promise; reply: replyFunc; } { const fcConfig = loadConfigWithEnvs(ossType); @@ -66,7 +73,7 @@ export function initReceiver( const receive = async ( event: Buffer | string | IReceiveParsedPayload, context: IReceiverContext = {} - ): Promise<{ headers?: any; body: string | Buffer; storeType?: string, cleanup: () => Promise }> => { + ): Promise => { const logger = context.logger ?? console let storeType: string; let ossKey: string | undefined; @@ -117,7 +124,7 @@ export function initReceiver( } const content: Buffer = resp.content; - const cleanup = () => storageClient.del(ossKey as string).catch(logger.error) + const cleanup = () => storageClient.del(ossKey as string).catch(logger.error); return omitBy( { headers, body: isBuffer ? content : content.toString(), storeType, cleanup }, diff --git a/src/receiver.ts b/src/receiver.ts index e5ca453..b85421f 100644 --- a/src/receiver.ts +++ b/src/receiver.ts @@ -10,13 +10,15 @@ export function initReceiver( ossType: StorageEngine = StorageEngine.ALIYUN_OSS ): { receive: (event: string | IReceiveParsedPayload) => Promise; + receiveManually: (event: string | IReceiveParsedPayload) => Promise; reply: bufferSupport.replyFunc; } { const { receive, reply } = bufferSupport.initReceiver(noOSS, ossType, 0); return { receive: (event: string | IReceiveParsedPayload) => - receive(event).then(res => res.body), + receive(event).then(res => res.cleanup().then(() => res.body)), + receiveManually: receive, reply, }; }