@PulsarListener with ackTimeoutMillis redelivers a message only twice after the ack timeout, although the dead letter policy allows 10 redeliveries, and the message is never sent to the dead letter queue. #1019

Open
Programmer-yyds opened this issue Jan 22, 2025 · 0 comments
Programmer-yyds commented Jan 22, 2025

In the code below, the consumeString listener is configured with ackTimeoutMillis=1000 and sleeps for 30 * 1000 ms. I expected the ack timeout to trigger redelivery, but the message was redelivered only twice, even though my dead letter policy sets maxRedeliverCount to 10, and it was never delivered to the dead letter topic topic-1-dlq-topic.

Code segment:
```java
@RestController
public class TaskAsyncController {
    private static final DateTimeFormatter sdf = DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss");
    private static final Logger log = LoggerFactory.getLogger(TaskAsyncController.class);
    static Pattern pattern = Pattern.compile("\\.([a-zA-Z0-9]+)(?:\\?|$)");

    @Resource
    private PulsarTemplate strProducer;

    // Publishes one test message to topic-1.
    @PostMapping(value = "/task/test")
    public Result<String> test() {
        LocalDateTime now = LocalDateTime.now();
        String format = sdf.format(now);
        strProducer.send("topic-1", "date:" + format);
        return Result.success("");
    }

    // Sleeps for 30 s, far longer than the 1 s ack timeout, so every delivery should time out.
    @PulsarListener(topics = "topic-1", ackMode = AckMode.RECORD,
            subscriptionType = SubscriptionType.Shared, schemaType = SchemaType.STRING,
            ackTimeoutRedeliveryBackoff = "testAckTimeoutRedeliveryBackoff",
            negativeAckRedeliveryBackoff = "testAckTimeoutRedeliveryBackoff",
            deadLetterPolicy = "deadLetterPolicy", properties = {"ackTimeoutMillis=1000"})
    public void consumeString(String message) throws InterruptedException {
        log.info("date:" + sdf.format(LocalDateTime.now()));
        log.info("enter_sleep");
        Thread.sleep(30 * 1000);
        log.info("awaken");
    }

    // Listens on the dead letter topic; this is never invoked in my test.
    @PulsarListener(topics = "topic-1-dlq-topic", ackMode = AckMode.RECORD,
            subscriptionType = SubscriptionType.Shared, schemaType = SchemaType.STRING)
    public void consumeStringD(String message) {
        log.info("dead_letter_received_message_in_the_column,date:" + sdf.format(LocalDateTime.now()));
    }

    @Bean("testAckTimeoutRedeliveryBackoff")
    RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder()
                .minDelayMs(1000)
                .maxDelayMs(2 * 1000)
                .multiplier(2)
                .build();
    }

    @Bean
    DeadLetterPolicy deadLetterPolicy() {
        return DeadLetterPolicy.builder()
                .maxRedeliverCount(10)
                .deadLetterTopic("topic-1-dlq-topic")
                .build();
    }
}
```
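For reference, here is a rough sketch of the delay schedule I would expect from the backoff settings above. It assumes the delay grows as minDelayMs * multiplier^redeliveryCount and is capped at maxDelayMs. The class `BackoffSketch` and the method `nextDelayMs` are hypothetical names used only for illustration; this is not the Pulsar client implementation, and it does not explain why only two redeliveries occur.

```java
public class BackoffSketch {

    // Hypothetical helper (not part of the Pulsar API): assumes an exponential
    // delay of minDelayMs * multiplier^redeliveryCount, capped at maxDelayMs.
    static long nextDelayMs(long minDelayMs, long maxDelayMs, double multiplier, int redeliveryCount) {
        if (redeliveryCount <= 0) {
            return minDelayMs;
        }
        double delay = minDelayMs * Math.pow(multiplier, redeliveryCount);
        return (long) Math.min(delay, (double) maxDelayMs);
    }

    public static void main(String[] args) {
        // Same values as the testAckTimeoutRedeliveryBackoff bean:
        // minDelayMs = 1000, maxDelayMs = 2000, multiplier = 2.
        for (int count = 0; count <= 10; count++) {
            System.out.println("redelivery " + count + " -> "
                    + nextDelayMs(1000, 2000, 2.0, count) + " ms");
        }
    }
}
```

Under that assumption the delay reaches the 2000 ms cap from the first redelivery onward, so all 10 configured redeliveries should fit well within the 30 s sleep.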

log:
Image

onobc added the status: waiting-for-triage label Jan 23, 2025