본문 바로가기

Backend/Golang

Go 언어 - [7장 - (1)] (동시성)

※ 디스커버리 Go 언어 라는 책 내용을 전반적으로 다루지만 책 리뷰보다는 좀 더 간단하게 정리해보겠습니다. 7장은 내용이 길어서 두개의 포스팅으로 나눠서 작성하겠습니다.

 

7. 동시성

 

이번 장에서는 동시성에 대한 개념을 알아보고 구현하며, 동시성을 이용하면 좋은 경우와 패턴들 또한 보도록 하겠습니다.

 

7.1 고루틴

 

고루틴은 가벼운 스레드와 같은 것으로 현재 수행 흐름과 별개의 흐름을 만들어주며, 생성 방법은 간단합니다.

 

f(x, y, z) // 일반적인 함수 호출

go f(x, y, z) // 고루틴 호출

 

앞에 go 를 붙여서 함수를 호출하게 되면 f(x, y, z) 의 호출과 현재 함수의 흐름은 메모리를 공유하는 논리적으로 별개의 흐름이 됩니다.

 

7.1.1 병렬성과 병행성

 

물리적으로 별개의 흐름이랑 동시에 각각의 흐름이 수행되는 경우를 뜻합니다. 두 사람이 동시에 각각 업무를 보고 있다면 물리적으로 별개의 업무를 수행하는 것이며 이를 병렬성 (Parallelism) 이라 합니다.

 

반면 예를 들어 커피를 마시면서 신문을 볼 때 커피를 마시기 위해 신문 보는 것을 잠깐 중단하고, 커피를 한모금 한 후 신문을 다시 보기 시작하는 경우는 물리적으로 두 흐름이 있지는 않지만 동시에 두가지를 하고 있으며, 이를 동시성 혹은 병행성 (Concurrency) 라고 합니다.

 

동시성은 병렬성과는 다르지만 동시성이 있어야 병렬성이 생깁니다.

 

func main() {
  go func() {
    fmt.Println("In goroutine")
  }()
  fmt.Println("In main routine")
}

 

위 코드처럼 작성하면 In goroutine, In main routine 둘 중 어느 것이 먼저 출력될지 알 수 없습니다.

 

7.1.2 고루틴 기다리기

 

여러 이미지 파일들을 동시에 다운로드 받은 뒤 zip 으로 압축하는 프로그램을 보도록 하겠습니다.

 

// url 을 받아 다운로드 후 contents 와 error 를 리턴
func download(url string) (string, error) {
  resp, err := http.Get(url)
  if err != nil {
    return "", err
  }
  filename, err := urlToFilename(url)
  if err != nil {
    return "", err
  }
  f, err := os.Create(filename)
  if err != nil {
    return "", err
  }
  defer f.Close()
  _, err = io.Copy(f, resp.Body)
  return filename, err
}

// rawurl 로부터 filename 리턴
func urlToFilename(rawurl string) (string, error) {
  url, err := url.Parse(rawurl)
  if err != nil {
    return "", err
  }
  return filepath.Base(url.Path), nil
}

// zip 파일 생성
func writeZip(outFilename string, filenames []string) error {
  outf, err := os.Create(outFilename)
  if err != nil {
    return err
  }
  zw := zip.NewWriter(outf)
  for _, filename := range filenames {
    w, err := zw.Create(filename)
    if err != nil {
      return err
    }
    f, err := os.Open(filename)
    if err != nil {
      return err
    }
    defer f.Close()
    _, err = io.Copy(w, f)
    if err != nil {
      return err
    }
  }
  return zw.Close()
}

func main() {
  var urls = []string{
    "http://image.com/img01.jpg",
    "http://image.com/img02.jpg",
    "http://image.com/img03.jpg",
  }
  
  for_, url := range urls {
    go func(url string) {
      if _, err := download(url); err != nil {
        log.Fatal(err)
      }
    }(url)
  }
  
  filenames, err := filepath.Glob("*.jpg")
  if err != nil {
    log.Fatal(err)
  }
  err = writeZip("images.zip", filenames)
  if err != nil {
    log.Fata(err)
  }
}

 

위 코드는 main 함수에서 다운로드 함수 호출 부분에 go 키워드가 붙어있어 원하는 대로 실행이 안됩니다. 파일이 다운로드되기도 전에 압축하려고 할 수 있기 때문입니다. 압축작업은 파일들이 모두 다운로드 될 때까지 기다린 후 수행되어야 하는데, 이럴 때 이용할 수 있는 것이 sync.WaitGroup 입니다.

 

var wg sync.WaitGroup // wg 에는 기본값이 0 으로 맞춰져 있는 카운터가 들어가있음
wg.Add(len(urls)) // 호출될때마다 숫자를 더함
for _, url := range urls {
  go func(url string) {
    defer wg.Done() // == wg.Add(-1)
    if _, err = download(url); err != nil {
      log.Fatal(err)
    }
  }(url)
}
wg.Wait() // 모든 고루틴이 끝나 카운터가 0 이 되기 전까지 이 부분에서 멈춰있음

 

이전에 작성한 코드 main 의 for 반복문 부분을 위의 코드로 변경하면 됩니다. 위처럼 코드를 작성하는 경우는 미리 고루틴이 몇 개 생길지 알기 때문에 작성 가능한 패턴이며, 고루틴이 몇 개 생길지 알 수 없는 경우엔 고루틴을 띄우기 전에 wg.Add(1) 을 수행해 하나씩 카운터를 증가시킬 수 있습니다.

 

var wg sync.WaitGroup
for _, url := range urls {
  wg.Add(1) // for 문 안에서 증가시킴
  go func(url string) {
    defer wg.Done()
    if _, err := download(url); err != nil {
      log.Fatal(err)
    }
  }(url)
}
wg.Wait()

 

고루틴 내부의 wg.Add(1) 이 수행되기 전에 메인 고루틴이 wg.Wait() 를 통과해버릴 가능성이 있기 때문에 wg.Add(1) 을 고루틴 내부에 포함시키면 안된다는 점을 주의해야겠습니다. 이런 상태를 레이스 컨디션이라고 합니다.

 

- 공유 메모리와 병렬 최소값 찾기

 

고루틴들은 메모리도 서로 공유합니다. 큰 배열에 계산 결과를 넣어야 할 때 영역별 슬라이스로 나눠서 여러 고루틴에 각각의 슬라이스들을 나누어 합쳐진 결과값을 받을 수 있습니다. 공유 메모리를 이용해 병렬화가 잘 되는 최소값 찾기 문제를 보도록 하겠습니다.

 

// 병렬화 하지 않은 코드
func Min(a []int) int {
  if len(a) == 0 {
    return 0
  }
  min := a[0]
  for _, e := range a[1:] {
    if min > e {
      min = e
    }
  }
  return min
}

// 병렬화 코드
func ParallelMin(a []int, n int) int { // n : 고루틴 갯수
  if len(a) < n {
    return Min(a)
  }
  mins := make([]int, n)
  size := (len(a) + n - 1) / n
  var wg sync.WaitGroup
  for i := 0; i < n ; i++ {
    wg.Add(1)
    go func(i int) {
      defer wg.Done()
      begin, end := i*size, (i+1)*size
      if end > len(a) {
        end = len(a)
      }
      mins[i] = Min(a[begin:end])
    }(i)
  }
  wg.Wait()
  return Min(mins)
}

 

고루틴의 갯수만큼 슬라이스로 나눠서 각 고루틴에서 Min 을 찾은 후 해당 결과들 중 Min 을 한번 더 찾는 것입니다.

 

7.2 채널

 

위에서 봤던 메모리나 파일시스템을 공유하는 방법도 좋지만, 고루틴에선 서로 통신하기 위해 채널을 사용할 수 있습니다.

채널은 넣은 데이터를 뽑아낼 수 있는 파이프와 같은 형태의 자료구조입니다. 채널에 데이터를 넣고 뽑으면서 다른 고루틴 사이에 통신을 할 수 있습니다. 채널 역시 일급 시민이며, 양방향 채널 / 단방향 채널이 있습니다.

채널은 맵처럼 생성을 해야 쓸 수 있습니다. make(chan int) 와 같이 정수 채널을 만들 수 있습니다.

 

c1 := make(chan int) // c1 : 정수 채널
var chan int c2 = c1 // c2 에 c1 을 할당, c2 와 c1 은 동일한 채널
var <-chan int c3 = c1 // c3 는 자료를 뺄 수만 있는 채널
var chan<- int c4 = c1 // c4 는 자료를 넣을 수만 있는 채널

c1 <- 100 // c1 채널에 100 을 보냄
data := <- c1 // c1 채널에서 자료를 받아 data 에 담음

 

c3 를 받기 전용, c4 를 보내기 전용 채널이라 합니다. 굳이 양방향 채널을 단방향으로 바꾸는 이유는, 넘겨받는 함수에서 받은 채널은 반대 방향으로 이용할 수 없으니 채널을 이용한 프로그래밍을 단순하게 하고 실수를 방지해주기에 이와 같은 방법을 적극 권장합니다.

 

7.2.1 일대일 단방향 채널 소통

 

가장 단순한 형태이며 고루틴 하나에서는 보내고 다른 고루틴에서는 받는 형태입니다.

 

c := make(chan int)
go func() {
  c <- 1
  c <- 2
  c <- 3
}()

fmt.Println(<-c)
fmt.Println(<-c)
fmt.Println(<-c)
// Output:
// 1
// 2
// 3

 

위처럼 사용하려면 보내고 받는 고루틴이 서로 몇개의 데이터를 보내는지 알아야하며, 이 숫자가 맞지 않으면 고루틴이 멈춥니다.

서로 데이터 갯수를 알지 못하더라도 동작하는 코드를 보도록 하겠습니다.

 

c := func() <-chan int {
  c := make(chan int)
  go func() {
    defer close(c)
    c <- 1
    c <- 2
    c <- 3
  }()
  return c
}()
for num := range c {
  fmt.Println(num)
}
// Output:
// 1
// 2
// 3

 

위 코드에선 함수가 채널을 반환하게 만드는 패턴을 사용했으며, 자주 쓰이는 패턴이니 알아두도록 합니다.

 

7.2.2 생성기 패턴

 

// 생성기 패턴 & 클로저를 이용
func FibonacciGenerator(max int) func() int {
  next, a, b := 0, 0, 1
  return func() int {
    next, a, b = a, b, a+b
    if next > max {
      return -1
    }
    return next
  }
}

 

이러한 생성기 패턴을 사용하면 채널을 이용하는 방법에 아래와 같은 장점이 있습니다.

  • 생성하는 쪽에서 상태 저장 방법을 고민할 필요 없음
  • 받는 쪽에서 for 의 range 이용 가능
  • 채널 버퍼를 이용해 멀티 코어를 활용하거나 입출력 성능상의 장점 이용 가능

 

7.2.3 버퍼 있는 채널

 

지금까지 알아본 채널은 보내는 고루틴에서 값을 보낼 때, 받는 쪽도 준비가 되어 있어야 하는 버퍼가 없는 채널입니다.

받는 쪽이 준비가 되어 있지 않아도 보내는 쪽이 미리 보내고 싶으면 채널에 버퍼를 잡아 주면 되며, 이 경우 버퍼가 가득 차기 전까지는 받는 쪽에서 준비가 되어 있지 않아도 보낼 수 있습니다.

 

c := make(chan int, 10)

 

위처럼 버퍼 크기가 10인 정수 채널을 만들었습니다. 보내는 쪽과 받는 쪽의 코드가 균일한 속도로 수행되지 않는 경우 버퍼있는 채널을 만들면 성능상의 장점이 있습니다. 두 고루틴 간에 어느정도 격차가 생겨도 계속 동작할 수 있기에 성능 향상이 일어나는 것입니다.

 

그러나 논리적인 오류 등으로 버퍼가 가득 차면 다른 고루틴이 채널에서 받아가지 않는 이상 고루틴이 채널에 보내는 곳에서 계속 멈춰있기 때문에 무조건 버퍼를 잡는게 좋진 않습니다. 이 경우 버퍼 크기를 늘리는 식으로 문제를 해결하면 코드가 더 복잡해지고 예측 불가능한 코드가 됩니다. 버퍼 없는 채널로 동작하는 코드를 만들고 필요에 따라 성능향상을 위해 버퍼 값을 조절해주는 것이 좋습니다.

 

7.2.4 닫힌 채널

 

채널이 닫히면 for range 를 이용할 때 반복이 종료됩니다.

 

val, ok := <- c // 첫번째 변수엔 채널에서 받은 값, 두번째 변수엔 채널이 열려 있는지 여부

 

위의 경우 채널이 닫혀있다면, val 에는 기본값이 들어옵니다. 채널이 열려있으면 채널에 받을 값이 없을 땐 멈춰있지만, 채널이 닫혀 있으면 값이 보내질 리 없으므로 전혀 기다리지 않습니다. 채널에 값을 보내지 않아도 무한정 기본값을 받아올 수 있는 채널이 됩니다.

 

c := make(chan int)
close(c)
fmt.Println(<-c)
fmt.Println(<-c)
fmt.Println(<-c)
// Output:
// 0
// 0
// 0

 

이미 닫은 채널을 또 닫을 경우 패닉이 발생합니다.

 

7.3 동시성 패턴

 

두 고루틴에서 한 쪽에선 보내고 다른 쪽에선 받기만 하는 패턴 외의 다른 패턴을 보도록 하겠습니다.

 

7.3.1 파이프라인 패턴

 

파이프라인은 한 단계의 출력이 다음 단계의 입력으로 이어지는 구조입니다.

파이프라인 패턴은 생성기 패턴의 일종입니다. 생성기 패턴과 동일하게 받기 전용 채널을 반환하지만 그 채널을 넘겨받아 입력으로 활용한다는 점에서 차이가 있습니다.

반환된 받기 전용 채널을 다른 파이프라인의 입력으로 넘겨줄수 있기 때문에 자연스럽게 출력을 입력으로 연결하여 연결된 파이프라인을 구성할 수 있습니다.

 

// in 에서 받은 num 들을 1 씩 증가시킨 채널을 반환
func PlusOne(in <-chan int) <-chan int {
  out := make(chan int)
  go func() {
    defer close(out)
    for num := range in {
      out <- num + 1
    }
  }()
  return out
}

func main() {
  c := make(chan int)
  go func() {
    defer close(c)
    c <- 5
    c <- 3
    c <- 8
  }()
  for num := range PlusOne(PlusOne(c)) { // PlusOne 을 두번 연속 호출
    fmt.Println(num) // -> 7 5 10
  }
}

 

같은 함수뿐만 아니라 형태만 같다면 서로 다른 함수들도 이어붙여 아래와 같이 일직선 파이프라인을 구성할 수 있습니다.

 

type IntPipe func(<-chan int) <- chan int

 

생성기 패턴과 마찬가지로 데이터를 보내는 쪽에서 채널을 닫아야 합니다. 그렇지 않으면 파이프라인이 꼬입니다.

사슬처럼 이어진 파이프라인을 하나로 보이게 만들어야 할 때는 아래와 같이 사용합니다.

 

func Chain(ps ...IntPipe) IntPipe {
  return func(in <-chan int) <-chan int {
    c := in
    for _, p := range ps {
      c = p(c)
    }
    return c
  }
}

// 사용 예시
PlusTwo := Chain(PlusOne, PlusOne)

 

7.3.2 채널 공유로 팬아웃하기

 

팬아웃 (Fan-out) : 게이트 하나의 출력이 게이트 여러 입력으로 들어가는 경우

 

채널로 팬아웃을 하는 방법은, 채널 하나를 여럿에게 공유하면 됩니다.

 

func main() {
  c := make(chan int)
  for i := 0; i < 3; i++ {
    go func(i int) {
      for n := range c {
        time.Sleep(1)
        fmt.Println(i, n)
      }
    }(i)
  }
  for i := 0; i < 10; i++{
    c <- i
  }
  close(c)
}

// Output
// 1 2
// 2 0
// 0 1
// 0 5
// 1 3
// 2 4
// 2 8
// 0 6
// 1 7

 

채널 하나를 생성하고 for 문 안에서 받는 고루틴을 3개 생성합니다. time.Sleep 은 짧은 시간동안 다른 고루틴도 실행될 수 있도록 추가한 부분입니다. 이후 보내는 부분은 해당 채널로 0 부터 9 까지 숫자들을 보냅니다. Output 은 위와 같을 수도 있고, 다를 수도 있습니다.

 

위 코드에서 채널에 숫자들을 보낸 후 닫아 주고 있는데, 이런 습관을 기르는게 좋습니다. 만약 여기서 프로그램이 종료되지 않는 경우에 숫자들을 기다리는 3개의 고루틴이 종료되지 않아 계속 메모리에 남게 되고, 이로 인해 메모리 누수가 발생하게 됩니다.

 

또한 팬아웃에서 채널을 닫는것은 방송(broadcast) 효과가 있습니다. 채널을 닫는 대신 미리 약속한 특수한 값을 채널로 전달해 종료되었다는 것을 알린다면 채널에서 자료를 받는 고루틴 중에서 하나만 이것을 전달받게 되며, 이걸 보완하기 위해 받는 고루틴의 수만큼 특수한 값을 보내야 하는데, 이렇게 하지 않고 그냥 채널을 닫아버리면 몇개의 고루틴이 기다리고 있는지 상관없이 모두 채널이 닫혔다는 것을 알게 됩니다.

 

7.3.3 팬인하기

 

팬인 (Fan-in) : 하나의 게이트에 여러 개의 입력선이 들어가는 경우

 

팬인하는 방법은 마찬가지로 채널을 공유하는 것이며, 같은 채널에 여러 고루틴이 값을 보내도 받아가는 곳에서 모든 값을 받아갈 수 있습니다. 보내는 고루틴이 여럿이므로 이전에 알아본 패턴처럼 보내는 고루틴에서 채널을 닫으면 패닉이 발생하기 때문에 채널을 닫기 위한 하나의 고루틴을 만들고 보내는 모든 고루틴들이 종료된 뒤에 채널을 닫아 종료하게끔 구성하면 됩니다.

 

func FanIn(ins ...<- chan int) <- chan int {
  out := make(chan int)
  var wg sync.WaitGroup
  wg.Add(len(ins))
  for _, in := range ins {
    go func(in <-chan int) {
      defer wg.Done()
      for num := range in {
        out <- num
      }
    }(in)
  }
  go func() {
    wg.Wait()
    close(out)
  }()
  return out
}

// 사용 예시
c := FanIn(c1, c2, c3)

 

c1, c2, c3 채널에서 나온 자료들은 모두 c 로 나오게 됩니다.

 

7.3.4 분산처리

 

팬아웃해서 파이프라인을 통과시키고 다시 팬인시키면 분산처리됩니다.

분산처리로 인해 고루틴의 갯수가 많은 것은 걱정하지 않아도 됩니다. Go 에서는 고루틴마다 스레드를 모두 할당하지 않으며, 동시에 수행될 필요가 없는 고루틴들은 모두 하나의 스레드에서 순차적으로 수행됩니다. 이 부분은 컴파일 시간에 예측 가능한 경우가 많으므로 스레드를 많이 만드는 경우에 생길 수 있는 비용은 발생하지 않습니다.

 

7.3.5 select

 

select 를 이용하면 동시에 여러 채널과 통신할 수 있습니다. select 의 형태는 switch 문과 비슷하지만 동시성 프로그래밍에 사용되며 아래와 같은 특징이 있습니다.

  • 모든 case 가 계산됨. 함수 호출 등이 있으면 select 를 수행할 때 모두 호출됨. 아래 예제에선 c3 채널이 준비되있지 않더라도 f() 는 반드시 호출됨.
  • 각 case 는 채널에 입출력하는 형태가 되며 막히지 않고 입출력이 가능한 case 가 있으면 그 중에 하나가 선택되고 해당 case 코드만 수행됨.
  • default 가 있으면 모든 case 에 입출력이 불가능할 때 코드가 수행됨. default 가 없고 모든 case 에 입출력이 불가능하면 어느 하나라도 가능해질 때까지 기다림.
select {
case n := <-c1:
  fmt.Println(n, "is from c1")
case n := <=c2:
  fmt.Println(n, "is from c2")
case c3 <- f():
  fmt.Println("1 is sent to c3")
default:
  fmt.Println("No channel is ready")
}

 

- 팬인하기

 

select 를 링요하면 고루틴을 여러 개 이용하지 않고도 팬인 할 수 있으며, 방법은 위의 예제에서 받는 부분만 떼면 됩니다.

 

select {
case n := <-c1: c <- n
case n := <-c2: c <- n
case n := <-c3: c <- n
}

 

c1, c2, c3 중 어느 채널이라도 자료가 준비되어 있으면 그것을 c 로 바로 보냅니다. 위의 select 문을 for 반복문으로 두르면 팬인이 가능합니다. 만일 c1, c2, c3 중 닫혀있는 채널이 있다면 닫힌 채널은 막히지 않고 기본값을 계속해서 ㅂ다아갈 수 있기 때문에 이 경우 닫힌 채널의 case 가 선택되어 기본값이 받아질 가능성이 있습니다.

 

func FanIn(in1, in2, in3 <- chan int) <-chan int {
  out := make(chan int)
  openCnt := 3
  closeChan := func(c *<-chan int) bool {
    *c = nil
    openCnt--
    return openCnt == 0
  }
  go func() {
    defer close(out)
    for {
      select {
        case n, ok := <-in1:
          if ok {
            out <- n
          } else if closeChan(&in1) {
            return
          }
        case n, ok := <-in2:
          if ok {
            out <- n
          } else if closeChan(&in2)  {
            return
          }
        case n, ok := <-in3:
          if ok {
            out <- n
          } else if closeChan(&in3) {
            return
          }
        }
      }
    }()
    return out
  }

 

위 코드는 닫힌 채널에서 기본값이 받아질 가능성을 없애기 위한 코드이며 특징은 닫힌 채널은 nil 로 바꾸어준 부분입니다. nil 채널에는 보내기 및 받기가 모두 막히게 됩니다. 물론 채널 자체를 바꾼 것이 아니라 채널 변수를 nil 로 바꾼 것이기 때문에 만약 다른 고루틴이 이 채널에서 자료를 받아가고 있었다고 해도 그쪽은 전혀 영향을 받지 않습니다.

 

- 채널을 기다리지 않고 받기

 

채널값이 있으면 받고, 없으면 스킵하는 흐름을 구성하려면 select 를 이용하면 됩니다.

 

select {
case n := <-c:
  fmt.Println(n)
default:
  fmt.Println("Data is not ready. Skipping...")
}

 

- 시간 제한

 

채널과 통신을 기다리되 일정 시간 동안만 기다리겠다면 time.After 함수를 사용할 수 있습니다. 이 함수는 채널을 돌려주는데 지정된 시간이 지나면 이 채널로 현재 시간이 전달됩니다.

 

select {
case n := <-recv:
  fmt.Println(n)
case send <- 1:
  fmt.Println("sent 1")
case <-time.After(5 * time.Second)
  fmt.Println("No send and receive communication for 5 seconds")
  return
}

 

이 select 는 만약 for 문으로 둘러싸여 있다면 매번 반복될때마다 타이머가 새로 생성되므로, 한 번의 채널 통신마다 5초의 제한 시간이 생깁니다. 전체 제한시간을 걸고 싶으면 for 문 이전에 타이머 채널을 만들어 두고 이용하면 됩니다.

 

7장의 나머지 부분은 다음 포스팅에서 이어가도록 하겠습니다.

 

 

출처 : 디스커버리 Go 언어

 

 

'Backend > Golang' 카테고리의 다른 글

Go 언어 - [8장] (실무 패턴)  (0) 2020.09.19
Go 언어 - [7장 - (2)] (동시성)  (0) 2020.09.12
Go 언어 - [6장] (웹 어플리케이션 작성)  (0) 2020.08.29
Go 언어 - [5장] (구조체)  (0) 2020.08.22
Go 언어 - [4장] (함수)  (0) 2020.08.16