Skip to content

Commit

Permalink
Merge branch 'main' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Jun 2, 2021
2 parents 35eeb77 + f78e588 commit a586086
Show file tree
Hide file tree
Showing 33 changed files with 1,191 additions and 1,194 deletions.
10 changes: 10 additions & 0 deletions .eslintrc
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
{
"extends": "lifion",
"rules": {
"jest/no-conditional-expect": "off",
"radar/no-identical-functions": "off",
"unicorn/no-array-callback-reference": "off",
"unicorn/no-array-for-each": "off"
},
"overrides": [
{
"files": ["lib/records.js"],
"rules": { "promise/catch-or-return": "off" }
},
{
"files": ["lib/records.js", "lib/fan-out-consumer.js"],
"rules": {
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

Generated by [`auto-changelog`](https://github.com/CookPete/auto-changelog).

### v1.3.1 (2021-06-02)

- Correct the call to got.stream (fixes #430) [`#430`](https://github.com/lifion/lifion-kinesis/issues/430)

### v1.3.0 (2021-05-03)

- [`#433`](https://github.com/lifion/lifion-kinesis/pull/433): Bump chalk from 4.1.0 to 4.1.1
Expand Down
34 changes: 17 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ pipeline([
* [.putRecord(params)](#module_lifion-kinesis--Kinesis+putRecord) ⇒ <code>Promise</code>
* [.listShards(params)](#module_lifion-kinesis--Kinesis+listShards) ⇒ <code>Promise</code>
* [.putRecords(params)](#module_lifion-kinesis--Kinesis+putRecords) ⇒ <code>Promise</code>
* [.getStats()](#module_lifion-kinesis--Kinesis+getStats) ⇒ <code>object</code>
* [.getStats()](#module_lifion-kinesis--Kinesis+getStats) ⇒ <code>Object</code>
* _static_
* [.getStats()](#module_lifion-kinesis--Kinesis.getStats) ⇒ <code>object</code>
* [.getStats()](#module_lifion-kinesis--Kinesis.getStats) ⇒ <code>Object</code>

<a name="exp_module_lifion-kinesis--Kinesis"></a>

Expand All @@ -87,24 +87,24 @@ Initializes a new instance of the Kinesis client.

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| options | <code>object</code> | | The initialization options. In addition to the below options, it can also contain any of the [`AWS.Kinesis` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property). |
| options | <code>Object</code> | | The initialization options. In addition to the below options, it can also contain any of the [`AWS.Kinesis` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property). |
| [options.compression] | <code>string</code> | | The kind of data compression to use with records. The currently available compression options are either `"LZ-UTF8"` or none. |
| [options.consumerGroup] | <code>string</code> | | The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module. |
| [options.createStreamIfNeeded] | <code>boolean</code> | <code>true</code> | Whether if the Kinesis stream should be automatically created if it doesn't exist upon connection |
| [options.dynamoDb] | <code>object</code> | <code>{}</code> | The initialization options for the DynamoDB client used to store the state of the consumers. In addition to `tableNames` and `tags`, it can also contain any of the [`AWS.DynamoDB` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html#constructor-property). |
| [options.dynamoDb] | <code>Object</code> | <code>{}</code> | The initialization options for the DynamoDB client used to store the state of the consumers. In addition to `tableNames` and `tags`, it can also contain any of the [`AWS.DynamoDB` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html#constructor-property). |
| [options.dynamoDb.tableName] | <code>string</code> | | The name of the table in which to store the state of consumers. If not provided, it defaults to "lifion-kinesis-state". |
| [options.dynamoDb.tags] | <code>object</code> | | If provided, the client will ensure that the DynamoDB table where the state is stored is tagged with these tags. If the table already has tags, they will be merged. |
| [options.encryption] | <code>object</code> | | The encryption options to enforce in the stream. |
| [options.dynamoDb.tags] | <code>Object</code> | | If provided, the client will ensure that the DynamoDB table where the state is stored is tagged with these tags. If the table already has tags, they will be merged. |
| [options.encryption] | <code>Object</code> | | The encryption options to enforce in the stream. |
| [options.encryption.type] | <code>string</code> | | The encryption type to use. |
| [options.encryption.keyId] | <code>string</code> | | The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/". |
| [options.leaseAcquisitionInterval] | <code>number</code> | <code>20000</code> | The interval in milliseconds for how often to attempt lease acquisitions. |
| [options.leaseAcquisitionRecoveryInterval] | <code>number</code> | <code>5000</code> | The interval in milliseconds for how often to re-attempt lease acquisitions when an error is returned from aws. |
| [options.limit] | <code>number</code> | <code>10000</code> | The limit of records per get records call (only applicable with `useEnhancedFanOut` is set to `false`) |
| [options.logger] | <code>object</code> | | An object with the `warn`, `debug`, and `error` functions that will be used for logging purposes. If not provided, logging will be omitted. |
| [options.logger] | <code>Object</code> | | An object with the `warn`, `debug`, and `error` functions that will be used for logging purposes. If not provided, logging will be omitted. |
| [options.maxEnhancedConsumers] | <code>number</code> | <code>5</code> | An option to set the number of enhanced fan-out consumer ARNs that the module should initialize. Defaults to 5. Providing a number above the AWS limit (20) or below 1 will result in using the default. |
| [options.noRecordsPollDelay] | <code>number</code> | <code>1000</code> | The delay in milliseconds before attempting to get more records when there were none in the previous attempt (only applicable when `useEnhancedFanOut` is set to `false`) |
| [options.pollDelay] | <code>number</code> | <code>250</code> | When the `usePausedPolling` option is `false`, this option defines the delay in milliseconds in between poll requests for more records (only applicable when `useEnhancedFanOut` is set to `false`) |
| [options.s3] | <code>object</code> | <code>{}</code> | The initialization options for the S3 client used to store large items in buckets. In addition to `bucketName` and `endpoint`, it can also contain any of the [`AWS.S3` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#constructor-property). |
| [options.s3] | <code>Object</code> | <code>{}</code> | The initialization options for the S3 client used to store large items in buckets. In addition to `bucketName` and `endpoint`, it can also contain any of the [`AWS.S3` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#constructor-property). |
| [options.s3.bucketName] | <code>string</code> | | The name of the bucket in which to store large messages. If not provided, it defaults to the name of the Kinesis stream. |
| [options.s3.largeItemThreshold] | <code>number</code> | <code>900</code> | The size in KB above which an item should automatically be stored in s3. |
| [options.s3.nonS3Keys] | <code>Array.&lt;string&gt;</code> | <code>[]</code> | If the `useS3ForLargeItems` option is set to `true`, the `nonS3Keys` option lists the keys that will be sent normally on the kinesis record. |
Expand All @@ -115,7 +115,7 @@ Initializes a new instance of the Kinesis client.
| [options.statsInterval] | <code>number</code> | <code>30000</code> | The interval in milliseconds for how often to emit the "stats" event. The event is only available while the consumer is running. |
| options.streamName | <code>string</code> | | The name of the stream to consume data from (required) |
| [options.supressThroughputWarnings] | <code>boolean</code> | <code>false</code> | Set to `true` to make the client log ProvisionedThroughputExceededException as debug rather than warning. |
| [options.tags] | <code>object</code> | | If provided, the client will ensure that the stream is tagged with these tags upon connection. If the stream is already tagged, the existing tags will be merged with the provided ones before updating them. |
| [options.tags] | <code>Object</code> | | If provided, the client will ensure that the stream is tagged with these tags upon connection. If the stream is already tagged, the existing tags will be merged with the provided ones before updating them. |
| [options.useAutoCheckpoints] | <code>boolean</code> | <code>true</code> | Set to `true` to make the client automatically store shard checkpoints using the sequence number of the most-recently received record. If set to `false` consumers can use the `setCheckpoint()` function to store any sequence number as the checkpoint for the shard. |
| [options.useAutoShardAssignment] | <code>boolean</code> | <code>true</code> | Set to `true` to automatically assign the stream shards to the active consumers in the same group (so only one client reads from one shard at the same time). Set to `false` to make the client read from all shards. |
| [options.useEnhancedFanOut] | <code>boolean</code> | <code>false</code> | Set to `true` to make the client use enhanced fan-out consumers to read from shards. |
Expand Down Expand Up @@ -149,7 +149,7 @@ Writes a single data record into a stream.

| Param | Type | Description |
| --- | --- | --- |
| params | <code>object</code> | The parameters. |
| params | <code>Object</code> | The parameters. |
| params.data | <code>\*</code> | The data to put into the record. |
| [params.explicitHashKey] | <code>string</code> | The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash. |
| [params.partitionKey] | <code>string</code> | Determines which shard in the stream the data record is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data. |
Expand All @@ -167,7 +167,7 @@ List the shards of a stream.

| Param | Type | Description |
| --- | --- | --- |
| params | <code>object</code> | The parameters. |
| params | <code>Object</code> | The parameters. |
| [params.streamName] | <code>string</code> | If provided, the method will list the shards of the specific stream instead of the stream name provided during the consumer instantiation. |

<a name="module_lifion-kinesis--Kinesis+putRecords"></a>
Expand All @@ -181,27 +181,27 @@ Writes multiple data records into a stream in a single call.

| Param | Type | Description |
| --- | --- | --- |
| params | <code>object</code> | The parameters. |
| params.records | <code>Array.&lt;object&gt;</code> | The records associated with the request. |
| params | <code>Object</code> | The parameters. |
| params.records | <code>Array.&lt;Object&gt;</code> | The records associated with the request. |
| params.records[].data | <code>\*</code> | The record data. |
| [params.records[].explicitHashKey] | <code>string</code> | The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash. |
| [params.records[].partitionKey] | <code>string</code> | Determines which shard in the stream the data record is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data. |
| [params.streamName] | <code>string</code> | If provided, the record will be put into the specified stream instead of the stream name provided during the consumer instantiation. |

<a name="module_lifion-kinesis--Kinesis+getStats"></a>

#### kinesis.getStats() ⇒ <code>object</code>
#### kinesis.getStats() ⇒ <code>Object</code>
Returns statistics for the instance of the client.

**Kind**: instance method of [<code>Kinesis</code>](#exp_module_lifion-kinesis--Kinesis)
**Returns**: <code>object</code> - An object with the statistics.
**Returns**: <code>Object</code> - An object with the statistics.
<a name="module_lifion-kinesis--Kinesis.getStats"></a>

#### Kinesis.getStats() ⇒ <code>object</code>
#### Kinesis.getStats() ⇒ <code>Object</code>
Returns the aggregated statistics of all the instances of the client.

**Kind**: static method of [<code>Kinesis</code>](#exp_module_lifion-kinesis--Kinesis)
**Returns**: <code>object</code> - An object with the statistics.
**Returns**: <code>Object</code> - An object with the statistics.

## License

Expand Down
30 changes: 15 additions & 15 deletions lib/bucket.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ const equal = require('fast-deep-equal');
/**
* Checks if the specified bucket exists.
*
* @param {object} params - The params.
* @param {Object} params - The params.
* @param {string} params.bucketName - The name of the bucket to check.
* @param {object} params.client - An instance of the S3 client.
* @param {object} params.logger - A logger instance.
* @param {Object} params.client - An instance of the S3 client.
* @param {Object} params.logger - A logger instance.
* @returns {Promise} Resolves if the bucket exists, rejects otherwise.
* @private
*/
Expand All @@ -33,11 +33,11 @@ async function checkIfBucketExists({ bucketName, client, logger }) {
/**
* Ensures that the bucket is tagged as expected by reading the tags then updating them if needed.
*
* @param {object} params - The params.
* @param {Object} params - The params.
* @param {string} params.bucketName - The name of the bucket to check.
* @param {object} params.client - An instance of the S3 client.
* @param {object} params.logger - A logger instance.
* @param {object} params.tags - The tags that should be present in the bucket.
* @param {Object} params.client - An instance of the S3 client.
* @param {Object} params.logger - A logger instance.
* @param {Object} params.tags - The tags that should be present in the bucket.
* @returns {Promise} Resolves if the bucket is tagged properly, rejects otherwise.
* @memberof module:bucket
*/
Expand All @@ -63,11 +63,11 @@ async function confirmBucketTags({ bucketName, client, logger, tags }) {
/**
* Ensures that the bucket rules are defined properly
*
* @param {object} params - The params.
* @param {Object} params - The params.
* @param {string} params.bucketName - The name of the bucket to check.
* @param {object} params.client - An instance of the S3 client.
* @param {object} params.logger - A logger instance.
* @param {object} params.streamName - The name of the kinesis stream.
* @param {Object} params.client - An instance of the S3 client.
* @param {Object} params.logger - A logger instance.
* @param {Object} params.streamName - The name of the kinesis stream.
* @returns {Promise} Resolves if the bucket is ruled properly, rejects otherwise.
* @memberof module:bucket
*/
Expand Down Expand Up @@ -108,9 +108,9 @@ async function confirmBucketLifecycleConfiguration({ bucketName, client, logger,
/**
* Checks if a bucket exist and if not creates it.
*
* @param {object} params - The params.
* @param {object} params.client - An instance of the S3 client.
* @param {object} params.logger - A logger instance.
* @param {Object} params - The params.
* @param {Object} params.client - An instance of the S3 client.
* @param {Object} params.logger - A logger instance.
* @param {string} params.bucketName - The name of the bucket to create.
* @returns {Promise} Resolves if the bucket exists and is accessible, rejects otherwise.
* @memberof module:bucket
Expand All @@ -121,7 +121,7 @@ async function ensureBucketExists({ bucketName, client, logger }) {
try {
logger.debug(`Verifying the "${bucketName}" bucket exists and accessible…`);
return await checkIfBucketExists({ bucketName, client, logger });
} catch (err) {
} catch {
logger.debug('Trying to create the bucket…');
return client.createBucket(params);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/compression.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ module.exports = {
*
* @param {Buffer} input - The buffer to decompress.
* @param {string} inputEncoding - The encoding of the input buffer to decompress.
* @fulfil {String} - A decompressed UTF-8 string.
* @fulfil {string} - A decompressed UTF-8 string.
* @returns {Promise}
*/
decompress: (input, inputEncoding) =>
Expand Down
16 changes: 8 additions & 8 deletions lib/consumers-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ const privateData = new WeakMap();
/**
* Provides access to the private data of the specified instance.
*
* @param {object} instance - The private data's owner.
* @returns {object} The private data.
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
* @private
*/
function internal(instance) {
Expand All @@ -33,21 +33,21 @@ class ConsumersManager {
/**
* Initializes an instance of the consumers manager.
*
* @param {object} options - The initialization options.
* @param {object} options.awsOptions - The initialization options for AWS.Kinesis.
* @param {object} options.client - An instance of the Kinesis client.
* @param {Object} options - The initialization options.
* @param {Object} options.awsOptions - The initialization options for AWS.Kinesis.
* @param {Object} options.client - An instance of the Kinesis client.
* @param {string} options.compression - The kind of data compression to use with records.
* @param {number} options.limit - The limit of records per get records call.
* @param {object} options.logger - An instance of a logger.
* @param {Object} options.logger - An instance of a logger.
* @param {number} options.noRecordsPollDelay - The delay in milliseconds before attempting to
* get more records when there were none in the previous attempt.
* @param {number} options.pollDelay - When the `usePausedPolling` option is `false`, this
* option defines the delay in milliseconds in between poll requests for more records.
* @param {Function} options.pushToStream - A function to push incoming records to the consumer.
* @param {object} options.s3 - The S3 options in the current kinesis client.
* @param {Object} options.s3 - The S3 options in the current kinesis client.
* @param {string|boolean} [options.shouldParseJson] - Whether if retrieved records' data should
* be parsed as JSON or not.
* @param {object} options.stateStore - An instance of the state store.
* @param {Object} options.stateStore - An instance of the state store.
* @param {string} options.streamName - The name of the Kinesis stream.
* @param {boolean} options.useAutoCheckpoints - Whether to automatically store shard checkpoints
* using the sequence number of the most-recently received record or not.
Expand Down
12 changes: 4 additions & 8 deletions lib/deaggregate.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

const ProtoBuf = require('protobufjs');

const aggJson = require('./aggregate-protobuf.json');

const KPL_MAGIC_NUMBER = 'f3899ac2';
Expand All @@ -19,19 +20,14 @@ const builder = ProtoBuf.Root.fromJSON(aggJson).lookupType(MESSAGE_NAME);
/**
* asynchronous deaggregation interface
*
* @param {object} kinesisRecord - The kinesis message
* @param {Object} kinesisRecord - The kinesis message
* @param {Function} perRecordCallback - A callback invoked for each deaggregated record
* @param {Function} afterRecordCallback - A callback invoked after all records have been deaggregated
*/
const deaggregate = (kinesisRecord, perRecordCallback, afterRecordCallback) => {
// we receive the record data as a base64 encoded string
const {
ApproximateArrivalTimestamp,
Data,
ExplicitPartitionKey,
PartitionKey,
SequenceNumber
} = kinesisRecord;
const { ApproximateArrivalTimestamp, Data, ExplicitPartitionKey, PartitionKey, SequenceNumber } =
kinesisRecord;
const recordBuffer = Buffer.from(Data, 'base64');

// first 4 bytes are the Kinesis Producer Library assigned magic number
Expand Down
Loading

0 comments on commit a586086

Please sign in to comment.