개발 일기 4편 : Redis Streams 기본 설계
개발 일기 3편
[ 단순 채팅 서버가 아닌, Socket 서버를 만들자 ]
실시간 서비스 도입을 위한 소켓 서버 구현을 진행중이다.
시작 단계에서 단순히 웹 소켓, 즉 데이터를 전달하는 통로에서 끝나는 것이 아닌
- 메시징 복구 시스템
- Thread safe
- backpressure 기능을 기반으로 데이터 흐름 제어
등이 고려된, Non-blocking 서비스를 만들고자 한다!
(일단 backpressure 기능은 프론트와의 협의 끝에 차후 도입으로 넘기로 했다.)
[ 채팅 서버, Relay를 곁들인 ]
웹소켓을 이용해서, 메시지를 받는 단순한 과정도, 실제 운영 환경에서 고려하다 보니,
어떤 시점에서 소켓을 닫을지, 세션을 어떻게 관리할지, ping - pong에 대한 heartbeat 검증 등 여러 고려해야할 요소들이 많았는데 이거는 나중에 한번 자세히 작성해 보겠다!
앞에서 말한 것처럼 소켓 서버를 구현하는게 목적이지만, 일단 현재 필요한 서비스는 “채팅”이었기 때문에
메시지를 최대한 안정적으로 처리할 수 있는 소켓 서버를 만드는 것을 목표로 했다!
전체적인 설계는 아래 그림과 같다.
사실상 MessageBroker에 들어가는 메시지와 채팅에서 사용하는 메시지가 비슷한 듯 다르다!
- 채팅 메시지를 : 채팅
- Broker의 메시지를 : 이벤트
- DB → broker 의 전달 매체 : Outbox
라고 앞으로 부르겠다!
설계 상에서 고려했던 부분은
1. 이벤트의 영속성
채팅이 와서 chatDB에 기록이 되고, relay가 이벤트를 발행하는데
이 때, 충분히 과정 중에 감지를 못하거나 유실될 가능성이 있다.
이는 후에 더 설명하겠지만, “Kafka“라는 분산 메시지 시스템을 “CDC”(캡쳐를 통한 변경 감지)로 사용하면
- 이벤트 영속성 보장
- CDC 커넥터를 통한 db 데이터 변경 감지
를 동시에 해결 할 수 있지만,
팀 특성 상 학습 난이도 + 비용적 문제가 있는 kafka를 사용하기는 어렵다 생각했고.
이를 RedisStreams와 MongoDB의 Change Streams 기능으로 대체했다.
2. 순서 보장 및 유실 최소화
이후에 타 실시간 서비스나, 데이터 유실 등을 고려하기로 했기 때문에,
Relay+Dispatcher 기능을 하는 Worker Server가 반드시 필요했고
RedisStreams를 사용해서
- 단순 큐잉 뿐 아니라
- 조건에 따른 서버 분기
- PEL을 활용한 이벤트 재처리
등을 효율적으로 할 수 있을 것이라 생각하고 설계했다!
[ 기본 구조 만들어 보기 ]
기본적으로
Socket Server의 init 시점에
- instance id 생성
- default consumer group 생성
하고
< Producer >
Message Worker가 producer역할로 outbox를 확인하고
- Redis의 접속 상태( 소켓이 열려있는 지 여부 )에 따라
socket:inbox:{instanceId}를 key로 event를 발행하는 것 (소켓 서버를 통한 메시지 전송 : ws)socket:notification:allkey로 event를 발행하는 것 (알림 서버 처리 : group event)
두가지 방식으로 event를 생산한다.
< Subscriber : Socket >
서버가 온전히 띄워진 후 (@EventListener(ApplicationReadyEvent::class))
- Flux.defer를 활용하여, 반복 read(event)
- 즉,
socket:inbox:{instanceId}스트림을 2초 간격 폴링 - defer를 활용하여 read할 때 마다 새로운 데이터를 flux stream으로 받음
- ws로 해당 event를 채팅으로 전환하여 전송!
하도록 구현했다.
참고 : Defer
Reactor에서는 기본적으로 just를 통해 stream에 값을 넣는다
하지만 중간에 값을 바꿔도 선언 과정에서 이미 streams에 넣었기 때문에 값이 바뀌지 않는다.
그래서 defer를 이용해서
supplier라는 함수형 인터페이스를 호출하고 Mono나 Flex라는 Stream에 값을 매 호출 때 넣어주는 방식을 사용한다.
그러면 중간에 값이 바뀐 변수를 사용할 때는 컴파일 된 값이 아닌, 해당 supplier가 실제 동작할 때 값을 가져오므로, 변경된 값을 읽어 올 수 있다.