home
🔀

SSE로 실시간 데이터 전송하기

Author
조하담 / Backend Engineer
Category
Development Glossary
Tags
Server-Sent Event
sse
AI Service
Published
2024/12/03
5 more properties
이 아티클에서는 다음과 같은 문제 상황에서 해결했던 방법과 구현에 대한 내용을 다룹니다.
LLM을 활용한 질의응답 서비스에서 답변 생성에 시간이 오래 걸림
실시간으로 생성되는 답변을 클라이언트에게 전달할 방법 필요
웹 브라우저와 웹 서버 간의 데이터 통신은 인터넷을 통해 이루어지며, 이를 위해 HTTP 표준이 사용됩니다. 그러나 클라이언트가 연속적인 정보를 서버에 전송하거나, 실시간으로 업데이트된 서버의 정보를 클라이언트에게 전달해야 하는 경우 지속적인 HTTP 요청이 필요하며 이는 비용 면에서 효율적이지 않습니다.
이러한 문제에 대응하기 위해 폴링, 웹소켓, 그리고 SSE(Server-Sent Event)가 등장했습니다.
Polling : 일정한 주기로 클라이언트가 서버에 요청을 보내어 최신 정보를 가져옵니다.주기적인 요청으로만 데이터를 갱신할 수 있어서, 실시간성을 보장하기 어렵습니다.
Websocket : 단일 TCP 연결을 통해 서버와 클라이언트 간 양방향 통신을 제공합니다.실시간 데이터 전송이 가능합니다.
SSE(Server-Sent Event)서버에서 클라이언트로 데이터를 전송하는 단방향 통신을 제공합니다.서버가 이벤트 스트림을 전송하여 실시간으로 업데이트된 정보를 제공합니다.ㄹ
이 글에서는 실무에서 SSE를 선택한 이유와, 라이브러리 의존 없이 SSE를 구현하고 트러블 슈팅(Trouble shooting)한 과정을 소개합니다.

요구사항

회사에서 개발 중인 제품은 텍스트 기반 지식과 사용자의 피드백을 활용하여 인공지능(AI)를 쉽게 훈련시키고 원하는 정보를 생성하거나 분석할 수 있도록 돕는 SaaS 생성 AI 플랫폼입니다. 간단히 말해, 기업들이 ChatGPT와 같은 대규모 언어 모델(Large Language Model, LLM)을 사용하여 내부 문서를 학습시키고 원하는 서비스에 적용할 수 있도록 지원하는 서비스입니다. 그 중에서도 사용자의 질문에 대답하는 기능을 구현한 과정을 도식화하면 다음과 같습니다.
해당 기능에 대한 요구사항은 다음과 같습니다.
사용자가 채팅창에 질문을 입력하면, LLM을 통해 생성된 답변이 화면에 표시되어야 합니다. 이를 구현하기 위한 도식을 살펴보면, 클라이언트가 질문 요청 API를 중계서버에 요청하면, 중계서버가 LLM을 거쳐 생성된 답변을 전달받아 클라이언트에게 응답합니다. 이렇게만 들으면 API 요청만으로도 구현이 가능할 것 같은데, 왜 SSE를 도입하려 했을까요?
“LLM이 답변을 완성하는데 생각보다 오래 걸린다.”
LLM이 답변을 생성하는데 걸리는 시간은 밀리초 단위가 아닌, 몇초가 소요됩니다. 또한 일정한 응답 속도를 보장하는 것도 아닙니다. 이러한 이유로, LLM 모델로부터 답변 콜백을 기다리는 동안 API 응답이 지연되고, 이는 서버 병목 현상 등을 야기할 수 있습니다. 이러한 문제를 방지하기 위해, LLM 모델에서 답변 생성이 완료되는 즉시 중계 서버가 실시간으로 클라이언트에게 답변을 전달할 수 있는 방법을 찾아야 했습니다.
따라서, 실시간성을 보장하기 위해 많이 사용되는 웹소켓과 SSE를 비교 분석한 결과, SSE를 선택하게 되었습니다.

Websocket 대신 SSE를 선택한 이유

1.
통신 방향 : 웹소켓과 SSE에 가장 큰 차이점은 통신 방향입니다. 사람들 간의 대화는 서로에게 메시지를 보내고 받으며 동시에 이에 대한 응답을 주고받기 때문에, 웹소켓과 같은 양방향 통신이 적합합니다. 반면 ChatGPT와 같은 챗봇과의 대화는 사용자가 요청을 보내고 챗봇이 응답하는 단계적인 과정으로 이루어지기 때문에, 단방향 통신이 더 적합합니다.
2.
구현 리소스 : Websocket은 TCP 프로토콜을 처리하기 위해 전이중 연결과 새로운 웹 소켓 서버가 필요합니다. 따라서, 시스템에 상당한 복잡성을 추가하고 초기 구현에 많은 리소스가 들어갑니다. 반면, SSE는 HTTP 프로토콜을 기반으로 하기 때문에 기존의 HTTP 연결을 재사용합니다. 또한 단순히 단방향 연결을 지원하므로 양방향 연결에 비해 시스템에 추가적인 복잡성이나 리소스가 들어가지 않습니다. 또한 Websocket은 HTTP 프로토콜이 아니기 때문에 에러 처리 표준을 제공하지 않으며, 연결이 끊겼을때 재접속 처리 또한 제공하지 않아서 개발 리소스가 증가할 수 있습니다.
요약하면, SSE는 클라이언트가 서버로부터 업데이트된 데이터만 받는 실시간 데이터 스트림에 대한 구현이 필요할 때 유용한 선택입니다. Websocket은 빠른 고품질의 양방향 연결이 필요한 경우에 유용하지만, 시스템에 복잡성을 추가하고 구현하는 데 많은 리소스가 필요하므로 Polling이나 SSE가 적합하지 않은 상황에 사용하는 것을 권장합니다.
ChatGPT 역시 SSE를 사용해서 질의응답 기능을 구현했는데요. ChatGPT는 “LLM이 답변을 완성하는데 생각보다 오래걸린다” 라는 문제를 어떻게 풀어냈는지에 대해, “개발자라면 알아둬야 할 ChatGPT의 동작 방식과 LLM”이라는 주제로 추후 따로 다뤄 볼 예정입니다.

라이브러리를 쓰지 않은 이유

초기에는 Node.js 라이브러리들 중에서 가장 많이 사용하는 sse 라이브러리 사용을 고려했었지만, 해당 라이브러리의 낮은 신뢰성으로 인해서 사용하지 않는 것으로 결정했습니다. 이 결정에는 다음과 같은 이유가 있습니다.
마지막으로 업데이트된 시기가 7년 전입니다.
Issue 관리가 원활히 이루어지지 않고 있습니다.
2개의 파일로 구현체를 만들어놓은 것이 전부입니다. 기본 연결에 최소한의 validaion을 추가해 캡슐화만 해놓았다고 봐도 무방합니다.
위 특징들을 고려했을 때, 라이브러리의 신뢰성과 유지보수 상태에 대한 의문이 들었습니다.
따라서 직접 SSE 기능을 구현함으로써 얻을 수 있는 확장성과 유지보수 측면을 고려하여, 불안정한 라이브러리를 사용하는 대신 직접 구현하는 방법을 선택했습니다.

SSE 구현

Node.js에서 서버-클라이언트 간의 SSE 통신을 관리하기 위해 두 가지 관리자 클래스를 구현했습니다.

SseConnectionManager

import { Request, Response } from 'express'; class SseConnectionManager { // 연결 정보를 저장하는 객체 private connections: { [k: string]: SseConnection } = {}; constructor() {} public createConnection(connectKey: string, req: Request, res: Response): void {} public hasConnection(connectKey: string): boolean {} public getConnection(connectKey: string): SseConnection {} public removeConnection(connectKey: string): void {} public closeImmediately(connectKey: string): void {} public closeWithTimeout(connectKey: string, timeout: number = SSE_TIMEOUT_SECONDS): void {} }; // SSE 연결 관리자의 인스턴스 생성 및 익스포트 export const sseConnectionManager: SseConnectionManager = new SseConnectionManager();
Plain Text
복사
SseConnectionManager는 SSE 연결을 관리하는 클래스입니다.
createConnection, removeConnection과 같은 메서드를 제공하여 연결을 생성 및 제거할 수 있습니다.
closeImmediately , closeWithTimeout 메서드들을 통해 연결을 즉시 닫거나 일정 시간이 지난 후 닫을 수 있습니다.
sse 연결을 생성하는 createConnection 메서드를 살펴보겠습니다.
import { Request, Response } from 'express'; import { responseSse } from '@/util/response'; const SSE_HEADERS = { 'Content-Type': 'text/event-stream', Connection: 'keep-alive', 'Cache-Control': 'no-cache', }; public createConnection(connectKey: string, req: Request, res: Response) { this.connections[connectKey] = { res }; req.on('close', () => { // 연결이 끊겼을 때 처리 로직 }); // 응답 헤더 설정 res.writeHead(202, SSE_HEADERS); // 클라이언트에게 연결 키 반환 responseSse(res, { sseConnectionKey: connectKey }); // 이하 코드 생략됨 }
Plain Text
복사
createConnection 메서드는 클라이언트로부터 요청을 받아 새로운 SSE 연결을 생성하는 역할을 합니다. 중복 연결을 방지하기 위해 랜덤한 연결 키를 생성하고, 이 키를 사용하여 새로운 연결을 메모리에 저장합니다. 또한 클라이언트와의 연결이 끊어졌을 때 이를 감지하여 연결을 제거하고, SSE 통신을 위한 응답 헤더를 설정하여 클라이언트에게 생성된 연결 키를 반환합니다.
// SSE 통신을 위한 응답 헤더 const SSE_HEADERS = { 'Content-Type': 'text/event-stream', Connection: 'keep-alive', 'Cache-Control': 'no-cache', };
Plain Text
복사
Content-Type: 'text/event-stream': SSE에서는 이벤트 스트림 형식으로 데이터를 전송하며, 클라이언트는 이를 해석하여 받은 데이터를 처리합니다.
Connection: 'keep-alive': SSE는 실시간 통신을 위해 서버와 클라이언트 간의 연결을 유지해야 하므로, 이 헤더를 설정하여 연결이 닫히지 않도록 합니다.
Cache-Control': 'no-cache' : SSE는 실시간 업데이트를 제공하는 데 사용되므로, 클라이언트가 캐시를 사용하지 않고 항상 최신 데이터를 받도록 합니다.
Postman을 통해 sse 연결 생성 API를 요청한 결과입니다. 해당 API 처리 로직에서 createConnection(req, res); 메서드가 호출됩니다.

SseEventManager

class SseEventManager { private eventConnections: { [k: string]: string[] } = {}; constructor() {} public broadcast( eventKey: string, data: object, timeout: number = SSE_TIMEOUT_SECONDS, ): void {} public hasEventKey(key: string): boolean {} public getListByEventKey(key: string): string[] {} public setEventSseConnection(eventKey: string, sseConnectionKey: string): void {} public releaseEventKey(key: string): void {} public releaseSseConnectionKey(eventKey: string, sseConnectionKey: string): void {} }; // SSE 이벤트 관리자의 인스턴스 생성 및 익스포트 export const sseEventManagers: Record<'threads', SseEventManager> = { threads: new SseEventManager(), };
Plain Text
복사
SseEventManagers는 SSE 이벤트를 관리하는 클래스 입니다.
SSE 이벤트는 서버에서 클라이언트로 전달되는 데이터의 종류를 나타냅니다. 예: 채팅 스레드 threads
broadcast 메서드를 사용하여 이벤트를 전파하고, setEventSseConnection을 통해 이벤트와 연결을 매핑합니다.
releaseEventKey, releaseSseConnectionKey 메서드들을 통해 이벤트 키와 연결 키를 해제할 수 있습니다.
이벤트 키에 해당하는 모든 연결에 데이터를 전송하는 broadcast 메서드를 살펴보겠습니다.
public broadcast( eventKey: string, data: object, timeout: number = SSE_TIMEOUT_SECONDS, ) { // 연결 객체의 응답에 데이터를 전송합니다. const connection = sseConnectionManager.getConnection(connKey); responseSse(connection.res as Response, data); // 주어진 타임아웃 값으로 연결을 종료합니다. sseConnectionManager.closeWithTimeout(connKey, timeout); // 이하 코드 생략됨 }) }
Plain Text
복사
broadcast 메서드는 주어진 이벤트 키에 해당하는 모든 SSE 연결에 데이터를 전송하는 역할을 합니다. 각 연결에 데이터를 전송하고, 연결이 유효하지 않을 경우 연결을 해제합니다. 각 연결 마다 데이터 전송 후 주어진 타임아웃 값으로 연결을 종료합니다.

Client Side Code

IE를 제외한 대부분의 브라우저는 SSE를 쉽게 사용할 수 있도록 기본적으로 EventSource API를 제공합니다. (Polyfill 라이브러리를 사용하면 IE에서도 SSE를 사용할 수 있습니다.)
if ('EventSource' in window) { // use polyfill } var source = new EventSource('URL');
Plain Text
복사
위 코드는 브라우저가 EventSource API를 지원하는지에 따라 분기 처리할 수 있는 코드입니다.
useEffect(() => { const sseEvents = new EventSource('http://localhost:3000/sse'); sseEvents.onopen = function() {}; sseEvents.onerror = function (error) {}; sseEvents.onmessage = function (stream) { const parsedData = JSON.parse(stream.data); }; }, []);
Plain Text
복사
이 코드는 React 함수형 컴포넌트에서 useEffect 훅을 사용하여 SSE를 설정하는 부분입니다. 서버로부터 SSE 메시지를 수신했을 때 onmessage 이벤트 핸들러가 실행됩니다.

트러블 슈팅으로 알게 된 것들

1. SSE는 특별한 응답 Spec을 가진다.

클라이언트 개발자의 요청 사항은 위와 같았습니다.
function responseSse(res: Response, data: object) { // 이외 코드 생략됨 return res.write( `data:${JSON.stringify({ status: 'success', data})}\n\n` ); }
Plain Text
복사
서버에서는 이처럼 전달할 데이터를 data 필드로 감싸서 전달하고 있었습니다. 클라이언트 요청에 따라 불필요해 보이는 data 필드를 제거하고 테스트한 결과, 빈 객체가 응답되었습니다.
이러한 응답이 발생한 이유는, Event Stream 포맷이 정해져 있기 때문입니다.
data 필드 사용: SSE에서는 각 이벤트를 data 필드에 담아 전송합니다. 이는 클라이언트에서 이벤트를 쉽게 구분할 수 있도록 합니다. 따라서 responseSse 함수는 data 필드에 객체를 담고 이를 JSON 형식으로 직렬화하여 전송합니다.
\n\n 추가: SSE에서는 각 이벤트가 끝나는 지점을 알려주기 위해 \n\n을 사용합니다. 이는 클라이언트에서 이벤트를 수신할 때마다 이벤트가 끝났음을 인식할 수 있도록 합니다.
해당 형식을 준수하지 않고 클라이언트에게 응답을 전송하면, 의도하지 않은 응답을 수신하거나 혹은 응답이 수신되지 않을 수 있으니 주의해야 합니다.

2. Axios 대신 Fetch API를 사용하자.

클라이언트에서 API 요청을 통해 SSE 연결을 시도하는 과정에서 ERR_INCOMPLETE_CHUNKED_ENCODING 오류가 발생하는 문제가 있었습니다. 이 오류는 일반적으로 웹 서버가 클라이언트에게 전송하는 데이터의 크기를 정확하게 파악할 수 없을 때 발생합니다. 서버로부터 전송된 데이터가 연결이 닫히기 전에 완전히 수신되지 않았음을 알 수 있습니다.
해당 이슈는 Axios 대신 Fetch API를 사용함으로써 해결되었습니다. Axios의 기반인 XHR은 스트리밍 응답을 지원하지 않으며 최신 업데이트가 중단되었습니다. 반면, Fetch API의 ReadableStream 인터페이스는 스트리밍 응답을 지원하여, SSE 연결을 통해 데이터를 실시간으로 처리할 수 있습니다. 따라서 이 문제를 해결하기 위해서는 Fetch API를 기반으로 한 다른 요청 라이브러리를 사용하거나 Fetch API를 직접 활용해야 합니다.

3. 인메모리 대신 외부 저장소를 사용한다.

현재 SSE 연결과 SSE 이벤트를 모두 인메모리로 관리하고 있습니다. 하지만 추후에 서버 인스턴스를 확장하는 경우 기능이 제대로 작동하지 않을 수 있습니다.
이러한 문제를 해결하기 위해서는 서버 간의 연결 정보를 공유할 수 있는 외부 저장소를 사용하는것이 좋습니다. Redis와 같은 분산 캐시나 데이터베이스를 활용하면 서버 인스턴스를 추가하더라도 모든 서버 간에 연결 정보가 공유되므로 스케일 아웃 문제를 해결할 수 있습니다.