Backend/Node.js

Use AMQP (w/ RabbitMQ)

모피어스 2023. 2. 11. 23:58

계속해서 메시징 관련해 이번 포스팅에선 AMQP 에 대해 좀 보려고 합니다.

 

ZeroMQ 나 Redis 의 pub/sub 등의 메시징 시스템을 간단하게 사용하는 것으로도 충분히 그 장점을 느낄 수 있지만,

이러한 시스템들은 단순한만큼 신뢰성은 약간 떨어진다고 할 수 있습니다. 물론 그런 상황이니까 사용하는 것도 있지만요.

 

메시징 시스템에선 신뢰성을 설명할 때 QoS(Quality of Service) 의 레벨을 들곤 합니다. 그 레벨은 아래와 같습니다.

 

  • QoS0
    • 최대 한 번. 가장 간단하며 메시지가 지속되지도 않고 전달되는 것을 확인하지도 않습니다. 따라서 이 경우엔 메시지가 손실될 수 있습니다.
  • QoS1
    • 최소 한 번. 메시지가 적어도 한번은 수신되도록 보장합니다. 메시지를 받았을 경우 클라이언트는 무조건 ACK 를 보내야 하지만 이걸 보내기 전에 메시지가 중복으로 올 수 있습니다. 이 부분은 개발자가 잘 처리해야 합니다.
  • QoS2
    • 정확히 한 번. 가장 흔하며 가장 안정적인 QoS 입니다. 메시지가 한번만 수신되는 것을 보장하며 대신 좀 느릴 수 있습니다.

위에서 얘기하긴 했지만 비교적 간단히 사용할 수 있는 Redis 의 pub/sub 같은 경우엔 QoS0 을 구현합니다.

다른 QoS 를 구현하기 위해선 pub/sub 에 의존하지 않고 Redis 의 다른 명령어들을 사용해 구현할 순 있지만 굳이? 라는 느낌은 있네요.

 

안정적인 메시지 대기열을 사용하기 위해 AMQP 를 살펴보려고 합니다.

일반적으로 다루는 비즈니스 상 메시지를 분실하면 안되는 금융, 은행 시스템이나 꼭 이런 시스템이 아니더라도 근래엔 분산 어플리케이션 구조에서 메시지를 분실해선 안되는 케이스가 많은데, 이런 구조에선 메시지 전달의 안전이 입증된 프로토콜을 사용합니다.

AMQP 는 많은 메시지 대기열 시스템에서 지원하는 개방형 표준 프로토콜이며 라우팅, 필터링, 대기열 처리등의 모델들을 제공합니다.

 

  • Queue (대기열)
    • 클라이언트가 사용하는 메시지를 저장하는 데이터 구조입니다. 대기열의 메시지들은 하나 이상의 사용자에게 푸시되며, 여러 사용자가 대기열에 붙어있는 경우 메시지는 로드밸런싱 됩니다. 3가지 종류의 Queue 가 있습니다.
      • Durable (영구적 큐) - 브로커가 다시 시작되면 대기열이 자동을 다시 만들어지며, 모든 컨텐츠가 보존된다는 것은 아닙니다. persistent 로 표시된 메시지만 디스크에 저장되고 이런 메시지만 재시작시 복원됩니다.
      • Exclusive (독점적 큐) - 하나의 특정 구독자 연결에만 바인딩되며, 연결이 닫히면 Queue 도 소멸됩니다.
      • Auto-delete (자동 삭제 큐) - 마지막 구독자의 연결이 끊어지면 Queue 도 삭제됩니다.
  • Exchange (교환기)
    • 교환기로 메시지가 게시됩니다. 메시지를 하나 이상의 Queue 로 라우팅합니다.
      • Direct (직접 교환기) - 라우팅 키를 일치시켜 메시지를 라우팅합니다.
      • Topic (토픽 교환기) - 라우팅 키와 일치하는 패턴을 사용해 메시지를 라우팅합니다.
      • Fanout (팬아웃 교환기) - 라우팅 키를 무시하고 연결된 모든 Queue 에 메시지를 브로드캐스트합니다.
  • Binding (바인딩)
    • Queue 와 Exchange 간의 연결입니다.

AMQP 는 위와 같은 세 가지 필수 컴퍼넌트로 구성되어있습니다.

 

 

이를 그림으로 간단히 표현하면 이런 구조입니다. ZeroMQ 또는 Redis 의 pub/sub 에 비해선 비교적 복잡한 구조를 갖고 있으며,

대신 더 풍부한 기능과 신뢰성을 얻을 수 있습니다.

 

아무래도 근래에 AMQP 를 가장 쉽게 사용할 수 있는 메시지 브로커는 RabbitMQ 일겁니다.

ActiveMQ 나 Kafka 도 많이 들어보긴 했습니다만 주위에서 많이 써서 그런지 RabbitMQ 가 저한테는 훨씬 친숙하긴 하네요.

Java 와 Erlang 으로 만들어져있는 RabbitMQ 는 사용하기도 쉽고, 상당히 다양한 언어에 라이브러리를 제공하고 있습니다.

간단하게 로컬에서 RabbitMQ 를 설치해 AMQP 를 한번 사용해보도록 하겠습니다.

 

brew install rabbitmq

 

mac 에선 brew 를 이용해 RabbitMQ 를 쉽게 설치할 수 있습니다.

설치 후 백그라운드로 돌리려면 brew services start rabbitmq, 포그라운드로 돌리려면 rabbitmq-server 를 커맨드 창에 입력하면 됩니다. RabbitMQ 서버를 로컬에서 실행시켰으니 이제 typescript 로 간단하게 샘플을 만들어 보겠습니다.

 

npm install amqplib
npm install @types/amqplib --save-dev

 

지원하는 라이브러리인 amqplib 를 설치했습니다.

이제 이 라이브러리를 사용해 서버와 클라이언트 코드를 각각 작성해보겠습니다.

 

// amqp-server.ts
import * as amqp from 'amqplib';

(async () => {
  const connection = await amqp.connect('amqp://guest:guest@localhost:5672');
  const channel = await connection.createChannel();
  await channel.assertExchange('test_exchange', 'fanout');
  const { queue } = await channel.assertQueue('test_queue');
  await channel.bindQueue(queue, 'test_exchange', '');

  channel.consume(queue, msg => {
    const content = msg?.content.toString();
    console.info('message : ' + content);
  });
})();

 

너무 간단하긴 합니다만.. 우선 Rabbitmq 는 기본적으로 5672 포트를 사용하고, 처음 설치하면 guest 계정을 사용할 수 있습니다.

먼저 서버측 코드를 보자면 test_exchange 라는 Exchange 를 fanout 타입으로 만듭니다. (assert 를 사용한 것은 엄밀히 말하면 생성하는 건 아니지만, assert 를 사용해 브로커에 해당 Exchange 가 있는지를 확인하고 없으면 브로커가 생성해줍니다)

 

그다음 test_queue 라는 이름의 Queue 를 만들고 해당 Queue 를 Exchange 에 바인딩합니다.

그리고 해당 대기열로 오는 메시지를 수신 (consume) 하게 되는데 지금은 수신하면 그냥 console 로 찍고 끝입니다.

 

// amqp-client.ts
import * as amqp from 'amqplib';

(async () => {
  const connection = await amqp.connect('amqp://guest:guest@localhost:5672');
  const channel = await connection.createConfirmChannel();

  const data = 'test-message';

  channel.publish('test_exchange', '', Buffer.from(data, 'utf-8'), undefined, (err, ok) => {
    if (err) console.error(err);
    else console.info(ok);
  });
})();

 

클라이언트 측 코드는 더 단순하게 게시하는 코드가 전부입니다. 위에 그림에서 구조를 설명했던 것처럼 publisher 는 exchange 로 메시지를 보내기 때문에, 여기선 publish 를 할 때 text_exchange 라고 서버에서 만든것과 동일한 Exchange 에 메시지를 보내고 있습니다.

클라이언트 측에서 test_exchange 라는 이름이 아닌 다른 이름을 사용하면 존재하지 않는 Exchange 로 404 에러가 발생합니다.

 

이렇게 작성하고 서버, 클라이언트 코드를 순차적으로 실행해주면 클라이언트에서 publish 를 한 메세지를 서버에서 consume 해서 메시지가 찍히는 것을 확인할 수 있습니다. 다만 이렇게만 작성하면 서버에서 아래와 같은 에러가 발생합니다.

 

 

이는 위에서 설명했던 내용과 관련이 있는데 RabbitMQ 는 QoS1 을 구현합니다. 따라서 메시지를 받았을 땐, 반드시 ACK 를 보내서

정상적으로 수신했음을 알려야 하는데 이 부분이 코드에 빠져있으니 위와 같은 에러가 발생하는 것입니다.

실제로 이런 에러가 발생한 후 RabbitMQ 대시보드(http://localhost:15762)에서 Queue 를 확인해보면, 보냈던 test-message 가 

여전히 Queue 에 남아있음을 확인할 수 있습니다. 에러 해결은 channel.ack(msg) 메서드를 호출해 메시지 수신을 알리면 됩니다.

 

이런식으로 메시지 브로커의 Exchange 와 Queue 를 사용하는 것은 너무 일반적인 사용법이지만, 약간 다른 패턴을 하나 더 보겠습니다.

 

우리가 보통 여러 워커에 작업을 분산시키기 위해선, 똑같은 작업을 여러 워커가 동시에 받는 것은 원치 않기 때문에 이런 경우엔 pub/sub 은 적절하지 않습니다. 이런때엔 로드 밸런싱과 비슷한 메시지 배포 패턴이 필요한데, 메시징 시스템에서 이런 패턴은 용어로 경쟁 소비자
또는 팬아웃 배포, 환풍기 패턴 등으로 알려져 있습니다.

 

 

위 그림과 같은 구조라고 생각하시면 됩니다. 저렇게 화살표가 있다고 해서 절대 같은 작업을 동시에 받는 것은 아니구요.

메시지를 여러 작업자들에 배포하고 (Fanout) 처리한다음, 그 결과들을 단일 노드로 취합하는 (fanin) 방식은 일반적이며

Fanout 에 해당하는 부분만 RabbitMQ 를 사용해 샘플로 보도록 하겠습니다.

 

그냥 위에 예시랑 똑같이 짜면 되는거 아니냐? 라고 하실 수 있는데 그렇게 해도 상관은 없습니다만, 뭐 굳이 한번 더 적자면 우리는
AMQP 를 사용할 때 메시지를 최종 목적지에 직접 전달하는 것이 아니라 메시지 브로커에 전달합니다. 메시지 브로커가 Exchange, Binding, Queue 에 정의된 규칙에 따라 메시지를 최종 목적지까지 라우팅해주죠.

 

이런 구조에서 위와 같은 작업 분배를 하려면 메시지가 한 소비자에게만 수신된다는 보장이 되어야 하는데, Exchange 는 하나 이상의 Queue 에 바인딩 될 수 있어 이를 보장할 수 없습니다. 이러한 경우 우리는 해결책으로 Exchange 를 우회해 Queue 에 직접 메시지를
전송해야 합니다. 코드로 자세히 보도록 하겠습니다.

 

// amqp-producer.ts
import * as amqp from 'amqplib';

const produce = (channel: amqp.Channel) => {
  const randomNumber = Math.random() * 100;
  const msg = { number: randomNumber };
  console.info('produce - ', randomNumber);
  channel.sendToQueue('random_queue', Buffer.from(JSON.stringify(msg)));
};

(async () => {
  const connection = await amqp.connect('amqp://guest:guest@localhost:5672');
  const channel = await connection.createChannel();

  setInterval(() => {
    produce(channel);
  }, 1000);
})();

 

random_queue 라는 큐에 직접 메시지를 꽂아넣는 코드입니다. 1초마다 랜덤 넘버를 생성해 queue 에 보내고 있습니다.

 

// amqp-consumer.ts
import * as amqp from 'amqplib';

const consume = (channel: amqp.Channel, queue: string) => {
  channel.consume(queue, (msg => {
    const data = msg?.content.toString();
    if (data) {
      const k = JSON.parse(data);
      console.info(process.pid, k);
    }
    channel.ack(msg!);
  }));
};

(async () => {
  const connection = await amqp.connect('amqp://guest:guest@localhost:5672');
  const channel = await connection.createChannel();
  const { queue } = await channel.assertQueue('random_queue');

  consume(channel, queue);
})();

 

이 부분은 이전 예시와 크게 다르지 않습니다. random_queue 에서 메시지를 소비 (consume) 하는 내용입니다.

producer 에서 생성한 메시지가 정확히 한 워커에서만 처리되는지를 확인을 돕기 위해 process.pid 를 같이 찍어봤습니다.

 

> ts-node amqp-consumer.ts
> ts-node amqp-consumer.ts
> ts-node amqp-producer.ts

 

위와 같이 consumer 를 두개 실행시켜놓고, producer 를 실행합니다. 그럼 숫자는 랜덤 생성이니 다르겠지만, 대략 아래와 같이 
처리되는 것을 확인할 수 있습니다.

 

 

producer 가 생성한 메시지들은 정확히 하나의 consumer 들에서 처리되는 것을 볼 수 있습니다.

이런 패턴을 AMQP 에서 경쟁 소비자 패턴이라고 합니다. 작은 규모로 특정 메시지 브로커를 사용하거나 잘 관리된다면 일반적인 방법의 Exchange 를 사용하더라도 이런 처리에 문제가 없겠지만, 다른 Queue 에서도 소비될 수 있는 잠재적인 위험을 생각한다면

직접 Queue 에 메시지를 보내는 이런 패턴도 분산 처리에선 사용해볼 수 있을 것 같습니다.