본문 바로가기

Backend/Golang

Go 언어 - [7장 - (2)] (동시성)

 

※ 이전 내용에 이어서 7장의 나머지 부분을 정리해보겠습니다.

 

7.3.6 파이프라인 중단하기

 

파이프라인을 구성할 때, 받기만 할 뿐 그만 보내달라고 요청할 수 없었습니다.

채널에서 받다가 끝까지 받지 않고 break 등으로 빠져나온다면, 보내는 쪽에서 채널이 막히게 되고 이 고루틴은 종료되지 않고 계속 막혀있게 됩니다. 채널이 닫힐 때까지 자료를 모두 빼주어야 종료되면서 고루틴이 소멸됩니다.

 

모두 자료를 소진시키지 않으면 해제되지 않은 고루틴들이 메모리에 남아 누수를 일으킵니다. 모든 자료를 소진시킨다해도 그만큼 계속 데이터를 받아오면 더 많은 네트워크 트래픽과 배터리 소모가 발생할 것입니다. 채널을 억지로 닫아버린다면 패닉이 발생하기에 보내는 쪽에서만 닫는것으로 패턴을 정형화해야 합니다.

 

유용한 패턴 중 하나는 done 채널을 하나 더 두는 것입니다. 보내는 고루틴에서 이 채널로부터 close 신호가 감지되면 보내는 것을 중단하고 채널을 닫으면 됩니다. 아래 예시)

 

func PlusOne(done <-chan struct{}, in <-chan int) <-chan int {
  out := make(chan int)
  go func() {
    defer close(out)
    for num := range in {
      select {
      case out <- num + 1:
      case <- done:
        return
      }
    }
  }()
  return out
}

func main() {
  c := make(chan int)
  go func() {
    defer close(c)
    for i := 3; i < 103; i += 10 {
      c <- i
    }
  }()
  done := make(chan struct{})
  nums := PlusOne(done, PlusOne(done, PlusOne(done, PlusOne(done, PlusOne(done, c)))))
  for num := range nums {
    fmt.Println(num)
    if num == 18 {
      break
    }
  }
  close(done)
  time.Sleep(100 * time.Millisecond)
  fmt.Println("NumGoroutine: ", runtime.NumGoroutine())
  for _ = range nums {
    // Consume all nums
  }
  time.Sleep(100 * time.Millisecond)
  fmt.Println("NumGoroutine: ", runtime.Numgoroutine())
}

 

7.3.7 컨텍스트(context.Context) 활용하기

 

done 채널을 운영하는것도 좋지만 여러 고루틴에 종료 신호 이외에도 다른 공유되어야 하는 정보 (예를 들어 사용자의 인증 정보) 가 있어야 하는 등의 복잡한 상황에선 context 패턴을 사용하면 좋습니다. 기본 라이브러리가 아니므로 아래와 같이 설치합니다.

 

> go get golang.org/x/net/context

 

위의 코드는 이제 아래처럼 대체될 수 있습니다.

 

func PlusOne(ctx context.Context, in <-chan int) <-chan int {
  out := make(chan int)
  go func() {
    defer close(out)
    for num := range in {
      select {
      case out <- num + 1:
      case <-ctx.Done():
        return
      }
    }
  }()
  return out
}

func main() {
  c := make(chan int)
  go func() {
    defer close(c)
    for i := 3; i < 103; i += 10 {
      c <- i
    }
  }()
  ctx, cancel := context.WithCancel(context.Background())
  nums := PlusOne(ctx, PlusOne(ctx, PlusOne(ctx, PlusOne(ctx, PlusOne(ctx, c)))))
  for num := range nums {
    fmt.Println(num)
    if num == 18 {
      cancel()
      break
    }
  }
}

 

밖에서 취소 신호를 받을 수 있는 경우에는 context.Context 를 넘겨받아 이용합니다. context.Context 는 계층 구조로 되어있고, context.Background() 가 가장 상위에 있어 프로그램이 끝날 때까지 절대 취소되지 않고 계속 살아있습니다.

위에선 context.WithCancel 로 context.Background() 밑에 취소 기능을 갖춘 컨텍스트를 붙였습니다. ctx, cancel 두 변수로 받았는데 ctx  에는 새로 생성한 컨텍스트, cancel 에는 이 컨텍스트를 취소하는데 호출할 수 있는 함수가 들어갑니다.

컨텍스트 관례상 다른 구조체 안에 넣지 않고 함수의 맨 첫 번째 인자로 넘겨주고 받습니다. 관련링크

 

7.3.8 요청과 응답 짝짓기

 

채널을 통해 요청을 보내고 응답을 받을때 이것이 어느 요청에 의한 응답인지 알아야 하는 경우 (분산처리때문에 어느것이 먼저 나올지 알수 없어서) 가 있습니다. 한 가지 방법은 채널로 넘겨주고 받는 자료에 ID 번호를 같이 넘겨 확인하는 것입니다. 하지만 이 경우 요청에 대한 응답을 다른 고루틴이 받아갈 수 있어 적절한 방법은 아닙니다. 따라서 두번째 방법은, 요청을 보낼 때 결과를 받고 싶은 채널을 함께 실어서 보내는 방법입니다. 아래 예시)

 

type Request struct {
  Num int
  Resp chan Response
}

type Response struct {
  Num int
  WorkerID int
}

func PlusOneService(reqs <-chan Request, workerID int) {
  for req := range reqs {
    go func(req Request) {
      defer close(req.Resp)
      req.Resp <- Response{req.Num + 1, workerID}
    }(req)
  }
}

func main() {
  reqs := make(chan Request)
  defer close(reqs)
  for i := 0; i < 3; i++ {
    go PlusOneService(reqs, i)
  }
  var wg sync.WaitGroup
  for i := 3; i < 53; i += 10 {
    wg.Add(1)
    go func(i int) {
      defer wg.Done()
      resps := make(chan Response)
      reqs <- Request{i, resps}
      fmt.Println(i, "=>", <-resps)
    }(i)
  }
  wg.Wait()
}

 

7.3.9 동적으로 고루틴 이어붙이기

 

필요에 따라 동적으로 채널을 통해 고루틴들을 이어붙일 수 있습니다. 소수 생성기 예제를 보도록 하겠습니다.

 

// start 부터 시작해서 step 만큼 더하면서 정수를 무한정 생성
func Range(ctx context.Context, start, step int) <-chan int {
  out := make(chan int)
  go func() {
    defer close(out)
    for i := start ; i += step {
      select {
      case out <- i:
      case <-ctx.Done():
        return
      }
    }
  }()
  return out
}

// n 의 배수를 걸러내는 파이프라인 반환
func FilterMultiple(n int) IntPipe {
  return func(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
      defer close(out)
      for x := range in {
        if x%n == 0 {
          continue
        }
        select {
        case out <- x :
        case <- ctx.Done():
          return
        }
      }
    }()
    return out
  }
}

// 무한 소수 생성기
func Primes(ctx context.Context) <-chan int {
  out := make(chan int)
  go func() {
    defer close(out)
    c := Range(ctx, 2, 1)
    for {
      select {
      case i := <-c:
        c = FilterMultiple(i)(ctx, c)
        select {
        case out <- i:
        case <-ctx.Done():
          return
        }
      case <-ctx.Done():
        return
      }
      /*
        L46 에서 막힐 수 있고 그 후 L49 에서 막혀있을 수 있기 때문에 select 를 다중으로
        이렇게 작성해야 취소되었을 때 막혀서 계속 살아있는 좀비 고루틴이 없어짐
      */
    }
  }()
  return out
}

// Primes 함수를 활용
func main() {
  max := 10
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel() // 이 부분에 의해 함수가 종료되었을 때 Primes 로 넘어간 ctx 가 취소되고 생성되었던 고루틴들이 모두 소멸
  for prime := range Primes(ctx) {
    if prime > max {
      break
    }
    fmt.Print(prime, " ")
  }
  fmt.Println()
}

 

7.3.10 주의점

 

보내는 쪽에서 채널을 닫는 것이 중요합니다. 다음의 생산자 - 소비자 패턴 예시를 통해 흔히 할 수 있는 실수를 보도록 하겠습니다.

 

// Bad Example
c := make(chan int)
done := make(chan bool)
go func() { // <- 생산자 고루틴
  for i= 0; i < 10; i++ {
    c <- i
  }
  done <- true
}()
go func() { // <- 소비자 고루틴
  for {
    fmt.Println(<-c)
  }
}()
<-done

 

위 예제는 몇 가지 문제점이 있는데, 우선 두번째 고루틴이 끝나지 않습니다. 또한 done 에는 생산이 끝난 뒤에 값이 들어가므로 소비가 끝나기 전에 메인 고루틴이 끝나버릴 가능성이 있습니다.

 

이러한 실수를 하지 않도록 정형화된 패턴을 아래에서 다시 정리해보도록 하겠습니다.

 

  • 자료를 보내는 채널은 보내는 쪽에서 닫아야 함
  • 보내는 쪽에서 반복문 등을 활용해서 보내다가 중간에 return 을 할 수 있으므로 닫을 때는 defer 를 이용해야 함
  • 받는 쪽이 끝날때까지 기다리는 것이 모든 자료의 처리가 끝나는 시점까지 기다리는 방법으로 더 안정적
  • 특별한 이유가 없다면 받는 쪽에서 range 를 이용하는 것이 좋음. 생산자가 채널을 닫은 경우에 반복문을 빠져나오게 되기 때문에 편리
  • 루틴이 끝났음을 알리고 다른 쪽에서 기다리는 것은 sync.WaitGroup 을 이용하게 더 나을 때가 많음
  • 끝났음을 알리는 done 채널은 자료를 보내는 쪽에서 결정할 사항이 아님. 자료를 보내는 쪽에선 채널을 닫아서 자료가 끝났음을 알리고 done 채널은 받는 쪽에서 자료를 더 이상 보내지 말아달라는 cancel 요청으로 보는 것이 낫다.
  • done 채널에 자료를 보내어 신호를 주는 것 보단 close(done) 으로 채널을 닫는 것이 나을 때가 많다.
// Bad
for i := 0; i < 10; i++ {
  go func() {
    fmt.Println(i)
  }()
}

// Good
for i := 0; i < 10; i++ {
  go func() {
    fmt.Println(i)
  }(i) // i 를 인자로 넘겨주도록 한다
}

 

7.4 경쟁 상태

 

고루틴들을 다루다 보면, 모든 고루틴들이 막혀서 교착 상태 (deadlock) 가 발생해 프로그램이 더 이상 진행이 안되어 출력된 오류로 버그를 발견하는 경우가 아닌 쉽게 발견하지 못하는 버그들이 있을때가 있는데, 그 중 하나가 경쟁 상태 (race condition) 입니다.

 

경쟁 상태는 어떤 공유된 자원에 둘 이상의 프로세스가 동시에 접근하여 잘못된 결과가 나올 수 있는 상태를 말합니다.

 

채널을 잘 활용하면 경쟁 상태 문제를 많이 해결할 수 있습니다. 몇 가지 경우에는 채널보다 sync 라이브러리를 활용하는 것이 더 간단하며 atomic 라이브러리를 활용해야 하는 경우 또한 생길 수 있습니다.

 

7.4.1 동시성 디버그

 

경쟁 상태 탐지 기능이라는 것이 있습니다. go 도구를 이용할 때 -race 옵션을 주면 됩니다.

 

> go test -race mypkg // to test the package
> go run -race mysrc.go // to run the source file
> go build -race mycmd // to build the command
> go install -race mypkg // to install the package

 

runtime 에 있는 고루틴과 시스템 자원에 관련된 몇 가지 함수를 보겠습니다.

 

  • runtime.Goroutine() : 현재 동작하는 고루틴의 수
  • runtime.NumCPU() : 현재 사용 가능한 CPU
  • runtime.GOMAXPROCS() : 사용할 최대 CPU 수 설정

고루틴의 개수가 점점 더 늘어난다면 어디선가 고루틴이 막혀 있을 가능성이 높습니다. 적당한 시점에 panic 을 발생시키면 고루틴들의 스택 추적이 출력되기에, 이런 기술을 잘 활용하면 동시성 디버그가 가능합니다.

 

7.4.2 atomic 과 sync.WaitGroup

 

func main() {
  cnt := int64(10)
  for i := 0; i < 10; i++ {
    go func() {
      cnt--
    }()
  }
  for cnt > 0 {
    time.Sleep(100 * time.Millisecond)
  }
  fmt.Println(cnt)
}

 

위 코드엔 경쟁 상태가 있습니다. -race 옵션을 주면 경쟁 상태가 있다는 오류를 발생시킵니다. cnt-- 부분과 cnt > 0 부분입니다.

메모리에서 값을 읽고 나서 다른 고루틴이 메모리를 읽은 다음에 값을 감소시켜서 저장하고, 다시 이전의 고루틴으로 돌아와 값을 저장한다면 값이 2 감소하지 않고 1 만 감소될 수 있습니다.

 

sync/atomic 패키지에는 이런 경우를 대비하기 위한 함수들이 있습니다. cnt-- 를 atomic.AddInt64(&cnt, -1) 로, cnt > 0 을 atomic.LoadInt64(&cnt) > 0 으로 바꿔주면 됩니다. 하지만 atomic 을 쓰지 않고 채널을 이용해서도 동시성 문제를 해결할 수 있습니다.

 

func main() {
  req, resp := make(chan struct{}), make(chan int64)
  cnt := int64(10)
  go func(cnt int64) {
    defer close(resp)
    for _ = range req {
      cnt--
      resp <- cnt
    }
  }(cnt)
  for i := 0; i < 10; i++ {
    go func() {
      req <- struct{}{}
    }()
  }
  for cnt = <-resp; cnt > 0; cnt = <-resp {
  }
  close(req)
  fmt.Println(cnt)
}

 

채널이 싱크를 맞춰주기 때문에, 위처럼 atomic 을 안써도 경쟁 상태가 생기지 않습니다.

 

7.4.3 sync.Once

 

한 번만 어떤 코드를 수행하고자 할 때 쓸 수 있는 것이 sync.Once 입니다. 분산 처리를 할 때, 초기화 코드에 이용할 수 있습니다.

 

func main() {
  var once sync.Once
  var wg sync.WaitGroup
  for i := 0; i < 3; i++ {
    wg.Add(1)
    go func(i int) {
      defer wg.Done()
      once.Do(func() {
        fmt.Println("Initialized")
      })
      fmt.Println("Goroutine:", i)
    }(i)
  }
  wg.Wait()
}

 

위 코드에서 Initialized 는 한 번만 수행됩니다.

 

7.4.4 Mutex 와 RWMutex

 

뮤텍스 (Mutex) 는 상호 배타 잠금 기능이 있습니다. 동시에 둘 이상의 고루틴에서 코드의 흐름을 제어할 수 있습니다. 외부 자원에 접근하는 경우 이것을 활용하면 효과적일 수 있씁니다. 뮤텍스를 잘 활용하는 방법은 자원 포인터와 뮤텍스 포인터를 하나의 구조체에 넣어두고 사용하는 것입니다.

 

type Accessor struct {
  R *Resource
  L *sync.Mutex
}

func (acc *Accessor) Use() {
  acc.L.Lock()
  // Use acc.R
  acc.L.Unlock()
}

 

sync.RWMutex 는 좀 더 복잡합니다. 읽어가는 것은 상관없지만, 프로세스 하나라도 쓰기를 한다면 다른 어떤 프로세스도 그 동안에 접근할 수 없는 경우에 이용됩니다. sync.Mutex 를 이용하여 읽어하는 것 역시 프로세스 하나만 허용해도 문제없이 동작하지만 성능은 많이 저하될 것입니다. Go 에서 기본으로 제공하는 맵이 RWMutex 를 이용하기 더 적합한 성질을 갖고 있습니다.

 

type ConcurrentMap struct {
  M map[string]string
  L *sync.RWMutex
}

func (m ConcurrentMap) Get(key string) string {
  m.L.RLock()
  defer m.L.RUnlock()
  return m.M[key]
}

func (m ConcurrentMap) Set(key, value string) {
  m.L.Lock()
  m.M[key] = value
  m.L.Unlock()
}

 

RWMutex 도 Mutex 의 일종이기에 RLock 과 RUnlock 을 사용하지 않으면 Mutex 와 동일합니다.

 

7.5 문맥 전환

 

문맥 전환 (context switching) 이란 프로그램이 여러 프로세스 혹은 스레드에서 동작할 때 기존에 하던 작업들을 메모리에 보관해두고 다른 작업을 시작하는 것을 말합니다. 문멕 전환 비용도 있고 스레드마다 필요한 자원이 있기 때문에 무작정 스레드를 많이 만드는 것은 좋지 않습니다.

 

고루틴은 스레드보다 더 저렴합니다. 고루틴을 여러 개 만든다고 해서 스레드가 그만큼 많이 만들어지는 것은 아닙니다. 여러 개의 고루틴들이 하나의 스레드에 대응됩니다. Go 컴파일러는 주로 아래의 경우에 문맥 전환을 하는 코드를 생성할 수 있습니다.

 

  • 파일이나 네트워크 연산처럼 시간이 오래 걸리는 입출력 연산이 있을 때
  • 채널에 보내거나 받을 때
  • go 로 고루틴이 생성될 때
  • 가비지 컬렉션 사이클이 지난 뒤

GO 의 입출력은 주로 블럭킹입니다. 연산이 완료될 때까지 기다리는 것으로 동기 입출력이라고도 하며, 이때 다른 고루틴으로 문맥 전환하는 것은 매우 현명합니다.

채널에 보내거나 받을 때, 고루틴 간의 문맥 전환이 일어나는 것은 자연스럽습니다. 이것을 컴파일 시간에 예측하면 변수들을 레지스터에 할당하는 전략을 더 잘 세워 성능 향상에 도움이 됩니다.

문맥 전환을 강제로 시키기 위한 코드 중에 하나가 time.Sleep(0) 입니다. 0초간 쉬는 아무 의미없어 보이는 코드가 문맥 전환을 강제 전환시키기에 가끔 필요할 수 있습니다.

 

7.6 Summary

 

  • 병렬성은 물리적으로 동시에 수행되는 성질을 말하며 병행성은 논리적으로 동시에 실행될 수 있는 성질을 말함
  • 함수 호출할 때 go 를 앞에 붙이면 새로운 고루틴이 생성되어 병행적으로 수행됨. 고루틴은 스레드보다 가볍고 여러 고루틴이 하나의 스레드에 할당되기 때문에 비교적 많이 생성 가능
  • 다른 고루틴들이 종료될 때까지 기다릴 때에는 sync.WaitGroup 을 활용
  • 채널을 이용해 고루틴끼리 통신하는 데 사용할 수 있음
  • 버퍼 없는 채널의 경우 같은 채널에 보내고 받는 고루틴이 짝을 이루면 한 쪽에서 다른 쪽으로 자료가 전송되고 많은 경우에 서로 다른 고루틴으로 문맥 전환이 일어나기도 함
  • 채널은 버퍼를 두어서 보내는 고루틴과 받는 고루틴이 서로 동기화하지 않아도 동작하게 할 수 있음
  • 생성기 패턴을 고루틴과 채널을 이용해 작성 가능
  • 파이프라인 패턴 : 보내는 쪽에서 전송이 끝나면 채널을 닫고 받는 쪽에서 채널이 닫힐 때까지 자료를 받음
  • 채널을 닫아 해당 채널에서 받으려고 대기하고 있는 모든 고루틴에 방송 효과를 줄 수 있음
  • select 를 이용해 여러 개의 채널을 동시에 다룰 수 있으며 채널을 통해 오는 자료가 올 때까지 기다리지않거나 시간 제한을 둘 수 있음
  • 받는 쪽에서 그만 받고자 할 때, done 채널을 이용해 제어할 수 있고 context.Context 를 이용하면 편리
  • 동시성 디버깅 기능을 기본으로 제공. 경쟁 상태가 발생하면 공유 메모리를 활용하기보다 채널을 활용하거나, atomic 과 sync 패키지의 기능들을 활용해 해결

 

 

- 출처 : 디스커버리 Go 언어

 

'Backend > Golang' 카테고리의 다른 글

Go 언어 - [8장] (실무 패턴)  (0) 2020.09.19
Go 언어 - [7장 - (1)] (동시성)  (0) 2020.09.05
Go 언어 - [6장] (웹 어플리케이션 작성)  (0) 2020.08.29
Go 언어 - [5장] (구조체)  (0) 2020.08.22
Go 언어 - [4장] (함수)  (0) 2020.08.16