Skip to content

Commit

Permalink
Improve delete reliability for kinesis streams (localstack#10371)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikschubert authored Mar 1, 2024
1 parent 6ec63bf commit c1b82d7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 9 deletions.
28 changes: 20 additions & 8 deletions localstack/services/dynamodbstreams/dynamodbstreams_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import contextlib
import logging
import threading
import time
from typing import Dict

from bson.json_util import dumps
Expand All @@ -11,6 +9,7 @@
from localstack.services.dynamodbstreams.models import DynamoDbStreamsStore, dynamodbstreams_stores
from localstack.utils.aws import arns, resources
from localstack.utils.common import now_utc
from localstack.utils.threads import FuncThread

DDB_KINESIS_STREAM_NAME_PREFIX = "__ddb_stream_"

Expand Down Expand Up @@ -94,12 +93,25 @@ def delete_streams(account_id: str, region_name: str, table_arn: str) -> None:
table_name = table_name_from_table_arn(table_arn)
if store.ddb_streams.pop(table_name, None):
stream_name = get_kinesis_stream_name(table_name)
with contextlib.suppress(Exception):
connect_to(aws_access_key_id=account_id, region_name=region_name).kinesis.delete_stream(
StreamName=stream_name
)
# sleep a bit, as stream deletion can take some time ...
time.sleep(1)
# stream_arn = stream["StreamArn"]

# we're basically asynchronously trying to delete the stream, or should we do this "synchronous" with the table deletion?
def _delete_stream(*args, **kwargs):
try:
kinesis_client = connect_to(
aws_access_key_id=account_id, region_name=region_name
).kinesis
# needs to be active otherwise we can't delete it
kinesis_client.get_waiter("stream_exists").wait(StreamName=stream_name)
kinesis_client.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
kinesis_client.get_waiter("stream_not_exists").wait(StreamName=stream_name)
except Exception:
LOG.warning(
f"Failed to delete underlying kinesis stream for dynamodb table {table_arn=}",
exc_info=LOG.isEnabledFor(logging.DEBUG),
)

FuncThread(_delete_stream).start() # fire & forget


def get_kinesis_stream_name(table_name: str) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ def delete(
- kinesis:RemoveTagsFromStream
"""
model = request.desired_state
request.aws_client_factory.kinesis.delete_stream(StreamARN=model["Arn"])
request.aws_client_factory.kinesis.delete_stream(
StreamARN=model["Arn"], EnforceConsumerDeletion=True
)
return ProgressEvent(
status=OperationStatus.SUCCESS,
resource_model={},
Expand Down

0 comments on commit c1b82d7

Please sign in to comment.