diff --git a/lib/state-store.js b/lib/state-store.js index 21509363..fa3ec061 100644 --- a/lib/state-store.js +++ b/lib/state-store.js @@ -459,23 +459,39 @@ class StateStore { } async lockStreamConsumer(consumerName, version) { - const { client, consumerGroup, consumerId, logger, streamName } = internal(this); + const { + client, + consumerGroup, + consumerId, + logger, + streamName, + useAutoShardAssignment + } = internal(this); + try { await client.update({ ConditionExpression: `#a.#b.#d = :z`, - ExpressionAttributeNames: { - '#a': 'enhancedConsumers', - '#b': consumerName, - '#c': 'isUsedBy', - '#d': 'version' - }, - ExpressionAttributeValues: { - ':x': consumerId, - ':y': generate(), - ':z': version - }, + ExpressionAttributeNames: Object.assign( + { + '#a': 'enhancedConsumers', + '#b': consumerName, + '#c': 'isUsedBy', + '#d': 'version' + }, + !useAutoShardAssignment && { '#e': 'shards' } + ), + ExpressionAttributeValues: Object.assign( + { + ':x': consumerId, + ':y': generate(), + ':z': version + }, + !useAutoShardAssignment && { ':e': {} } + ), Key: { consumerGroup, streamName }, - UpdateExpression: `SET #a.#b.#c = :x, #a.#b.#d = :y` + UpdateExpression: `SET #a.#b.#c = :x, #a.#b.#d = :y${ + !useAutoShardAssignment ? ', #a.#b.#e = if_not_exists(#a.#b.#e, :e)' : '' + }` }); return true; } catch (err) {