프로젝트/협업 프로젝트(2023.12.18-2024.01.25)

[Key Word 개발기] Redis pub/sub 실시간 알림 적용기

dal_been 2024. 2. 9. 17:40
728x90

이전 블로그에서 sse와 ConcurrentHashMap을 이용하여 실시간 알림을 구현해봤다.

그러나 ConcurrentHashMap은 문제점이 존재한다.

스케일 아웃시 코드가 제대로 동작하지 않는다. 그 이유는 SseEmitter객체를 서버의 메모리에서 저장하고 있기때문이다.

 

예를 들어 A는 was1에 접속했고 B는 was2에 접속했다.

앞서 말했듯이 SseEmitter객체는 서버의 메모리에 저장하고 있기때문에 A가 B에게 알림을 보낼 수 없다.

 

이 문제를 해결하기 위한 방법이 Redis pub/sub이다.

 


Redis pub/sub

위의 그림같이 레디스 pub/sub기능을 제공한다.

특정 채널을 구독한 subscriber들에게 메세지를 전달하는 것이다.

 

이 구조를 통해 scalueout된 모든 애플리케이션 서바가 publisher이자 consumer가 된다.

한 서버에서 알림이 발생하면 그 사실을 모든 애플리케이션서버에게 알려 subscriber에게 메세지를 전달한다.

 

다만 유의할 점은 redis pub/sub은 메세지를 던지는 형태이다. 즉 메세지를 따로 보관하지 않는다. 수신자가 메세지를 받았는지를 보장하지 않고 subscriber가 없어도 메세지를 publish하여 메세지가 사라진다(수힌확인을 하지 않음)

간단하게 말하자면 publisher는 메세지 던지면 끝니고, subscriber는 받으면 끝.

 

 

Redis pub/sub 과정

 

Redis 서버를 매개로, Redis클라이언트(publisher)가 Redis 서버내 '채널' 생성

-> 메세지를 수신하고 싶은 클라이언트는 사전에 해당 채널을 subscribe

-> 메서지를 보내는 클라이언트는(publisher) 해당 채널에 메세지를 publish 할 수 있음

-> 메세지를 보내는 클라이언트(publisher)가 메세지를 publish하면 subscriber중인 클라이언트는 메세지를 수신한다

 


Redis pub/sub 구현하기

 

자세한 코드 깃허브 참조

 

1. redis 디펜던시 추가

    implementation 'org.springframework.boot:spring-boot-starter-data-redis'

 

 

 

2. Redis config 설정

@Configuration
public class RedisConfig {

  @Value("${spring.redis.host}")
  private String redisHost;

  @Value("${spring.redis.port}")
  private int redisPort;

  @Bean
  public RedisConnectionFactory redisConnectionFactory(){
    return new LettuceConnectionFactory(redisHost, redisPort);
  }

  @Bean
  public RedisTemplate <String, MessageDto> redisTemplate(  RedisConnectionFactory redisConnectionFactory, ObjectMapper objectMapper){
    RedisTemplate<String, MessageDto> redisTemplate = new RedisTemplate<>();
    final Jackson2JsonRedisSerializer<MessageDto> jsonRedisSerializer = new Jackson2JsonRedisSerializer <>(
        MessageDto.class);
    jsonRedisSerializer.setObjectMapper(objectMapper);
    redisTemplate.setConnectionFactory(redisConnectionFactory);
    redisTemplate.setKeySerializer(new StringRedisSerializer());
    redisTemplate.setValueSerializer(jsonRedisSerializer);
    redisTemplate.setHashKeySerializer(RedisSerializer.string());
    redisTemplate.setHashValueSerializer(jsonRedisSerializer);
    return redisTemplate;
  }

  @Bean
  public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
    final RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
    redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
    return redisMessageListenerContainer;
  }
}

 

 

3. subscriber 클래스 생성

@Slf4j
@RequiredArgsConstructor
@Component
public class RedisSubscriber implements MessageListener {
  private final ObjectMapper objectMapper;
  private final EmitterRepository emitterRepository;
  @Override
  public void onMessage(Message message , byte[] pattern) {

    log.info("RedisSubscriber onMessage");

    MessageDto messageDto = serialize(message);
    SseEmitter sseEmitter=emitterRepository.findByUserId(messageDto.getUserId());
    if(sseEmitter==null){
      sseEmitter = emitterRepository.save(messageDto.getUserId());
    }
    sendToClient(sseEmitter,messageDto.getUserId(), messageDto);
  }
  private MessageDto serialize(final Message message) {
    try {
      return this.objectMapper.readValue(message.getBody(), MessageDto.class);
    } catch (IOException e) {
      throw new RuntimeException();
    }
  }

  private void sendToClient(final SseEmitter emitter, final Long id, final Object data) {
    try {
      emitter.send(SseEmitter.event()
          .id(id.toString())
          .name("addComment")
          .data(data));
    } catch (IOException e) {
      emitterRepository.deleteByUserId(id);
      log.error("SSE 연결이 올바르지 않습니다. 해당 memberID={}", id);
    }
  }
}

 

 

4. EmitterRepository 생성

@Repository
public class EmitterRepository {
  private final Map <Long, SseEmitter> emitters = new ConcurrentHashMap <>();

  public SseEmitter save(Long userId){
    SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);
    emitters.put(userId,sseEmitter);
    return sseEmitter;
  }

  public SseEmitter findByUserId(Long userId){
    if(!emitters.containsKey(userId)){
      return null;
    }
    return emitters.get(userId);
  }

  public void deleteByUserId(Long userId){
    emitters.remove(userId);
  }
}

 

 

5. RedisMessageService 클래스 생성

@Service
@RequiredArgsConstructor
public class RedisMessageService {

  private static final String CHANNEL_PREFIX = "channel:";
  private final RedisMessageListenerContainer container;
  private final EmitterRepository emitterRepository;
  private final RedisTemplate<String, MessageDto> jsonRedisTemplate;
  private final RedisSubscriber subscriber;

  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  public void send(MessageDto event){
    System.out.println("여기까지는 오니???");
    Long userId=event.getUserId();
    jsonRedisTemplate.convertAndSend(getChannelName(userId),event);
  }

  public SseEmitter subscribe(Long userId){
    //연결되었는지 확인 메세지 전송
    SseEmitter emitter = emitterRepository.save(userId);
    try {
      emitter.send(SseEmitter.event().id(userId.toString()).name("connect").data("연결되었습니다"));
      container.addMessageListener(subscriber, ChannelTopic.of(getChannelName(userId)));
      checkEmitterStatus(emitter, subscriber,userId);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    return emitter;
  }

  private String getChannelName(Long userId){
    return CHANNEL_PREFIX+userId;
  }

  private void checkEmitterStatus(final SseEmitter emitter, final MessageListener messageListener, Long userId) {
    emitter.onCompletion(() -> {
      this.container.removeMessageListener(messageListener,ChannelTopic.of(getChannelName(userId)));
    });
    emitter.onTimeout(() -> {
      this.container.removeMessageListener(messageListener,ChannelTopic.of(getChannelName(userId)));
    });
  }
}

 

 

6. subscribe 및 send

@RequiredArgsConstructor
@Service
public class NotificationService {

  private final NoticeRepository noticeRepository;
  private final MemoRepository memoRepository;
  private final JwtUtils jwtUtils;
  private final ApplicationEventPublisher eventPublisher;
  private final RedisMessageService redisMessageService;

  @Transactional
  public void notifyAddCommentEvent(Comment comment, Long memoId,Long loginId) {
    Memo memo = memoRepository.findById(memoId).get();
    Long userId = memo.getUser().getId();
    Notice save = noticeRepository.save(Notice.builder()
        .user(memo.getUser())
        .infoId(memoId)
        .build());
    System.out.println("알림아 가야해!!!!!");
    if(loginId!=userId) {
      eventPublisher.publishEvent(MessageDto.builder()
          .noticeType(save.getNoticeType())
          .content(comment.getContent())
          .id(save.getId())
          .userId(userId)
          .build());
    }
  }

  public SseEmitter save(String token){
    Long userId = jwtUtils.getUserIdFromToken(token);
    return redisMessageService.subscribe(userId);
  }

}

 

 

7. subscribe위한 채널 생성 및 연결 컨트롤러

@RequiredArgsConstructor
@Slf4j
@RestController
public class SseController {

  private final NotificationService notificationService;

  @CrossOrigin
  @GetMapping(value = "/sub", consumes = MediaType.ALL_VALUE)
  public SseEmitter subscribe(@RequestParam String token) {
    SseEmitter emitter = notificationService.save(token);
    return emitter;
  }
}

 

 

구현한 걸 이미지로 보여주자면

여기서 RedisSubscriber클래스에서 onMessage가 호출될때 3,4번 과정이 실행되는 것이다.

 

추가적으로 알림을 이벤트를 발행한 이유는 느슨한결합을 위해서 이다.

만약 알림이 실패하더라도 댓글단 행위가 롤백이 되면안된다. 즉 주 비즈니스 로직에 큰 영향이 없기때문에 알림이 실패하더라도 롤백되지 않게 하기 위해 설정한 것이다.

 

스프링  이벤트에 대해서는 이미 적어둔 블로그가 있다.

 


코드만 보면 조금 헷갈릴 수 있다. 그래서 이전에 ConcurrentMap??으로 해둔 코드를 먼저 이해하고 redis pub/sub을 적용해보는걸 추천한다!