Skip to content

Commit

Permalink
Merge pull request #75 from f-lab-edu/feature/73-pubsub
Browse files Browse the repository at this point in the history
[#73] Redis Pub/Sub 구현
  • Loading branch information
hoa0217 authored Apr 23, 2024
2 parents 8d91d88 + 383828f commit a5c379d
Show file tree
Hide file tree
Showing 16 changed files with 368 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void handler(String message) {
AlarmEvent alarmEvent = objectMapper.readValue(message, AlarmEvent.class);
alarmService.saveAndSend(alarmEvent);
} catch (JsonProcessingException e) {
throw new MessageParsingError();
throw new MessageParsingError(e.getMessage());
}
}
}
16 changes: 12 additions & 4 deletions src/main/java/com/modoospace/alarm/controller/AlarmController.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@
import com.modoospace.alarm.controller.dto.AlarmResponse;
import com.modoospace.alarm.domain.AlarmType;
import com.modoospace.alarm.service.AlarmService;
import com.modoospace.alarm.service.RedisMessageService;
import com.modoospace.config.auth.resolver.LoginMember;
import com.modoospace.member.domain.Member;
import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RequiredArgsConstructor
Expand All @@ -18,19 +24,20 @@
public class AlarmController {

private final AlarmService alarmService;
private final RedisMessageService redisMessageService;


@GetMapping()
public ResponseEntity<Page<AlarmResponse>> search(@LoginMember Member loginMember,
Pageable pageable) {
Pageable pageable) {
Page<AlarmResponse> alarms = alarmService.searchAlarms(loginMember, pageable);
return ResponseEntity.ok().body(alarms);
}


@DeleteMapping("/{alarmId}")
public ResponseEntity<Void> delete(@PathVariable Long alarmId,
@LoginMember Member loginMember) {
@LoginMember Member loginMember) {
alarmService.delete(alarmId, loginMember);
return ResponseEntity.noContent().build();
}
Expand All @@ -43,7 +50,8 @@ public ResponseEntity<SseEmitter> subscribe(@LoginMember Member loginMember) {

@PostMapping(value = "/send/{email}")
public ResponseEntity<Void> send(@PathVariable String email) {
alarmService.send(email, new AlarmResponse(null, null, "테스트시설", AlarmType.NEW_RESERVATION));
redisMessageService.publish(email,
new AlarmResponse(null, null, "테스트시설", AlarmType.NEW_RESERVATION));
return ResponseEntity.noContent().build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ public class AlarmProducer {

public void send(AlarmEvent alarmEvent) {
try {
log.info("AlarmEvent produce to x.reservation");
log.info("AlarmEvent produce to x.alarm.work");
String message = objectMapper.writeValueAsString(alarmEvent);
rabbitTemplate.convertAndSend("x.alarm.work", "", message);
} catch (JsonProcessingException e) {
throw new MessageParsingError();
throw new MessageParsingError(e.getMessage());
}
}
}
25 changes: 25 additions & 0 deletions src/main/java/com/modoospace/alarm/publisher/RedisPublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.modoospace.alarm.publisher;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.modoospace.common.exception.MessageParsingError;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@RequiredArgsConstructor
@Component
public class RedisPublisher {

private final ObjectMapper objectMapper;
private final StringRedisTemplate redisTemplate;

public void publish(String channel, Object data) {
try {
String message = objectMapper.writeValueAsString(data);
redisTemplate.convertAndSend(channel, message);
} catch (JsonProcessingException e) {
throw new MessageParsingError(e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package com.modoospace.alarm.repository;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

@Repository
@RequiredArgsConstructor
public class EmitterLocalCacheRepository {
public class EmitterMemoryRepository {

private final Map<String, SseEmitter> sseEmitterMap = new HashMap<>();
// thread-safe한 자료구조
private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

public SseEmitter save(String id, SseEmitter sseEmitter) {
sseEmitterMap.put(getKey(id), sseEmitter);
Expand All @@ -24,7 +24,7 @@ public Optional<SseEmitter> find(String id) {
}

public void delete(String id) {
sseEmitterMap.remove(id);
sseEmitterMap.remove(getKey(id));
}

private String getKey(String id) {
Expand Down
46 changes: 13 additions & 33 deletions src/main/java/com/modoospace/alarm/service/AlarmService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,10 @@
import com.modoospace.alarm.domain.Alarm;
import com.modoospace.alarm.domain.AlarmRepository;
import com.modoospace.alarm.repository.AlarmQueryRepository;
import com.modoospace.alarm.repository.EmitterLocalCacheRepository;
import com.modoospace.common.exception.NotFoundEntityException;
import com.modoospace.common.exception.SSEConnectError;
import com.modoospace.config.redis.aspect.CachePrefixEvict;
import com.modoospace.member.domain.Member;
import com.modoospace.member.service.MemberService;
import java.io.IOException;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
Expand All @@ -21,59 +17,43 @@
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Slf4j
@RequiredArgsConstructor
@Service
@Slf4j
public class AlarmService {

private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private static final String ALARM_NAME = "sse";

private final MemberService memberService;
private final AlarmRepository alarmRepository;
private final AlarmQueryRepository alarmQueryRepository;
private final EmitterLocalCacheRepository emitterRepository;
private final SseEmitterService sseEmitterService;
private final RedisMessageService messageService;

public Page<AlarmResponse> searchAlarms(Member loginMember, Pageable pageable) {

return alarmQueryRepository.searchByMember(loginMember, pageable);
}

public SseEmitter connectAlarm(String loginEmail) {
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
emitterRepository.save(loginEmail, emitter);
SseEmitter sseEmitter = sseEmitterService.save(loginEmail);

sseEmitter.onTimeout(sseEmitter::complete);
sseEmitter.onError((e) -> sseEmitter.complete());
sseEmitter.onCompletion(() -> sseEmitterService.delete(loginEmail));

emitter.onCompletion(() -> emitterRepository.delete(loginEmail));
emitter.onTimeout(() -> emitterRepository.delete(loginEmail));
messageService.subscribe(loginEmail); // 채널 구독
sseEmitterService.send(sseEmitter, loginEmail,
"EventStream Created. [userId=" + loginEmail + "]"); // dummy 메세지 전송

sendToClient(emitter, loginEmail, "EventStream Created. [userId=" + loginEmail + "]");
return emitter;
return sseEmitter;
}

@Transactional
@CachePrefixEvict(cacheNames = "searchAlarms", key = "#alarmEvent.email")
public void saveAndSend(AlarmEvent alarmEvent) {
Member member = memberService.findMemberByEmail(alarmEvent.getEmail());
Alarm alarm = alarmRepository.save(alarmEvent.toEntity());
send(member.getEmail(), AlarmResponse.of(alarm));
}

public void send(String loginEmail, Object data) {
Optional<SseEmitter> optionalSseEmitter = emitterRepository.find(loginEmail);
optionalSseEmitter.ifPresent(sseEmitter -> sendToClient(sseEmitter, loginEmail, data));
}

private void sendToClient(SseEmitter emitter, String email, Object data) {
try {
emitter.send(SseEmitter.event()
.id(email)
.name(ALARM_NAME)
.data(data));
log.info("alarm event send SSE: " + email);
} catch (IOException exception) {
emitterRepository.delete(email);
throw new SSEConnectError();
}
messageService.publish(member.getEmail(), AlarmResponse.of(alarm));
}

@Transactional
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.modoospace.alarm.service;

import com.modoospace.alarm.publisher.RedisPublisher;
import com.modoospace.alarm.subscriber.RedisSubscribeListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
@Slf4j
public class RedisMessageService {

private final RedisMessageListenerContainer redisMessageListenerContainer;
private final RedisPublisher redisPublisher;
private final RedisSubscribeListener redisSubscribeListener;

// 채널 구독
public void subscribe(String channel) {
redisMessageListenerContainer.addMessageListener(redisSubscribeListener,
ChannelTopic.of(channel));
}

// 이벤트 발행
public void publish(String channel, Object message) {
redisPublisher.publish(channel, message);
}
}
51 changes: 51 additions & 0 deletions src/main/java/com/modoospace/alarm/service/SseEmitterService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.modoospace.alarm.service;

import com.modoospace.alarm.repository.EmitterMemoryRepository;
import com.modoospace.common.exception.SSEConnectError;
import java.io.IOException;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RequiredArgsConstructor
@Service
@Slf4j
public class SseEmitterService {

@Value("${spring.sse.timeout}")
private Long timeout;

@Value("${spring.sse.name}")
private String name;

private final EmitterMemoryRepository emitterRepository;

public SseEmitter save(String email) {
return emitterRepository.save(email, new SseEmitter(timeout));
}

public void delete(String email) {
emitterRepository.delete(email);
}

public void sendToClient(String email, Object data) {
Optional<SseEmitter> optionalSseEmitter = emitterRepository.find(email);
optionalSseEmitter.ifPresent(sseEmitter -> send(sseEmitter, email, data));
}

public void send(SseEmitter emitter, String email, Object data) {
try {
emitter.send(SseEmitter.event()
.id(email)
.name(name)
.data(data));
log.info("SSE Send Event To: {}", email);
} catch (IOException exception) {
emitterRepository.delete(email);
throw new SSEConnectError(exception.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.modoospace.alarm.subscriber;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.modoospace.alarm.controller.dto.AlarmResponse;
import com.modoospace.alarm.service.SseEmitterService;
import com.modoospace.common.exception.MessageParsingError;
import java.io.IOException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

@RequiredArgsConstructor
@Component
@Slf4j
public class RedisSubscribeListener implements MessageListener {

private final SseEmitterService sseEmitterService;
private final ObjectMapper objectMapper;

// 채널 구독 객체
@Override
public void onMessage(Message message, byte[] pattern) {

try {
String email = new String(message.getChannel());
AlarmResponse alarmResponse = objectMapper.readValue(message.getBody(),
AlarmResponse.class);
log.info("Redis Subscribe Channel: {}", email);
log.info("Redis Subscribe Message: {}", alarmResponse.getMessage());

sseEmitterService.sendToClient(email, alarmResponse);
} catch (IOException e) {
throw new MessageParsingError(e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,8 @@ public class MessageParsingError extends RuntimeException {
public MessageParsingError() {
super(MESSAGE);
}

public MessageParsingError(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

public class SSEConnectError extends RuntimeException {

private static final String MESSAGE = "알람 연결에 문제가 생겼습니다.";
private static final String MESSAGE = "알람 연결에 문제가 생겼습니다.";

public SSEConnectError() {
super(MESSAGE);
}
public SSEConnectError() {
super(MESSAGE);
}

public SSEConnectError(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.util.ErrorHandler;

@RequiredArgsConstructor
Expand All @@ -13,7 +14,7 @@ public class CustomErrorHandler implements ErrorHandler {

@Override
public void handleError(Throwable t) {
if (this.exceptionStrategy.isFatal(t)) {
if (this.exceptionStrategy.isFatal(t) && t instanceof ListenerExecutionFailedException) {
throw new ImmediateAcknowledgeAmqpException(
"Fatal exception encountered. Retry is futile: " + t.getMessage(), t);
}
Expand Down
Loading

0 comments on commit a5c379d

Please sign in to comment.