본문 바로가기

Backend/NestJS

NestJS - 대충 서비스 만들어보기 (17)

저번 포스팅에서 @nestjs/cqrs 에 관한 내용 중 이벤트 기반 프로그래밍에 대한 부분을 이번 포스팅에서 간략히 보도록 하겠습니다.

 

- EventBus

 

수동으로 이벤트를 발송하고, 그 이벤트를 구독해 적합한 처리를 하는 것은 @nestjs/cqrs 에서 제공하는 EventBus 를 이용하면

간단하게 구현할 수 있습니다. 여기서 EventBus 를 사용해볼 부분은 광고를 생성한 후 생성 결과 내용을 Slack 으로 알리는 부분입니다.

 

@CommandHandler(CreateAdCommand)
export class CreateAdHandler implements ICommandHandler<CreateAdCommand> {
  ...

  async execute(command: CreateAdCommand): Promise<any> {
    ...
    await this.slackService.send(adType, title, country);
  }
}

 

현재 코드는 CreateAdHandler 에서 SlackService 를 주입받아 send 메서드를 execute 내에서 직접 호출하고 있는데요.

물론 이게 현재 프로덕션에서 동작하는 코드는 아니긴 하지만, 광고가 생성된 후 해당 결과를 slack 으로 보내는 것은 정해진 하나의

업무 규칙이라고 할 수 있습니다. 하지만 구조적으로 광고를 생성 처리와 slack 전송은 강하게 결합되어 있습니다.

 

그나마 생성자 주입을 하고 있긴 하지만 slack 외의 다른 수단으로 변경한다거나, 더 추가가 되어야 한다고 할 때 필연적으로

이 부분까지 수정을 할 수 밖에 없죠. 생성자 주입으로 받는 구현체의 타입을 추상화한다고 해도 말입니다.

이는 별개로 다룰 수 있어야되고, 해당 부분에서 광고가 생성이 완료되었다는 이벤트만 전파하고 필요한 부분에서 이 이벤트를 구독해

적합한 처리를 하는식으로 변경한다면 CreateAdHandler 에서는 이벤트만 발행할 뿐 더 신경쓸 무언가는 없습니다.

덤으로 그 외의 로직들은 비동기 처리까지 할 수 있으니 성능향상에도 도움이 될 거구요.

 

import { IEvent } from '@nestjs/cqrs';
import { CountryCode } from '../../types/country';

export class AdCreatedEvent implements IEvent {
  constructor(
    readonly adType: string,
    readonly title: string,
    readonly country: CountryCode,
  ) {}
}

 

방식은 Command, Query 를 추가할 때와 크게 다르지 않습니다. 우선 src/ads 하위에 event 디렉토리를 생성하고 ad-created.event.ts 파일을 위와 같이 작성합니다. @nestjs/cqrs 의 IEvent 인터페이스를 구현하는 형태고, 이렇게 해야 EventHandler 에서 해당 이벤트를 받을 수 있습니다. 어쨌거나 이 이벤트는 광고가 생성되었다 를 알리는 이벤트인겁니다.

 

import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { SlackService } from '../../slack/slack.service';
import { AdCreatedEvent } from './ad-created.event';

@EventsHandler(AdCreatedEvent)
export class AdEventsHandler implements IEventHandler<AdCreatedEvent> {
  constructor(private slackService: SlackService) {}

  async handle(event: AdCreatedEvent) {
    const { adType, title, country } = event;
    await this.slackService.send(adType, title, country);
  }
}

 

동일한 위치에 ad-events.handler.ts 파일을 작성합니다. 광고와 관련된 어떠한 이벤트가 발생하는 경우엔 이 핸들러가 처리하는거구요.

EventsHandler 데코레이터를 사용해서 위에서 생성했던 AdCreatedEvent 이벤트를 받을 수 있는 클래스임을 나타내고, IEventHandler 인터페이스를 구현은 handle 메서드를 구현해주면 됩니다. 여기로 SlackService 를 옮겨왔습니다.

 

...
import { AdCreatedEvent } from '../event/ad-created.event';

@CommandHandler(CreateAdCommand)
export class CreateAdHandler implements ICommandHandler<CreateAdCommand> {
  constructor(
    ...,
    private readonly eventBus: EventBus,
  ) {}

  async execute(command: CreateAdCommand): Promise<any> {
    const { adType, title, country } = command;
    await this.adsRepository.createUser(command);

    this.eventBus.publish(new AdCreatedEvent(adType, title, country));
  }
}

 

실제로 광고가 생성된 후 이벤트를 발행해야겠죠. CreateAdHandler 에선 기존에 SlackService 를 주입받는 대신 EventBus 를
주입받습니다. EventBus 또한 Injectable 한 컴퍼넌트입니다. 위에서 생성했던 AdCreatedEvent 를 EventBus 의 publish 메서드를

사용해 발행해줍니다.

 

...
import { AdEventsHandler } from './event/ad-events.handler';

@Module({
  ...,
  providers: [
    ...,
    AdEventsHandler,
  ],
})
export class AdsModule {}

 

AdsModule 에서 AdEventsHandler 를 provider 로 제공하면 됩니다. 이제 광고 생성 요청을 보내보면 광고 생성도 되고,

원래 동작했던 것처럼 slack 으로 광고 생성 완료 알림도 오는 것을 확인할 수 있습니다.

 

- Event Publish

 

...

@Injectable()
export class EventBus<EventBase extends IEvent = IEvent>
  extends ObservableBus<EventBase>
  implements IEventBus<EventBase>, OnModuleDestroy
{
  ...
  
  constructor(
    private readonly commandBus: CommandBus,
    private readonly moduleRef: ModuleRef,
    private readonly unhandledExceptionBus: UnhandledExceptionBus,
  ) {
    super();
    this.subscriptions = [];
    ...
  }

  ...

  /**
   * Publishes an event.
   * @param event The event to publish.
   */
  publish<TEvent extends EventBase, TContext = unknown>(
    event: TEvent,
    context?: TContext,
  ) {
    return this._publisher.publish(event, context);
  }

  register(handlers: EventHandlerType<EventBase>[] = []) {
    handlers.forEach((handler) => this.registerHandler(handler));
  }

  protected registerHandler(handler: EventHandlerType<EventBase>) {
    const instance = this.moduleRef.get(handler, { strict: false });
    if (!instance) {
      return;
    }
    const events = this.reflectEvents(handler);
    events.map((event) =>
      this.bind(
        instance as IEventHandler<EventBase>,
        defaultReflectEventId(event),
      ),
    );
  }

  ...
}

 

@nestjs/cqrs 의 EventBus 구현 부분 중 파악에 필요한 것만 일부 떼와봤습니다.

EventBus 를 사용할 때 이벤트 기반이고 publish 를 보는 순간 자연스레 저는 Redis 의 pub/sub 이 생각났는데요.

실제 구현부를 보면 EventBus 의 생성자에서 subscriptions 를 빈배열로 초기화하는 코드가 있네요. 이벤트를 구독하는 무언가의 리스트인 것 같습니다.

 

직접 호출하지는 않지만 register 를 보면 handler 를 배열로 받아 각 handler 를 인자로 해서 registerHandler 메서드를 호출하구요.

registerHandler 에선 event 와 handler 를 바인딩해주는 것처럼 보입니다.

어찌보면 저 bind 메서드가 핵심 같은데 제가 생략하긴 했습니다만.. bind 메서드에선 event 를 인자로 해서 handler 의 handle 메서드를 호출하도록 연결해주면서 그렇게 생성된 하나의 구독을 subscriptions 에 넣고 있습니다.

 

결국 대략적인 흐름은 Redis pub/sub 모델과 약간 빗대어 보자면 데코레이터이긴 하지만 EventsHandler 를 통해 특정 이벤트를
구독하고, EventBus 가 특정 이벤트를 발행하면 해당 이벤트를 구독하는 EventsHandler 의 handle 메서드가 호출된다 정도로 이해할 수 있을 것 같습니다. 인메모리에서 pub/sub 을 비슷하게 구현한 모델이다 정도로 볼 수 있을 것 같고, 자세한 구현 사항을 파악하고 싶다면 해당 소스 코드를 한번 파보시는 것도 좋을 것 같습니다.

 

이정도 내용만 보고 처음엔 이전에 사용했던 CommandBus 와 QueryBus 도 비슷하게 이벤트 발행 - 구독의 형태이지 않을까 생각했는데요. 크게보면 비슷하면서 세부적인 구현은 미묘한 차이를 보입니다.

 

...

@Injectable()
export class CommandBus<CommandBase extends ICommand = ICommand>
  extends ObservableBus<CommandBase>
  implements ICommandBus<CommandBase>
{
  private handlers = new Map<string, ICommandHandler<CommandBase>>();
  private _publisher: ICommandPublisher<CommandBase>;

  constructor(private readonly moduleRef: ModuleRef) {
    super();
    this.useDefaultPublisher();
  }

  /**
   * Executes a command.
   * @param command The command to execute.
   * @returns A promise that, when resolved, will contain the result returned by the command's handler.
   */
  execute<T extends CommandBase, R = any>(command: T): Promise<R> {
    const commandId = this.getCommandId(command);
    const handler = this.handlers.get(commandId);
    if (!handler) {
      const commandName = this.getCommandName(command);
      throw new CommandHandlerNotFoundException(commandName);
    }
    this._publisher.publish(command);
    return handler.execute(command);
  }

  bind<T extends CommandBase>(handler: ICommandHandler<T>, id: string) {
    this.handlers.set(id, handler);
  }

  register(handlers: CommandHandlerType[] = []) {
    handlers.forEach((handler) => this.registerHandler(handler));
  }

  protected registerHandler(handler: CommandHandlerType) {
    const instance = this.moduleRef.get(handler, { strict: false });
    if (!instance) {
      return;
    }
    const target = this.reflectCommandId(handler);
    if (!target) {
      throw new InvalidCommandHandlerException();
    }
    this.bind(instance as ICommandHandler<CommandBase>, target);
  }

  ...
}

 

EventBus 에선 구독하는 컴퍼넌트 리스트를 들고 있었다면 CommandBus 의 경우엔 Map 으로 들고 있네요.

비슷하게 register 부분을 보면 각 Command 에 맞는 핸들러를 Map 에 등록하고, 이후 특정 Command 가 전달되었을 때 Map 에서

그 Command 에 등록되어있는 핸들러를 꺼내 execute 메서드를 호출해주는 식입니다.

QueryBus 도 CommandBus 와 거의 동일한 구성이구요. 역시나 자세한 내용을 원한다면 소스 코드를 보시는게 좋을 것 같습니다.

 

- Event Handling

 

굳이 적을만한건 아니지만 cqrs 에서 제공하는 publish 에 대해 그냥 이것저것 테스트를 해보려고 합니다. 

 

import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { SlackService } from '../../slack/slack.service';
import { AdCreatedEvent } from './ad-created.event';

@EventsHandler(AdCreatedEvent)
export class AdEventsHandler implements IEventHandler<AdCreatedEvent> {
  constructor(private slackService: SlackService) {}

  async handle(event: AdCreatedEvent) {
    const { adType, title, country } = event;
    console.info('handle slack service');
    await this.slackService.send(adType, title, country);
  }
}

@EventsHandler(AdCreatedEvent)
export class AdEventsHandler2 implements IEventHandler<AdCreatedEvent> {
  constructor() {}

  async handle(event: AdCreatedEvent) {
    console.info('handle another service');
  }
}

 

일반적인 pub/sub 모델을 생각하면 이벤트는 받는 대상을 신경쓰지 않고 발행하며, 구독하는 쪽이 알아서 처리를 하죠.

지금은 slack 만 처리하면 되니 하나의 핸들러만 AdCreatedEvent 를 구독하지만 해당 이벤트가 발생했을 때 또 다른 무언가 처리가

필요하다면, 위와 같이 AdCreatedEvent 를 구독하는 핸들러를 추가해주면 될겁니다.

 

 

실제 이벤트를 발행해보면, 여러 핸들러라 동일한 이벤트에 대해 구독하고 있음을 확인할 수 있습니다.

 

import { IEvent } from '@nestjs/cqrs';
import { CountryCode } from '../../types/country';

abstract class CqrsEvent {
  constructor(readonly name: string) {}
}

export class AdCreatedEvent extends CqrsEvent implements IEvent {
  constructor(
    readonly adType: string,
    readonly title: string,
    readonly country: CountryCode,
  ) {
    super(AdCreatedEvent.name);
    console.info('@@@', AdCreatedEvent.name);
  }
}

export class AnotherEvent extends CqrsEvent implements IEvent {
  constructor() {
    super(AnotherEvent.name);
    console.info('@@@', AnotherEvent.name);
  }
}

 

이번엔 하나의 로직에서 여러 이벤트를 발행하는 경우를 보려고 합니다.

기존 AdCreatedEvent 를 작성했던 파일에서 AnotherEvent 를 하나 더 만들어줬습니다. CqrsEvent 추상클래스는 이벤트를 구분하기 위해 추가한 클래스인데, 핸들러에서 이벤트 네임으로 이벤트를 구분하기 위함입니다.

 

import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { SlackService } from '../../slack/slack.service';
import { AdCreatedEvent, AnotherEvent } from './ad-created.event';

@EventsHandler(AdCreatedEvent, AnotherEvent)
export class AdEventsHandler implements IEventHandler<AdCreatedEvent> {
  constructor(private slackService: SlackService) {}

  async handle(event: AdCreatedEvent | AnotherEvent) {
    switch (event.name) {
      case AdCreatedEvent.name: {
        console.info('get AdCreatedEvent');
        const { adType, title, country } = <AdCreatedEvent>event;
        await this.slackService.send(adType, title, country);
        break;
      }
      case AnotherEvent.name:
        console.info('get AnotherEvent');
        break;
      default:
        break;
    }
  }
}

 

위처럼 하나의 핸들러가 여러 이벤트를 구독하는건 간단합니다. 예시만으론 AnotherEvent 를 구독하는 별도의 핸들러를 만드는게

나을 것 같긴 하지만, 위와 같은 예시가 필요한 경우는 실제 서비스 구현시 꽤 있을 것 같습니다.

 

...

@CommandHandler(CreateAdCommand)
export class CreateAdHandler implements ICommandHandler<CreateAdCommand> {
  ...
  
  async execute(command: CreateAdCommand): Promise<any> {
    ...
    this.eventBus.publish(new AdCreatedEvent(adType, title, country));
    this.eventBus.publish(new AnotherEvent());
  }
}

 

기존 광고 생성시 AdCreatedEvent 를 발행했던 부분에서 AnotherEvent 도 발행해줍니다.

 

 

의도한대로 여러 이벤트를 구독하는 핸들러 또한 잘 동작하는 것을 확인할 수 있습니다.

 

- Subscribe All Events

 

이건 뭐 예시로 볼건 아니고 그냥 공식 문서를 보다보니 보여서 갖고왔습니다.

 

 

실제 코드를 확인해보면 CommandBus, QueryBus, EventBus 전부 ObservableBus 를 확장하고 있는데요.

이름에서 그런 냄새를 풍기듯이 ObservableBus 를 확인해보면 결국 얘는 rxjs 의 Observable 입니다.

따라서 얘네들은 전부 Observable 이라서 특정 이벤트가 아닌 발행되는 모든 이벤트를 구독하는 것 또한 가능합니다.

이건 대충 보니 이벤트 소싱을 구현하기 위해 사용할 수도 있어 보이네요.

 

아직 팀에서 CQRS 까지는 오버라고 생각하는 상황이라 이를 당장 프로덕션에서 사용할 일은 없을 것 같긴 합니다만,

굳이 CQRS 가 아니더라도 이벤트 기반 프로그래밍을 도입해서 각 컴퍼넌트간의 결합도를 낮추는 데에 유용히

쓸 수 있을 것 같습니다.