Skip to content

Commit

Permalink
feature: DLQ의 재처리 로직을 구현한다.
Browse files Browse the repository at this point in the history
- 헤더에 RETRY_COUNT_HEADER값이 없으면 0으로 설정 후 reservation큐로 다시 보낸다. 그리고 1을 더한다.
- 헤더에 RETRY_COUNT_HEADER값이 3미만이면 reservation큐로 다시 보낸다. 그리고 1을 더한다.
- 헤더에 RETRY_COUNT_HEADER값이 3이상이면 메세지를 소멸시킨다.

Related: #70
  • Loading branch information
hoa0217 committed Apr 5, 2024
1 parent faad5e8 commit 33a995c
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 2 deletions.
39 changes: 39 additions & 0 deletions src/main/java/com/modoospace/alarm/consumer/AlarmDLQConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.modoospace.alarm.consumer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@RequiredArgsConstructor
@Component
@Slf4j
public class AlarmDLQConsumer {

private static final String RETRY_COUNT_HEADER = "x-retries_count";

private final RabbitTemplate rabbitTemplate;

@Value("${spring.rabbitmq.retry_count}")
private int retryCount;

@RabbitListener(queues = "q.reservation.dlx")
public void processFailedMessagesRequeue(Message failedMessage) {
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders()
.get(RETRY_COUNT_HEADER);
if (retriesCnt == null) {
retriesCnt = 0;
}
if (retriesCnt >= retryCount) {
log.info("Discarding message");
return;
}
log.info("Retrying message for the {} time", retriesCnt);
failedMessage.getMessageProperties().getHeaders().put(RETRY_COUNT_HEADER, ++retriesCnt);
rabbitTemplate.send("x.reservation",
failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public abstract class AbstractIntegrationContainerBaseTest {
System.setProperty("spring.rabbitmq.port", MY_RABBITMQ_CONTAINER.getMappedPort(5672).toString());
System.setProperty("spring.rabbitmq.username", "guest");
System.setProperty("spring.rabbitmq.password", "guest");
System.setProperty("spring.rabbitmq.retry_count", "3");
RabbitMQTestInitializer.initializeRabbitMQ(MY_RABBITMQ_CONTAINER);
}
}
2 changes: 0 additions & 2 deletions src/test/java/com/modoospace/RabbitMQTestInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

public class RabbitMQTestInitializer {

private static final String QUEUE_NAME = "RESERVATION";

static void initializeRabbitMQ(RabbitMQContainer container) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(container.getHost());
Expand Down
114 changes: 114 additions & 0 deletions src/test/java/com/modoospace/alarm/consumer/AlarmDLQConsumerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package com.modoospace.alarm.consumer;

import static org.assertj.core.api.Assertions.assertThat;

import com.modoospace.AbstractIntegrationContainerBaseTest;
import com.modoospace.alarm.controller.dto.AlarmEvent;
import com.modoospace.alarm.domain.Alarm;
import com.modoospace.alarm.domain.AlarmRepository;
import com.modoospace.alarm.domain.AlarmType;
import com.modoospace.member.domain.Member;
import com.modoospace.member.domain.MemberRepository;
import com.modoospace.member.domain.Role;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.testcontainers.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;

class AlarmDLQConsumerTest extends AbstractIntegrationContainerBaseTest {
private static final String RETRY_COUNT_HEADER = "x-retries_count";

@Autowired
private AlarmDLQConsumer alarmDLQConsumer;

@Autowired
private MemberRepository memberRepository;

@Autowired
private AlarmRepository alarmRepository;

@Value("${spring.rabbitmq.retry_count}")
private int retryCount;

private final ObjectMapper objectMapper = new ObjectMapper();

private AlarmEvent alarmEvent;

@BeforeEach
public void setup() {
Member member = Member.builder()
.email("member@email")
.name("member")
.role(Role.VISITOR)
.build();
memberRepository.save(member);
alarmEvent = AlarmEvent.builder()
.email(member.getEmail())
.reservationId(1L)
.facilityName("test facility")
.alarmType(AlarmType.NEW_RESERVATION)
.build();
}

@AfterEach
public void after() {
memberRepository.deleteAll();
alarmRepository.deleteAll();
}

@DisplayName("RETRY_COUNT_HEADER 값이 없을 경우 0으로 초기화한 후 재처리한다. 그리고 1을 더한다.")
@Test
public void processFailedMessagesRequeue_retry_ifHEADER_X_RETRIES_COUNT_null()
throws JsonProcessingException, InterruptedException {
String body = objectMapper.writeValueAsString(alarmEvent);
Message message = new Message(body.getBytes());
alarmDLQConsumer.processFailedMessagesRequeue(message);

Thread.sleep(1000);

List<Alarm> alarms = alarmRepository.findAll();
assertThat(alarms).hasSize(1);
assertThat((Integer) message.getMessageProperties().getHeaders()
.get(RETRY_COUNT_HEADER)).isEqualTo(1);
}

@DisplayName("RETRY_COUNT_HEADER 값이 retryCount미만일 경우 재처리한다. 그리고 1을 더한다.")
@Test
public void processFailedMessagesRequeue_retry_ifHEADER_X_RETRIES_COUNT_less()
throws JsonProcessingException, InterruptedException {
String body = objectMapper.writeValueAsString(alarmEvent);
Message message = new Message(body.getBytes());
message.getMessageProperties().getHeaders().put(RETRY_COUNT_HEADER, retryCount - 1);
alarmDLQConsumer.processFailedMessagesRequeue(message);

Thread.sleep(1000);

List<Alarm> alarms = alarmRepository.findAll();
assertThat(alarms).hasSize(1);
assertThat((Integer) message.getMessageProperties().getHeaders()
.get(RETRY_COUNT_HEADER)).isEqualTo(retryCount);
}

@DisplayName("RETRY_COUNT_HEADER 값이 3이상일 경우 메세지를 소멸시킨다.")
@Test
public void processFailedMessagesRequeue_notRetry_ifHEADER_X_RETRIES_COUNT_over3()
throws JsonProcessingException, InterruptedException {
String body = objectMapper.writeValueAsString(alarmEvent);
Message message = new Message(body.getBytes());
message.getMessageProperties().getHeaders().put(RETRY_COUNT_HEADER, retryCount);
alarmDLQConsumer.processFailedMessagesRequeue(message);

Thread.sleep(1000);

List<Alarm> alarms = alarmRepository.findAll();
assertThat(alarms).hasSize(0);
assertThat((Integer) message.getMessageProperties().getHeaders()
.get(RETRY_COUNT_HEADER)).isEqualTo(retryCount);
}
}

0 comments on commit 33a995c

Please sign in to comment.