Use AMQP (w/ RabbitMQ)
계속해서 메시징 관련해 이번 포스팅에선 AMQP 에 대해 좀 보려고 합니다.
ZeroMQ 나 Redis 의 pub/sub 등의 메시징 시스템을 간단하게 사용하는 것으로도 충분히 그 장점을 느낄 수 있지만,
이러한 시스템들은 단순한만큼 신뢰성은 약간 떨어진다고 할 수 있습니다. 물론 그런 상황이니까 사용하는 것도 있지만요.
메시징 시스템에선 신뢰성을 설명할 때 QoS(Quality of Service) 의 레벨을 들곤 합니다. 그 레벨은 아래와 같습니다.
- QoS0
- 최대 한 번. 가장 간단하며 메시지가 지속되지도 않고 전달되는 것을 확인하지도 않습니다. 따라서 이 경우엔 메시지가 손실될 수 있습니다.
- QoS1
- 최소 한 번. 메시지가 적어도 한번은 수신되도록 보장합니다. 메시지를 받았을 경우 클라이언트는 무조건 ACK 를 보내야 하지만 이걸 보내기 전에 메시지가 중복으로 올 수 있습니다. 이 부분은 개발자가 잘 처리해야 합니다.
- QoS2
- 정확히 한 번. 가장 흔하며 가장 안정적인 QoS 입니다. 메시지가 한번만 수신되는 것을 보장하며 대신 좀 느릴 수 있습니다.
위에서 얘기하긴 했지만 비교적 간단히 사용할 수 있는 Redis 의 pub/sub 같은 경우엔 QoS0 을 구현합니다.
다른 QoS 를 구현하기 위해선 pub/sub 에 의존하지 않고 Redis 의 다른 명령어들을 사용해 구현할 순 있지만 굳이? 라는 느낌은 있네요.
안정적인 메시지 대기열을 사용하기 위해 AMQP 를 살펴보려고 합니다.
일반적으로 다루는 비즈니스 상 메시지를 분실하면 안되는 금융, 은행 시스템이나 꼭 이런 시스템이 아니더라도 근래엔 분산 어플리케이션 구조에서 메시지를 분실해선 안되는 케이스가 많은데, 이런 구조에선 메시지 전달의 안전이 입증된 프로토콜을 사용합니다.
AMQP 는 많은 메시지 대기열 시스템에서 지원하는 개방형 표준 프로토콜이며 라우팅, 필터링, 대기열 처리등의 모델들을 제공합니다.
- Queue (대기열)
- 클라이언트가 사용하는 메시지를 저장하는 데이터 구조입니다. 대기열의 메시지들은 하나 이상의 사용자에게 푸시되며, 여러 사용자가 대기열에 붙어있는 경우 메시지는 로드밸런싱 됩니다. 3가지 종류의 Queue 가 있습니다.
- Durable (영구적 큐) - 브로커가 다시 시작되면 대기열이 자동을 다시 만들어지며, 모든 컨텐츠가 보존된다는 것은 아닙니다. persistent 로 표시된 메시지만 디스크에 저장되고 이런 메시지만 재시작시 복원됩니다.
- Exclusive (독점적 큐) - 하나의 특정 구독자 연결에만 바인딩되며, 연결이 닫히면 Queue 도 소멸됩니다.
- Auto-delete (자동 삭제 큐) - 마지막 구독자의 연결이 끊어지면 Queue 도 삭제됩니다.
- 클라이언트가 사용하는 메시지를 저장하는 데이터 구조입니다. 대기열의 메시지들은 하나 이상의 사용자에게 푸시되며, 여러 사용자가 대기열에 붙어있는 경우 메시지는 로드밸런싱 됩니다. 3가지 종류의 Queue 가 있습니다.
- Exchange (교환기)
- 교환기로 메시지가 게시됩니다. 메시지를 하나 이상의 Queue 로 라우팅합니다.
- Direct (직접 교환기) - 라우팅 키를 일치시켜 메시지를 라우팅합니다.
- Topic (토픽 교환기) - 라우팅 키와 일치하는 패턴을 사용해 메시지를 라우팅합니다.
- Fanout (팬아웃 교환기) - 라우팅 키를 무시하고 연결된 모든 Queue 에 메시지를 브로드캐스트합니다.
- 교환기로 메시지가 게시됩니다. 메시지를 하나 이상의 Queue 로 라우팅합니다.
- Binding (바인딩)
- Queue 와 Exchange 간의 연결입니다.
AMQP 는 위와 같은 세 가지 필수 컴퍼넌트로 구성되어있습니다.
이를 그림으로 간단히 표현하면 이런 구조입니다. ZeroMQ 또는 Redis 의 pub/sub 에 비해선 비교적 복잡한 구조를 갖고 있으며,
대신 더 풍부한 기능과 신뢰성을 얻을 수 있습니다.
아무래도 근래에 AMQP 를 가장 쉽게 사용할 수 있는 메시지 브로커는 RabbitMQ 일겁니다.
ActiveMQ 나 Kafka 도 많이 들어보긴 했습니다만 주위에서 많이 써서 그런지 RabbitMQ 가 저한테는 훨씬 친숙하긴 하네요.
Java 와 Erlang 으로 만들어져있는 RabbitMQ 는 사용하기도 쉽고, 상당히 다양한 언어에 라이브러리를 제공하고 있습니다.
간단하게 로컬에서 RabbitMQ 를 설치해 AMQP 를 한번 사용해보도록 하겠습니다.
brew install rabbitmq
mac 에선 brew 를 이용해 RabbitMQ 를 쉽게 설치할 수 있습니다.
설치 후 백그라운드로 돌리려면 brew services start rabbitmq, 포그라운드로 돌리려면 rabbitmq-server 를 커맨드 창에 입력하면 됩니다. RabbitMQ 서버를 로컬에서 실행시켰으니 이제 typescript 로 간단하게 샘플을 만들어 보겠습니다.
npm install amqplib
npm install @types/amqplib --save-dev
지원하는 라이브러리인 amqplib 를 설치했습니다.
이제 이 라이브러리를 사용해 서버와 클라이언트 코드를 각각 작성해보겠습니다.
// amqp-server.ts
import * as amqp from 'amqplib';
(async () => {
const connection = await amqp.connect('amqp://guest:guest@localhost:5672');
const channel = await connection.createChannel();
await channel.assertExchange('test_exchange', 'fanout');
const { queue } = await channel.assertQueue('test_queue');
await channel.bindQueue(queue, 'test_exchange', '');
channel.consume(queue, msg => {
const content = msg?.content.toString();
console.info('message : ' + content);
});
})();
너무 간단하긴 합니다만.. 우선 Rabbitmq 는 기본적으로 5672 포트를 사용하고, 처음 설치하면 guest 계정을 사용할 수 있습니다.
먼저 서버측 코드를 보자면 test_exchange 라는 Exchange 를 fanout 타입으로 만듭니다. (assert 를 사용한 것은 엄밀히 말하면 생성하는 건 아니지만, assert 를 사용해 브로커에 해당 Exchange 가 있는지를 확인하고 없으면 브로커가 생성해줍니다)
그다음 test_queue 라는 이름의 Queue 를 만들고 해당 Queue 를 Exchange 에 바인딩합니다.
그리고 해당 대기열로 오는 메시지를 수신 (consume) 하게 되는데 지금은 수신하면 그냥 console 로 찍고 끝입니다.
// amqp-client.ts
import * as amqp from 'amqplib';
(async () => {
const connection = await amqp.connect('amqp://guest:guest@localhost:5672');
const channel = await connection.createConfirmChannel();
const data = 'test-message';
channel.publish('test_exchange', '', Buffer.from(data, 'utf-8'), undefined, (err, ok) => {
if (err) console.error(err);
else console.info(ok);
});
})();
클라이언트 측 코드는 더 단순하게 게시하는 코드가 전부입니다. 위에 그림에서 구조를 설명했던 것처럼 publisher 는 exchange 로 메시지를 보내기 때문에, 여기선 publish 를 할 때 text_exchange 라고 서버에서 만든것과 동일한 Exchange 에 메시지를 보내고 있습니다.
클라이언트 측에서 test_exchange 라는 이름이 아닌 다른 이름을 사용하면 존재하지 않는 Exchange 로 404 에러가 발생합니다.
이렇게 작성하고 서버, 클라이언트 코드를 순차적으로 실행해주면 클라이언트에서 publish 를 한 메세지를 서버에서 consume 해서 메시지가 찍히는 것을 확인할 수 있습니다. 다만 이렇게만 작성하면 서버에서 아래와 같은 에러가 발생합니다.
이는 위에서 설명했던 내용과 관련이 있는데 RabbitMQ 는 QoS1 을 구현합니다. 따라서 메시지를 받았을 땐, 반드시 ACK 를 보내서
정상적으로 수신했음을 알려야 하는데 이 부분이 코드에 빠져있으니 위와 같은 에러가 발생하는 것입니다.
실제로 이런 에러가 발생한 후 RabbitMQ 대시보드(http://localhost:15762)에서 Queue 를 확인해보면, 보냈던 test-message 가
여전히 Queue 에 남아있음을 확인할 수 있습니다. 에러 해결은 channel.ack(msg) 메서드를 호출해 메시지 수신을 알리면 됩니다.
이런식으로 메시지 브로커의 Exchange 와 Queue 를 사용하는 것은 너무 일반적인 사용법이지만, 약간 다른 패턴을 하나 더 보겠습니다.
우리가 보통 여러 워커에 작업을 분산시키기 위해선, 똑같은 작업을 여러 워커가 동시에 받는 것은 원치 않기 때문에 이런 경우엔 pub/sub 은 적절하지 않습니다. 이런때엔 로드 밸런싱과 비슷한 메시지 배포 패턴이 필요한데, 메시징 시스템에서 이런 패턴은 용어로 경쟁 소비자
또는 팬아웃 배포, 환풍기 패턴 등으로 알려져 있습니다.
위 그림과 같은 구조라고 생각하시면 됩니다. 저렇게 화살표가 있다고 해서 절대 같은 작업을 동시에 받는 것은 아니구요.
메시지를 여러 작업자들에 배포하고 (Fanout) 처리한다음, 그 결과들을 단일 노드로 취합하는 (fanin) 방식은 일반적이며
Fanout 에 해당하는 부분만 RabbitMQ 를 사용해 샘플로 보도록 하겠습니다.
그냥 위에 예시랑 똑같이 짜면 되는거 아니냐? 라고 하실 수 있는데 그렇게 해도 상관은 없습니다만, 뭐 굳이 한번 더 적자면 우리는
AMQP 를 사용할 때 메시지를 최종 목적지에 직접 전달하는 것이 아니라 메시지 브로커에 전달합니다. 메시지 브로커가 Exchange, Binding, Queue 에 정의된 규칙에 따라 메시지를 최종 목적지까지 라우팅해주죠.
이런 구조에서 위와 같은 작업 분배를 하려면 메시지가 한 소비자에게만 수신된다는 보장이 되어야 하는데, Exchange 는 하나 이상의 Queue 에 바인딩 될 수 있어 이를 보장할 수 없습니다. 이러한 경우 우리는 해결책으로 Exchange 를 우회해 Queue 에 직접 메시지를
전송해야 합니다. 코드로 자세히 보도록 하겠습니다.
// amqp-producer.ts
import * as amqp from 'amqplib';
const produce = (channel: amqp.Channel) => {
const randomNumber = Math.random() * 100;
const msg = { number: randomNumber };
console.info('produce - ', randomNumber);
channel.sendToQueue('random_queue', Buffer.from(JSON.stringify(msg)));
};
(async () => {
const connection = await amqp.connect('amqp://guest:guest@localhost:5672');
const channel = await connection.createChannel();
setInterval(() => {
produce(channel);
}, 1000);
})();
random_queue 라는 큐에 직접 메시지를 꽂아넣는 코드입니다. 1초마다 랜덤 넘버를 생성해 queue 에 보내고 있습니다.
// amqp-consumer.ts
import * as amqp from 'amqplib';
const consume = (channel: amqp.Channel, queue: string) => {
channel.consume(queue, (msg => {
const data = msg?.content.toString();
if (data) {
const k = JSON.parse(data);
console.info(process.pid, k);
}
channel.ack(msg!);
}));
};
(async () => {
const connection = await amqp.connect('amqp://guest:guest@localhost:5672');
const channel = await connection.createChannel();
const { queue } = await channel.assertQueue('random_queue');
consume(channel, queue);
})();
이 부분은 이전 예시와 크게 다르지 않습니다. random_queue 에서 메시지를 소비 (consume) 하는 내용입니다.
producer 에서 생성한 메시지가 정확히 한 워커에서만 처리되는지를 확인을 돕기 위해 process.pid 를 같이 찍어봤습니다.
> ts-node amqp-consumer.ts
> ts-node amqp-consumer.ts
> ts-node amqp-producer.ts
위와 같이 consumer 를 두개 실행시켜놓고, producer 를 실행합니다. 그럼 숫자는 랜덤 생성이니 다르겠지만, 대략 아래와 같이
처리되는 것을 확인할 수 있습니다.
producer 가 생성한 메시지들은 정확히 하나의 consumer 들에서 처리되는 것을 볼 수 있습니다.
이런 패턴을 AMQP 에서 경쟁 소비자 패턴이라고 합니다. 작은 규모로 특정 메시지 브로커를 사용하거나 잘 관리된다면 일반적인 방법의 Exchange 를 사용하더라도 이런 처리에 문제가 없겠지만, 다른 Queue 에서도 소비될 수 있는 잠재적인 위험을 생각한다면
직접 Queue 에 메시지를 보내는 이런 패턴도 분산 처리에선 사용해볼 수 있을 것 같습니다.