저번 포스팅에서 @nestjs/cqrs 에 관한 내용 중 이벤트 기반 프로그래밍에 대한 부분을 이번 포스팅에서 간략히 보도록 하겠습니다.
- EventBus
수동으로 이벤트를 발송하고, 그 이벤트를 구독해 적합한 처리를 하는 것은 @nestjs/cqrs 에서 제공하는 EventBus 를 이용하면
간단하게 구현할 수 있습니다. 여기서 EventBus 를 사용해볼 부분은 광고를 생성한 후 생성 결과 내용을 Slack 으로 알리는 부분입니다.
@CommandHandler(CreateAdCommand)
export class CreateAdHandler implements ICommandHandler<CreateAdCommand> {
...
async execute(command: CreateAdCommand): Promise<any> {
...
await this.slackService.send(adType, title, country);
}
}
현재 코드는 CreateAdHandler 에서 SlackService 를 주입받아 send 메서드를 execute 내에서 직접 호출하고 있는데요.
물론 이게 현재 프로덕션에서 동작하는 코드는 아니긴 하지만, 광고가 생성된 후 해당 결과를 slack 으로 보내는 것은 정해진 하나의
업무 규칙이라고 할 수 있습니다. 하지만 구조적으로 광고를 생성 처리와 slack 전송은 강하게 결합되어 있습니다.
그나마 생성자 주입을 하고 있긴 하지만 slack 외의 다른 수단으로 변경한다거나, 더 추가가 되어야 한다고 할 때 필연적으로
이 부분까지 수정을 할 수 밖에 없죠. 생성자 주입으로 받는 구현체의 타입을 추상화한다고 해도 말입니다.
이는 별개로 다룰 수 있어야되고, 해당 부분에서 광고가 생성이 완료되었다는 이벤트만 전파하고 필요한 부분에서 이 이벤트를 구독해
적합한 처리를 하는식으로 변경한다면 CreateAdHandler 에서는 이벤트만 발행할 뿐 더 신경쓸 무언가는 없습니다.
덤으로 그 외의 로직들은 비동기 처리까지 할 수 있으니 성능향상에도 도움이 될 거구요.
import { IEvent } from '@nestjs/cqrs';
import { CountryCode } from '../../types/country';
export class AdCreatedEvent implements IEvent {
constructor(
readonly adType: string,
readonly title: string,
readonly country: CountryCode,
) {}
}
방식은 Command, Query 를 추가할 때와 크게 다르지 않습니다. 우선 src/ads 하위에 event 디렉토리를 생성하고 ad-created.event.ts 파일을 위와 같이 작성합니다. @nestjs/cqrs 의 IEvent 인터페이스를 구현하는 형태고, 이렇게 해야 EventHandler 에서 해당 이벤트를 받을 수 있습니다. 어쨌거나 이 이벤트는 광고가 생성되었다 를 알리는 이벤트인겁니다.
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { SlackService } from '../../slack/slack.service';
import { AdCreatedEvent } from './ad-created.event';
@EventsHandler(AdCreatedEvent)
export class AdEventsHandler implements IEventHandler<AdCreatedEvent> {
constructor(private slackService: SlackService) {}
async handle(event: AdCreatedEvent) {
const { adType, title, country } = event;
await this.slackService.send(adType, title, country);
}
}
동일한 위치에 ad-events.handler.ts 파일을 작성합니다. 광고와 관련된 어떠한 이벤트가 발생하는 경우엔 이 핸들러가 처리하는거구요.
EventsHandler 데코레이터를 사용해서 위에서 생성했던 AdCreatedEvent 이벤트를 받을 수 있는 클래스임을 나타내고, IEventHandler 인터페이스를 구현은 handle 메서드를 구현해주면 됩니다. 여기로 SlackService 를 옮겨왔습니다.
...
import { AdCreatedEvent } from '../event/ad-created.event';
@CommandHandler(CreateAdCommand)
export class CreateAdHandler implements ICommandHandler<CreateAdCommand> {
constructor(
...,
private readonly eventBus: EventBus,
) {}
async execute(command: CreateAdCommand): Promise<any> {
const { adType, title, country } = command;
await this.adsRepository.createUser(command);
this.eventBus.publish(new AdCreatedEvent(adType, title, country));
}
}
실제로 광고가 생성된 후 이벤트를 발행해야겠죠. CreateAdHandler 에선 기존에 SlackService 를 주입받는 대신 EventBus 를
주입받습니다. EventBus 또한 Injectable 한 컴퍼넌트입니다. 위에서 생성했던 AdCreatedEvent 를 EventBus 의 publish 메서드를
사용해 발행해줍니다.
...
import { AdEventsHandler } from './event/ad-events.handler';
@Module({
...,
providers: [
...,
AdEventsHandler,
],
})
export class AdsModule {}
AdsModule 에서 AdEventsHandler 를 provider 로 제공하면 됩니다. 이제 광고 생성 요청을 보내보면 광고 생성도 되고,
원래 동작했던 것처럼 slack 으로 광고 생성 완료 알림도 오는 것을 확인할 수 있습니다.
- Event Publish
...
@Injectable()
export class EventBus<EventBase extends IEvent = IEvent>
extends ObservableBus<EventBase>
implements IEventBus<EventBase>, OnModuleDestroy
{
...
constructor(
private readonly commandBus: CommandBus,
private readonly moduleRef: ModuleRef,
private readonly unhandledExceptionBus: UnhandledExceptionBus,
) {
super();
this.subscriptions = [];
...
}
...
/**
* Publishes an event.
* @param event The event to publish.
*/
publish<TEvent extends EventBase, TContext = unknown>(
event: TEvent,
context?: TContext,
) {
return this._publisher.publish(event, context);
}
register(handlers: EventHandlerType<EventBase>[] = []) {
handlers.forEach((handler) => this.registerHandler(handler));
}
protected registerHandler(handler: EventHandlerType<EventBase>) {
const instance = this.moduleRef.get(handler, { strict: false });
if (!instance) {
return;
}
const events = this.reflectEvents(handler);
events.map((event) =>
this.bind(
instance as IEventHandler<EventBase>,
defaultReflectEventId(event),
),
);
}
...
}
@nestjs/cqrs 의 EventBus 구현 부분 중 파악에 필요한 것만 일부 떼와봤습니다.
EventBus 를 사용할 때 이벤트 기반이고 publish 를 보는 순간 자연스레 저는 Redis 의 pub/sub 이 생각났는데요.
실제 구현부를 보면 EventBus 의 생성자에서 subscriptions 를 빈배열로 초기화하는 코드가 있네요. 이벤트를 구독하는 무언가의 리스트인 것 같습니다.
직접 호출하지는 않지만 register 를 보면 handler 를 배열로 받아 각 handler 를 인자로 해서 registerHandler 메서드를 호출하구요.
registerHandler 에선 event 와 handler 를 바인딩해주는 것처럼 보입니다.
어찌보면 저 bind 메서드가 핵심 같은데 제가 생략하긴 했습니다만.. bind 메서드에선 event 를 인자로 해서 handler 의 handle 메서드를 호출하도록 연결해주면서 그렇게 생성된 하나의 구독을 subscriptions 에 넣고 있습니다.
결국 대략적인 흐름은 Redis pub/sub 모델과 약간 빗대어 보자면 데코레이터이긴 하지만 EventsHandler 를 통해 특정 이벤트를
구독하고, EventBus 가 특정 이벤트를 발행하면 해당 이벤트를 구독하는 EventsHandler 의 handle 메서드가 호출된다 정도로 이해할 수 있을 것 같습니다. 인메모리에서 pub/sub 을 비슷하게 구현한 모델이다 정도로 볼 수 있을 것 같고, 자세한 구현 사항을 파악하고 싶다면 해당 소스 코드를 한번 파보시는 것도 좋을 것 같습니다.
이정도 내용만 보고 처음엔 이전에 사용했던 CommandBus 와 QueryBus 도 비슷하게 이벤트 발행 - 구독의 형태이지 않을까 생각했는데요. 크게보면 비슷하면서 세부적인 구현은 미묘한 차이를 보입니다.
...
@Injectable()
export class CommandBus<CommandBase extends ICommand = ICommand>
extends ObservableBus<CommandBase>
implements ICommandBus<CommandBase>
{
private handlers = new Map<string, ICommandHandler<CommandBase>>();
private _publisher: ICommandPublisher<CommandBase>;
constructor(private readonly moduleRef: ModuleRef) {
super();
this.useDefaultPublisher();
}
/**
* Executes a command.
* @param command The command to execute.
* @returns A promise that, when resolved, will contain the result returned by the command's handler.
*/
execute<T extends CommandBase, R = any>(command: T): Promise<R> {
const commandId = this.getCommandId(command);
const handler = this.handlers.get(commandId);
if (!handler) {
const commandName = this.getCommandName(command);
throw new CommandHandlerNotFoundException(commandName);
}
this._publisher.publish(command);
return handler.execute(command);
}
bind<T extends CommandBase>(handler: ICommandHandler<T>, id: string) {
this.handlers.set(id, handler);
}
register(handlers: CommandHandlerType[] = []) {
handlers.forEach((handler) => this.registerHandler(handler));
}
protected registerHandler(handler: CommandHandlerType) {
const instance = this.moduleRef.get(handler, { strict: false });
if (!instance) {
return;
}
const target = this.reflectCommandId(handler);
if (!target) {
throw new InvalidCommandHandlerException();
}
this.bind(instance as ICommandHandler<CommandBase>, target);
}
...
}
EventBus 에선 구독하는 컴퍼넌트 리스트를 들고 있었다면 CommandBus 의 경우엔 Map 으로 들고 있네요.
비슷하게 register 부분을 보면 각 Command 에 맞는 핸들러를 Map 에 등록하고, 이후 특정 Command 가 전달되었을 때 Map 에서
그 Command 에 등록되어있는 핸들러를 꺼내 execute 메서드를 호출해주는 식입니다.
QueryBus 도 CommandBus 와 거의 동일한 구성이구요. 역시나 자세한 내용을 원한다면 소스 코드를 보시는게 좋을 것 같습니다.
- Event Handling
굳이 적을만한건 아니지만 cqrs 에서 제공하는 publish 에 대해 그냥 이것저것 테스트를 해보려고 합니다.
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { SlackService } from '../../slack/slack.service';
import { AdCreatedEvent } from './ad-created.event';
@EventsHandler(AdCreatedEvent)
export class AdEventsHandler implements IEventHandler<AdCreatedEvent> {
constructor(private slackService: SlackService) {}
async handle(event: AdCreatedEvent) {
const { adType, title, country } = event;
console.info('handle slack service');
await this.slackService.send(adType, title, country);
}
}
@EventsHandler(AdCreatedEvent)
export class AdEventsHandler2 implements IEventHandler<AdCreatedEvent> {
constructor() {}
async handle(event: AdCreatedEvent) {
console.info('handle another service');
}
}
일반적인 pub/sub 모델을 생각하면 이벤트는 받는 대상을 신경쓰지 않고 발행하며, 구독하는 쪽이 알아서 처리를 하죠.
지금은 slack 만 처리하면 되니 하나의 핸들러만 AdCreatedEvent 를 구독하지만 해당 이벤트가 발생했을 때 또 다른 무언가 처리가
필요하다면, 위와 같이 AdCreatedEvent 를 구독하는 핸들러를 추가해주면 될겁니다.
실제 이벤트를 발행해보면, 여러 핸들러라 동일한 이벤트에 대해 구독하고 있음을 확인할 수 있습니다.
import { IEvent } from '@nestjs/cqrs';
import { CountryCode } from '../../types/country';
abstract class CqrsEvent {
constructor(readonly name: string) {}
}
export class AdCreatedEvent extends CqrsEvent implements IEvent {
constructor(
readonly adType: string,
readonly title: string,
readonly country: CountryCode,
) {
super(AdCreatedEvent.name);
console.info('@@@', AdCreatedEvent.name);
}
}
export class AnotherEvent extends CqrsEvent implements IEvent {
constructor() {
super(AnotherEvent.name);
console.info('@@@', AnotherEvent.name);
}
}
이번엔 하나의 로직에서 여러 이벤트를 발행하는 경우를 보려고 합니다.
기존 AdCreatedEvent 를 작성했던 파일에서 AnotherEvent 를 하나 더 만들어줬습니다. CqrsEvent 추상클래스는 이벤트를 구분하기 위해 추가한 클래스인데, 핸들러에서 이벤트 네임으로 이벤트를 구분하기 위함입니다.
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { SlackService } from '../../slack/slack.service';
import { AdCreatedEvent, AnotherEvent } from './ad-created.event';
@EventsHandler(AdCreatedEvent, AnotherEvent)
export class AdEventsHandler implements IEventHandler<AdCreatedEvent> {
constructor(private slackService: SlackService) {}
async handle(event: AdCreatedEvent | AnotherEvent) {
switch (event.name) {
case AdCreatedEvent.name: {
console.info('get AdCreatedEvent');
const { adType, title, country } = <AdCreatedEvent>event;
await this.slackService.send(adType, title, country);
break;
}
case AnotherEvent.name:
console.info('get AnotherEvent');
break;
default:
break;
}
}
}
위처럼 하나의 핸들러가 여러 이벤트를 구독하는건 간단합니다. 예시만으론 AnotherEvent 를 구독하는 별도의 핸들러를 만드는게
나을 것 같긴 하지만, 위와 같은 예시가 필요한 경우는 실제 서비스 구현시 꽤 있을 것 같습니다.
...
@CommandHandler(CreateAdCommand)
export class CreateAdHandler implements ICommandHandler<CreateAdCommand> {
...
async execute(command: CreateAdCommand): Promise<any> {
...
this.eventBus.publish(new AdCreatedEvent(adType, title, country));
this.eventBus.publish(new AnotherEvent());
}
}
기존 광고 생성시 AdCreatedEvent 를 발행했던 부분에서 AnotherEvent 도 발행해줍니다.
의도한대로 여러 이벤트를 구독하는 핸들러 또한 잘 동작하는 것을 확인할 수 있습니다.
- Subscribe All Events
이건 뭐 예시로 볼건 아니고 그냥 공식 문서를 보다보니 보여서 갖고왔습니다.
실제 코드를 확인해보면 CommandBus, QueryBus, EventBus 전부 ObservableBus 를 확장하고 있는데요.
이름에서 그런 냄새를 풍기듯이 ObservableBus 를 확인해보면 결국 얘는 rxjs 의 Observable 입니다.
따라서 얘네들은 전부 Observable 이라서 특정 이벤트가 아닌 발행되는 모든 이벤트를 구독하는 것 또한 가능합니다.
이건 대충 보니 이벤트 소싱을 구현하기 위해 사용할 수도 있어 보이네요.
아직 팀에서 CQRS 까지는 오버라고 생각하는 상황이라 이를 당장 프로덕션에서 사용할 일은 없을 것 같긴 합니다만,
굳이 CQRS 가 아니더라도 이벤트 기반 프로그래밍을 도입해서 각 컴퍼넌트간의 결합도를 낮추는 데에 유용히
쓸 수 있을 것 같습니다.
'Backend > NestJS' 카테고리의 다른 글
NestJS - 대충 서비스 만들어보기 (18) (0) | 2023.07.29 |
---|---|
NestJS - 대충 서비스 만들어보기 (16) (0) | 2023.07.08 |
NestJS - 대충 서비스 만들어보기 (15) (0) | 2023.07.01 |
NestJS - 대충 서비스 만들어보기 (14) (0) | 2023.06.17 |
NestJS - 대충 서비스 만들어보기 (13) (0) | 2023.06.12 |