이전 포스팅에서 ZeroMQ 을 간략히 보면서, 그 중에서도 pub/sub 패턴을 사용해 간단한 단방향 메시징에 대해 알아봤습니다.
ZeroMQ 는 pub/sub 패턴 외에도 push/pull 과 request/reply 등 일반적인 메시징 패턴도 지원하는데 이번 포스팅에선
이 패턴들에 대해 살펴보고자 합니다.
- PUSH/PULL
이 패턴은 파이프라인 패턴으로도 불리며, AMQP 포스팅에서 이미 적긴 했지만 환풍기 또는 경쟁 소비자 패턴과 유사합니다.
파이프라인 이라는 처리 형식은 Node.JS 개발자라면 더 익숙할 것 같습니다.
이름 그대로 PUSH 소켓은 메시지를 전송하기 위한 소켓이고, PULL 소켓은 수신용 소켓이며, PUSH 에서 PULL 방향으로
단방향 통신이 이루어집니다. 이는 request/reply 패턴과 차이점이며, 그럼 pub/sub 하고는 뭔 차이야? 하실 수 있는데
pub/sub 의 경우 publish 한 메시지를 모든 subscribe 소켓이 수신하지만 push/pull 패턴에서는 push 한 메시지를
모든 pull 소켓들에 순차적으로 전송하게 됩니다.
- 하나의 PUSH 소켓에 여러 PULL 소켓이 연결된 경우엔, 메시지가 로드밸런싱 되듯이 균등하게 분배됩니다. 반면 여러 PUSH 소켓에서 메시지를 받는 하나의 PULL 소켓은 내부에서 대기열 시스템을 사용해 메시지를 처리하게 됩니다.
- 연결된 PULL 소켓이 없는 경우 PUSH 된 메시지는 없어지지 않고 대기열에 남아있다가 PULL 소켓이 연결되면 메시지가 처리됩니다. 이는 어찌보면 장점이자 단점일 수 있을 것 같습니다.
- PUSH 소켓과 PULL 소켓 둘 다 connect 또는 bind 모드로 동작할 수 있습니다. 저도 예시를 작성하면서 처음 알게됐는데, 묘하게 차이점이 있네요. PUSH 에서 PULL 로 단방향은 동일하지만, 차이점은 연결을 초기화 하는 부분입니다.
bind 모드는 영구적인 노드에 적합하고 connect 모드는 임시 노드에 적합합니다. - 이 패턴으로 데이터를 분산 처리하는 구조에선 3가지 노드를 구현하고 연결하면 됩니다.
- ventilator (변형 생성자) : 처리할 데이터 생성
- worker (작업자) : 데이터 처리 후 결과값 생성
- sink (수집자) : worker 에 의해 생성된 결과값 처리
이런 구조의 노드를 만들어두면 worker 만 늘려서 유동적인 분산 처리 작업이 가능해집니다.
하지만 이러면 Node.JS 에서 워커 프로세스를 늘려서 분산 처리하는거랑 크게 뭔 차이가 있는거냐? 싶긴 한데요.
ZeroMQ 에서 항상 소켓이라는 단일 API 로 코드 작성이 가능해지고, worker 를 어떤 장비나 어떤 인스턴스에 있던 붙이는게 쉬워집니다.
프로그램 구조가 동일하면서 단순하다는 것은 관리면에서 상당한 강점이죠.
// zmq-ventilator.ts
import * as zmq from 'zeromq';
const pushSocket = zmq.socket('push');
pushSocket.bindSync('tcp://*:5000');
setInterval(() => {
const randomNumber = Math.random() * 100;
console.info('push - ', randomNumber);
const msg = { number: randomNumber };
pushSocket.send(JSON.stringify(msg));
}, 1000);
변형 생성자 부분의 코드입니다. 5000 포트에 바인딩하며, 1초마다 랜덤 숫자를 생성해 해당 메시지를 push 합니다.
다음의 코드에서 bind 가 connect 에선 어떻게 달라지는지 볼 수 있습니다.
// zmq-worker.ts
import * as zmq from 'zeromq';
const pullSocket = zmq.socket('pull');
const pushSocket = zmq.socket('push');
pullSocket.connect('tcp://localhost:5000');
pushSocket.connect('tcp://localhost:5017');
pullSocket.on('message', (message) => {
const data = JSON.parse(message.toString());
console.info(process.pid, data);
pushSocket.send(data.number);
});
작업자 코드입니다. PUSH 소켓에서 메시지를 수신하기 위해 PULL 소켓을 생성하고, 메시지가 들어오면 pid 와 함께 stdout 한번 찍고
다른 포트로 생성한 PUSH 소켓에 그대로 push 합니다.
위와 다르게 여기선 bind 하지 않고 connect 를 하고있는데, bind 와 다르게 들어가는 주소값에도 약간 차이가 있지만 진짜 차이는
이후 코드를 실행할 때 말씀드리겠습니다.
// zmq-sink.ts
import * as zmq from 'zeromq';
const pullSocket = zmq.socket('pull');
pullSocket.bindSync('tcp://*:5017');
pullSocket.on('message', (message) => {
console.info(process.pid, message.toString());
});
worker 로부터 작업된 결과를 수집하는 수집자 코드입니다. 여기선 다시 bind 를 하고 있는데, 이 노드는 영구 노드이기 때문입니다.
그럼 한번 sink > worker > ventilator 순서로 코드를 실행해보겠습니다.
> ts-node zmq-sink.ts
> ts-node zmq-worker.ts
> ts-node zmq-ventilator.ts
보기에 썩 깔끔한 캡쳐는 아닙니다만.. 첫번째가 ventilator / 세번째가 worker / 네번째가 sink 출력 결과입니다.
ventilator -> worker -> sink 순서로 작업이 잘 처리되고 있습니다.
하나의 worker 가 돌아가고 있는 상황에서 worker 를 하나 더 실행시켰습니다(두번째 공간)
캡쳐로는 조금 알아보기가 어려울 수 있지만.. 하나의 worker 에서 처리되던 메시지가 로드밸런싱 되어 처리되는 것을 확인할 수 있습니다.
worker 와 같은 이런 임시 노드는 이런식으로 자유롭게 증감이 가능합니다. 여기서 왜 connect 모드를 사용해야하냐면, bind 모드를
사용할 경우 해당 포트 주소가 그냥 묶여버리기 때문에 worker 를 하나에서 더 늘릴수가 없습니다. (Address already use ERROR)
push/pull 패턴에서 하나만 더 보고 가겠습니다. 그냥 확인에 가까운 내용인데..
위에서 PUSH 된 메시지는 PULL 소켓이 없을 경우엔 계속 남아있다가 PULL 소켓이 연결되는 순간 한번에 메시지가 처리된다고 한
내용을 확인해봤습니다.
위에 작성했던 코드를 worker 만 살짝 바꾼 후 ventilator 를 먼저 실행하고 몇 초 있다가 worker 를 실행한 결과물입니다.
worker 에서 찍힌 datetime 을 보면 33초에 메시지가 한번에 찍히고 34, 35... 이렇게 넘어가는걸 볼 수 있습니다.
PUSH 된 메시지를 잃어버리지 않는건 장점 같기도 합니다만, worker 에 어떠한 문제가 있어 어떠한 PULL 소켓도 붙을 수 없는 경우
계속 메시지가 쌓이기만 하면 PUSH 프로세스 쪽에 문제가 생기거나 PULL 연결이 재개될 때 너무 대량의 메시지가 한번에 처리되면서
worker 에 문제가 생길 수 있지 않나.. 하는 생각이 드네요.
- REQUEST/REPLY
request/reply 패턴은 전형적인 클라이언트 - 서버 패턴이라고 볼 수 있습니다. 우리가 흔히 운영하는 웹서버도 이런 패턴입니다만..
ZeroMQ 를 비롯한 단방향 메시징 시스템에서 누릴 수 있는 이점은 상당히 많지만, 간혹 양방향처럼 동작하는 무언가가 필요할 때
이 패턴을 사용할 수 있습니다. ZeroMQ 또한 마찬가지로 이런 기능을 하는 소켓을 제공합니다.
// zmq-request.ts
import * as zmq from 'zeromq';
const requestSocket = zmq.socket('req');
requestSocket.bindSync('tcp://*:5000');
setInterval(() => {
const randomNumber = Math.random() * 100;
console.info('request -', randomNumber);
const msg = { number: randomNumber };
requestSocket.send(JSON.stringify(msg));
}, 1000);
requestSocket.on('message', msg => {
console.info('request', msg.toString());
});
push/pull 패턴에서 본 것과 비슷한 예시 코드입니다. 1초마다 랜덤 숫자를 생성해 REQ 소켓으로 메시지를 보내고, 반대로 패턴 이름처럼
그에 대한 응답을 받을 수 있게 message 이벤트에 대한 핸들러를 붙여둔 상황입니다.
// zmq-reply.ts
import * as zmq from 'zeromq';
const replySocket = zmq.socket('rep');
replySocket.connect('tcp://localhost:5000');
replySocket.on('message', msg => {
const data = JSON.parse(msg.toString());
console.info('reply', data);
replySocket.send(data.number);
});
REPLY 소켓에선 메시지를 수신하고, 받아서 그대로 다시 send 합니다. 이제 이 코드를 reply > request 순서로 실행해보겠습니다.
이미 위에서 많이 보긴 했지만 reply 프로세스는 두개를 켜보겠습니다.
> ts-node zmq-reply.ts
> ts-node zmq-reply.ts
> ts-node zmq-request.ts
다른 패턴에서 볼 수 있듯이 ZeroMQ 는 request/reply 패턴에서도 손쉽게 reply 소켓을 늘릴 수 있습니다.
REP 소켓에서 수신한 메시지를 그대로 send 했을 때 다시 REQ 소켓이 그 응답을 받을 수 있는것도 확인할 수 있습니다.
사실 그럼 ZeroMQ 를.. 메시징 시스템을 사용할 때 어떤 환경에서 request/reply 패턴이 필요할지 실용적인 예시를 뭔가 들고
싶었습니다만 딱히 생각나는게 없네요..
말로 떼우자면, 이 패턴을 사용함으로써 우리는 일단 원하는 방향으로 메시지를 이동시킬 수 있습니다.
그리고 가장 두드러지는 것이 다른 패턴에 비해 가질 수 있는 신뢰성인데, 신뢰성은 비단 메시지를 전송하는데 실패하는 것만이 아니라
프로세스가 망가지는 상황에서의 핸들링도 의미합니다.
pub/sub 패턴의 경우 subscriber 가 죽는다면, publisher 는 그걸 알 수 없습니다. 어떤 정보도 sub 에서 pub 으로 전송되지 않으니까요. 구독하고 있는 연결수 정도를 제공하는 건 어디서 본 것 같기도 합니다만 이걸 갖고는 뭘 할수가 없겠죠. 그러기 위한 패턴도 아니고..
또 다른 패턴으로 위에서 본 push/pull 패턴에선 worker 가 죽는 경우 ventilator 는 이를 알 수가 없고, 또는 반대로 ventilator 나 sink 가 죽는 경우 worker 도 이를 알 수 없습니다. 파이프라인 패턴에선 한 방향으로 흘러가니 당연하긴 합니다만, 메시징 시스템을 사용할 때
이런 문제를 해결하고 싶은 케이스가 있는거죠.
위와 같은 문제를 해결하고자 할 때 request/reply 패턴을 사용할 수 있습니다. request 가 죽는다면, 여기에 대한 응답이 오지 않으므로
이를 알 수 있습니다. 적절한 예시를 들지 못한것이 아쉽습니다만, 아주 쉽게 이런 패턴들을 사용할 수 있는 ZeroMQ 를 사내 시스템 중
어디에 붙이면 기존 솔루션보다 훨씬 정답이 될 수 있을지 좀 찾아봐야 할 것 같다는 생각이 듭니다.
'Backend > Node.js' 카테고리의 다른 글
CLI program (0) | 2023.07.23 |
---|---|
Use AMQP (w/ RabbitMQ) (0) | 2023.02.11 |
Use ZeroMQ (0) | 2023.02.05 |
Messaging System (0) | 2023.01.28 |
MSA (0) | 2023.01.14 |