[Key Word 개발기] Redis pub/sub 실시간 알림 적용기
이전 블로그에서 sse와 ConcurrentHashMap을 이용하여 실시간 알림을 구현해봤다.
그러나 ConcurrentHashMap은 문제점이 존재한다.
스케일 아웃시 코드가 제대로 동작하지 않는다. 그 이유는 SseEmitter객체를 서버의 메모리에서 저장하고 있기때문이다.
예를 들어 A는 was1에 접속했고 B는 was2에 접속했다.
앞서 말했듯이 SseEmitter객체는 서버의 메모리에 저장하고 있기때문에 A가 B에게 알림을 보낼 수 없다.
이 문제를 해결하기 위한 방법이 Redis pub/sub이다.
Redis pub/sub
위의 그림같이 레디스 pub/sub기능을 제공한다.
특정 채널을 구독한 subscriber들에게 메세지를 전달하는 것이다.
이 구조를 통해 scalueout된 모든 애플리케이션 서바가 publisher이자 consumer가 된다.
한 서버에서 알림이 발생하면 그 사실을 모든 애플리케이션서버에게 알려 subscriber에게 메세지를 전달한다.
다만 유의할 점은 redis pub/sub은 메세지를 던지는 형태이다. 즉 메세지를 따로 보관하지 않는다. 수신자가 메세지를 받았는지를 보장하지 않고 subscriber가 없어도 메세지를 publish하여 메세지가 사라진다(수힌확인을 하지 않음)
간단하게 말하자면 publisher는 메세지 던지면 끝니고, subscriber는 받으면 끝.
Redis pub/sub 과정
Redis 서버를 매개로, Redis클라이언트(publisher)가 Redis 서버내 '채널' 생성
-> 메세지를 수신하고 싶은 클라이언트는 사전에 해당 채널을 subscribe
-> 메서지를 보내는 클라이언트는(publisher) 해당 채널에 메세지를 publish 할 수 있음
-> 메세지를 보내는 클라이언트(publisher)가 메세지를 publish하면 subscriber중인 클라이언트는 메세지를 수신한다
Redis pub/sub 구현하기
1. redis 디펜던시 추가
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
2. Redis config 설정
@Configuration
public class RedisConfig {
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private int redisPort;
@Bean
public RedisConnectionFactory redisConnectionFactory(){
return new LettuceConnectionFactory(redisHost, redisPort);
}
@Bean
public RedisTemplate <String, MessageDto> redisTemplate( RedisConnectionFactory redisConnectionFactory, ObjectMapper objectMapper){
RedisTemplate<String, MessageDto> redisTemplate = new RedisTemplate<>();
final Jackson2JsonRedisSerializer<MessageDto> jsonRedisSerializer = new Jackson2JsonRedisSerializer <>(
MessageDto.class);
jsonRedisSerializer.setObjectMapper(objectMapper);
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jsonRedisSerializer);
redisTemplate.setHashKeySerializer(RedisSerializer.string());
redisTemplate.setHashValueSerializer(jsonRedisSerializer);
return redisTemplate;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
final RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
return redisMessageListenerContainer;
}
}
3. subscriber 클래스 생성
@Slf4j
@RequiredArgsConstructor
@Component
public class RedisSubscriber implements MessageListener {
private final ObjectMapper objectMapper;
private final EmitterRepository emitterRepository;
@Override
public void onMessage(Message message , byte[] pattern) {
log.info("RedisSubscriber onMessage");
MessageDto messageDto = serialize(message);
SseEmitter sseEmitter=emitterRepository.findByUserId(messageDto.getUserId());
if(sseEmitter==null){
sseEmitter = emitterRepository.save(messageDto.getUserId());
}
sendToClient(sseEmitter,messageDto.getUserId(), messageDto);
}
private MessageDto serialize(final Message message) {
try {
return this.objectMapper.readValue(message.getBody(), MessageDto.class);
} catch (IOException e) {
throw new RuntimeException();
}
}
private void sendToClient(final SseEmitter emitter, final Long id, final Object data) {
try {
emitter.send(SseEmitter.event()
.id(id.toString())
.name("addComment")
.data(data));
} catch (IOException e) {
emitterRepository.deleteByUserId(id);
log.error("SSE 연결이 올바르지 않습니다. 해당 memberID={}", id);
}
}
}
4. EmitterRepository 생성
@Repository
public class EmitterRepository {
private final Map <Long, SseEmitter> emitters = new ConcurrentHashMap <>();
public SseEmitter save(Long userId){
SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);
emitters.put(userId,sseEmitter);
return sseEmitter;
}
public SseEmitter findByUserId(Long userId){
if(!emitters.containsKey(userId)){
return null;
}
return emitters.get(userId);
}
public void deleteByUserId(Long userId){
emitters.remove(userId);
}
}
5. RedisMessageService 클래스 생성
@Service
@RequiredArgsConstructor
public class RedisMessageService {
private static final String CHANNEL_PREFIX = "channel:";
private final RedisMessageListenerContainer container;
private final EmitterRepository emitterRepository;
private final RedisTemplate<String, MessageDto> jsonRedisTemplate;
private final RedisSubscriber subscriber;
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
public void send(MessageDto event){
System.out.println("여기까지는 오니???");
Long userId=event.getUserId();
jsonRedisTemplate.convertAndSend(getChannelName(userId),event);
}
public SseEmitter subscribe(Long userId){
//연결되었는지 확인 메세지 전송
SseEmitter emitter = emitterRepository.save(userId);
try {
emitter.send(SseEmitter.event().id(userId.toString()).name("connect").data("연결되었습니다"));
container.addMessageListener(subscriber, ChannelTopic.of(getChannelName(userId)));
checkEmitterStatus(emitter, subscriber,userId);
} catch (IOException e) {
throw new RuntimeException(e);
}
return emitter;
}
private String getChannelName(Long userId){
return CHANNEL_PREFIX+userId;
}
private void checkEmitterStatus(final SseEmitter emitter, final MessageListener messageListener, Long userId) {
emitter.onCompletion(() -> {
this.container.removeMessageListener(messageListener,ChannelTopic.of(getChannelName(userId)));
});
emitter.onTimeout(() -> {
this.container.removeMessageListener(messageListener,ChannelTopic.of(getChannelName(userId)));
});
}
}
6. subscribe 및 send
@RequiredArgsConstructor
@Service
public class NotificationService {
private final NoticeRepository noticeRepository;
private final MemoRepository memoRepository;
private final JwtUtils jwtUtils;
private final ApplicationEventPublisher eventPublisher;
private final RedisMessageService redisMessageService;
@Transactional
public void notifyAddCommentEvent(Comment comment, Long memoId,Long loginId) {
Memo memo = memoRepository.findById(memoId).get();
Long userId = memo.getUser().getId();
Notice save = noticeRepository.save(Notice.builder()
.user(memo.getUser())
.infoId(memoId)
.build());
System.out.println("알림아 가야해!!!!!");
if(loginId!=userId) {
eventPublisher.publishEvent(MessageDto.builder()
.noticeType(save.getNoticeType())
.content(comment.getContent())
.id(save.getId())
.userId(userId)
.build());
}
}
public SseEmitter save(String token){
Long userId = jwtUtils.getUserIdFromToken(token);
return redisMessageService.subscribe(userId);
}
}
7. subscribe위한 채널 생성 및 연결 컨트롤러
@RequiredArgsConstructor
@Slf4j
@RestController
public class SseController {
private final NotificationService notificationService;
@CrossOrigin
@GetMapping(value = "/sub", consumes = MediaType.ALL_VALUE)
public SseEmitter subscribe(@RequestParam String token) {
SseEmitter emitter = notificationService.save(token);
return emitter;
}
}
구현한 걸 이미지로 보여주자면
여기서 RedisSubscriber클래스에서 onMessage가 호출될때 3,4번 과정이 실행되는 것이다.
추가적으로 알림을 이벤트를 발행한 이유는 느슨한결합을 위해서 이다.
만약 알림이 실패하더라도 댓글단 행위가 롤백이 되면안된다. 즉 주 비즈니스 로직에 큰 영향이 없기때문에 알림이 실패하더라도 롤백되지 않게 하기 위해 설정한 것이다.
코드만 보면 조금 헷갈릴 수 있다. 그래서 이전에 ConcurrentMap??으로 해둔 코드를 먼저 이해하고 redis pub/sub을 적용해보는걸 추천한다!