개발 일기 5편 : Redis Streams 유실 처리
개발 일기 5편
[ Redis Streams에 대해서 ]
아래 이미지를 간략히 분석해보자
- 여러 producer(노란색, 주황색, 빨간색 원)이 돌아가면서 이벤트를 발행하고
- 그 섞인 이벤트는 특정 Consumer Group(색상 구분)으로 배정된다!
- 큐처럼 정해진 순서를 가지고 메시지가 이동하며
- Batch 처리 (like Buffer) : N개의 이벤트(이미지에서는 4개)를 꺼내서
- Consumer(노란 네모)에 M개의 이벤트(노란 이벤트 3개)를 가져가는 구조이다!
- 실제 영상을 보면, 순차적으로 주황색, 빨간색 등 다른 Consumer가 순차적으로 방문하며
- 가져가는 것은 실제로는 appendOnly이기 때문에 COPY
< vs Redis PUB/SUB >
기존 redis pub/sub 모델은
- subscriber는 일단 publisher가 있는지 조차 알 수 없으며,
- 메시지의 순서 보장은 물론이고
- 네트워크 연결이 끊어지면 메시지가 손실
- 심지어는 publisher는 메시지를 잘 받았는지 알 수 없다.
이 단점을 보완하고자 Redis Streams의 경우
- 저장하되, Append Only만 가능(수정 X)하게 하여 기존 데이터에 접근하거나, 락을 걸지 않고, O(1)로 쓴다
- Consumer Group으로 메시지에 대해 여러 consumer의 병렬 작업 처리!
- 가장 중요한 승인 처리(ACK)를 통한 메시지의 소유권과 전달을 보장하고
- 추가로 CLAIM을 이용해서 소유권 이전
- 큐 역할의 순서 보장도 가능하다!
⇒ 차후에 Kafka와 비교 분석하는 글도 한번 써보겠다!
[ 이벤트 흐름 간단 정리 ]
<Trigger>
MongoChangeStreams (이후에 설명할 예정) 를 통해서
outbox에 데이터가 들어오면 Spring Listener로 해당 이벤트를 넘겨주는게 시작점!
< Relay : Producer >
Message Worker가 producer역할로 SpringListener를 통해 읽어온 메시지를
이벤트화 해서 redis에 xADD!
< Socket : Subscriber >
최초 서버 동작 시 2초 주기로 xRead를 통해 읽어오는 구조!
- 데이터가 없으면 빈 리스트를 전달
- 데이터가 있으면 Stream으로 전달 → websocket 서비스로 연결
[ 세션 단계의 Long Pooling? ]
최초 구상 단계에서, CDC처럼, Redis에 새로운 데이터가 들어온 것을 인식해서
push 방식으로 인식할 수 있을 것이라 기대했으나,
→ 실제로는 Long Pooling 방식이었기 때문에
소켓 서버가 신경 써야할 큐가 너무 많다!
세션 단계의 redis Streams를 가지는 것은 서버에 연결된 세션 수 만큼 계속 Long Pooling!
- pooling이 main webflux thread를 침범하지 않도록 설정은 가능하지만
- 중복 세션(서로 다른 기기로 채팅 접속)도 허용하기 때문에 유저 수가 늘어남에 따라 관리 어려움
소켓 서버가 신경 써야할 큐를 하나로!
서버 당 큐를 하나 만들고!
- 이미 Relay 서버 단에서 분기처리를 해줬으니, 그대로 받자 → Read할 큐가 소켓 서버 당 1개
대신 여러 유저의 메시지가 하나의 큐에 오기 때문에
- 유저별 메시지 순위 처리 → datetime 기준 정렬 (어차피 streams의 기본 기능)
- 유저별 메시지 분기 (순위처리 할 때, 가져오는 메시지를 유저별로 분기 하면 됨!)
[ 서버 부하보다는, 로직을 더 견고하게! ]
서버 단 큐로 구조를 변경하게 되면,
유저 또는 세션 수준에서 메시지 분기를 소켓 서버 단에서 진행되어야 한다!
즉, 애초에 Relay 서버에서 유저별 큐로 분기를 해주는 방향이 아닌,
소켓 서버 단에서
- 유저별 분기 큐
- 순서처리
- 물론 순서처리는 기존 구조에서도 필요하긴 했다!
하지만 이제는 조금 복잡하게 설계를 진행해야 한다.
세션 수 만큼의 큐에 대한 Pooling은 말도 안되는 구조라 생각하기 때문에… 다른 방법이 없는 거 같다.
< 1. 유저 분기 과정을 최소화! : Reclaim >
[ Sink에 대해서 ]
Sink를 논하기 전에 간단히 Session에 대해 알아보자
WebSocketSession
Session은 Stream을 호출하는 리모컨 정도로 생각하면 된다.
- 버튼을 누르면 특정 파이프를 하나 불러오거나
session.receive()
- 원하는 파이프의 끝을 출구(클라이언트)랑 연결
session.send(Flux)
그럼 우리가 기본적으로 사용하는 것은 String, Int, Object 같은 데이터고
WebSocket의 기본 매체는 통로를 이용하는데
데이터를 묶어서 통로에 넣을 수 있게 입구를 만들어주는 주는 게 Sinks.Many<T>다!
consumer에서 read한 이벤트를 그대로 사용자의 웹소켓에 연결되어 있는 통로에 넣자!
- read 한 Event(객체)를 받았을 때
- 미리 인 메모리로 유저-Sink 연결을 저장해두고
- event에서 유저 확인해서 연결된 Sink로 넣어주고
- sink가 이미 사용자의 webSocket으로 연결되어 있었다면?
event를 다이렉트로 receiver의 websocket에 전달할 수 있다!
추가로 (Sink에 넣었다 = 유저의 websocket에 메시지가 전달되었다)이기 때문에
ack에 대한 판단기준을
- 이후에 처리과정에서 손실이 발생할 수 있는 단순히 큐에 넣었다가 아니라
- 확실히 보장된 웹소켓으로 보냈다를 기준으로 잡을 수 있다!
Sinks.EmitResult.OK
< 2. 실패한 메시지 처리 : Reclaim >
실패 즉 xack를 못 받는 경우는 크게 두 가지이다.
- case1 : 순수 xack만 실패
- instance 살아있음
- WebSocket 연결 살아있음
- case 2 : 소켓 서버 다운
- 재배포 필요
- 새롭게 유저가 들어 왔을 때 재처리
이 기능의 핵심은 Redis Streams의 PEL 기능이다!
Pending Entries List는 xack를 못받은 메시지를 redis 내부의 저장소에 보관한다.
- 정확히는 모든 메시지를 전송과 함께 PEL에 보관
- xack를 받은 메시지를 지운다
그럼 이 pending 상태의 메시지는
- xpending으로 정체된 메시지 확인(Idle time check)하고,
- xclaim으로 재배정 이후 처리되도록 하는게 기본 구조다!
하지만 현재 아키텍쳐에서 xclaim을 이용한 자동 처리 구조는 사용할 수 없다!
- A라는 유저를 타겟으로 보낸 메시지는 A라는 유저가 있는 서버로 전송되어야하고
- streamKey 자체를 instanceID 값을 넣어 설정하기 때문에
- 다른 StreamKey를 사용하는 메시지에 대해서, cosumer를 변경해주는 xclaim은 사용할 수 없다
- 조금 더 이해하기 쉽게 설명하면, xclaim은 특정 stream으로 오는 데이터를
- msgID로 다른 consumer를 배정 + idle time 초기화 해주는데
- 기존에 예상했던, 동일 consumer group에 stream 분배 구조가 아니라
- socket group - server1, server 3
- socket group - server2 일때,
Stream 1
Stream 2
둘 다 socket group이라는 group에 포함되어 있으므로,
stream1의 메시지를 sever2에서 처리할 수 있는 것을 기대 했지만
Stream1의 메시지는 server1,3에서만 처리할 수 있었다ㅠㅠ
(group name만 동일하다고 같은 group이 아님!)
즉 앞서 언급했던
case1 : 순수 xack만 실패
- 이 경우에는 단순히 xclaim으로, 소유권을 유지한 채 idle time만 초기화 해주게 되면 ⇒ 재처리 = 해결!
case 2 : 소켓 서버 다운
- server heartbeat 확인 이후에, 다운 되었다라고 판단하면
- relay 서버에서 xpending으로 죽은 서버의 PEL을 옮겨 받고
- 죽은 메시지에 대한 재분기를 처리!
- 다른 서버에 접속 중일 때에는 단순 xadd하는 방식으로
- 미접속의 경우에는 알림을 주는 방식으로
<구조화>
고민 : 이 구조에서는 Socket 서버 입장에서, 재전송 메시지인지, 새 메시지인지 구분 할 수 없다!
PendingMessageService에서
최종적으로 결국 socket:inbox:{instanceId}로 동일하게 XADD 하기 때문에,
재전송 메시지에 대한 순서 변경이라던가, 표시 등의 추가 작업이 불가능한 구조이다!
→ 아얘 event payload 자체를 수정해야 함
고민 : Redis Streams의 가장 큰 장점인 병렬 처리 기능을 사용할 수 없는 구조이다!
socket:notificaiton:all의 경우 하나의 큐를 group으로 share하는 구조- 여러 알림 서버가 알아서 해당 stream을 처리하고
- 정상 처리가 실패한 경우 xclaim을 통한 재처리도 가능한 구조
socket:inbox:{instanceId}의 경우 각자의 큐를 가지고, group에 속한 subscriber도 1개(해당 소켓 서버)인 구조{instanceId} -> all로 해버리면, 처리가 pub/sub처럼 되는데 분기의 의미가 없어짐
고민 : Relay Worker에서 매 socket server의 heartbeat check가 필요
- 다른 방법이 없을까?