티스토리 뷰

# ThreadPoolExecutor

java 에서 멀티스레드 프로그램을 구현시 자바 1.5 에 추가된 concurrent 패키지를 많이 이용하게 된다.

concurrent 패키지에 있는 ThreadPoolExecutor 는 Thread 를 직접 생성하고, 관리하는 부분을 추상화하여 작업(task)과 실행(execute)을 분리시켜준다.

 

ThreadPoolExecutor 는 Executors 에 있는 팩토리 메서드를 이용해 간편하게 생성할 수도 있고, 직접 생성자를 호출해서 객체를 생성할 수도 있다.

Executors.newCachedThreadPool();        
Executors.newFixedThreadPool(10);       
Executors.newSingleThreadExecutor();    
Executors.newScheduledThreadPool(10);   

생성자를 이용해 직접 생성할 때는 다양한 옵션들을 필요로한다.

public ThreadPoolExecutor(int corePoolSize,                                 
                          int maximumPoolSize,                              
                          long keepAliveTime,                               
                          TimeUnit unit,                                    
                          BlockingQueue<Runnable> workQueue) {              
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,     
         Executors.defaultThreadFactory(), defaultHandler);                 
}                                                                           

corePoolSize

기본적으로 관리할 쓰레드 숫자

 

maximumPoolSize

corePoolSize 를 초과하여 최대로 만들 쓰레드 숫자

 

keepAliveTime

corePoolSize 를 초과하여 생성된 쓰레드가 작업을 대기할 시간. 이 시간을 초과하면 corePoolSize 를 초과한 개수의 쓰레드를 정리한다.

 

unit

keepAliveTime 의 단위

 

workQueue

모든 쓰레드가 작업 중일때 task 를 보관할 큐

 

여기서 maximumPoolSize 옵션에 유의해야할 부분이 있는데, maximumPoolSize 옵션이 사용되는 시점이다. 단순히 생각하기로는 corePoolSize 만큼 쓰레드들에게 task 를 할당하고, 이 이상 task 가 들어오면 maximumPoolSize 까지 쓰레드를 추가하며 task 를 실행시키다가 maximumPoolSize 까지 쓰레드가 꽉 찼음에도 task 가 더 추가되면 그 때부턴 workQueue 에 task 를 보관한다고 생각하기 쉽다.

 

하지만 javadoc 문서를 확인하면 이런 순서로 동작하지 않는다.


A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize()) according to the bounds set by corePoolSize (see getCorePoolSize()) and maximumPoolSize (see getMaximumPoolSize()). When a new task is submitted in method execute(java.lang.Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).


간략히 설명하자면 corePoolSize -> maximumPoolSize -> workQueue 순으로 동작하는게 아니라, corePoolSize -> workQueue -> maximumPoolSize 로 동작한다는 얘기다. 그러므로 workQueue 의 크기를 굉장히 크게 잡거나 혹은 크기 지정을 아예 하지않는다면 쓰레드풀은 maximumPoolSize 로 절대 확장되지 않는다.

# Sample

해당 스펙을 확인해볼 수 있는 간단한 예제코드를 만들어봤다.

CyclicBarrier barrier = new CyclicBarrier(5);                                                                                             
                                                                                                                                          
Runnable r = () -> {                                                                                                                      
    try {                                                                                                                                 
        barrier.await();                                                                                                                  
    } catch (Exception e) {                                                                                                               
        e.printStackTrace();                                                                                                              
    }                                                                                                                                     
                                                                                                                                          
    System.out.println("Thread name :: " + Thread.currentThread().getName());                                                             
};                                                                                                                                        
                                                                                                                                          
var executor = new ThreadPoolExecutor(2, 5, 600, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));                                         
                                                                                                                                          
executor.execute(r);                                                                                                                      
executor.execute(r);                                                                                                                      
executor.execute(r);                                                                                                                      
executor.execute(r);                                                                                                                      
                                                                                                                                          
barrier.await();                                                                                                                          
executor.shutdown();                                                                                                                      

(CyclicBarrier 는 이전 포스팅 https://multifrontgarden.tistory.com/266 에서 다루고 있다.)

 

corePoolSize 를 2개로 하고, maximumPoolSize 는 5개로 설정했다. workQueue 의 크기는 1로 잡았다. CyclicBarrier 는 5개의 쓰레드가 await() 메서드를 호출해줘야 하므로 현재 코드상으로 execute() 를 실행한 쓰레드에서 4번, 메인 쓰레드에서 1번을 호출해주면 정상적으로 종료될 수 있다.

 

하지만 이 코드는 직접 실행해보면 종료되지 않는다.

main 쓰레드에서 1번, corePoolSize 로 설정된 2개의 쓰레드에서 각 1번씩 2번, corePoolSize 를 초과한 task 가 2개 들어와서 workQueue 를 초과했으므로 maximumPoolSize 이내에서 1개의 쓰레드를 더 만들어서 1번 총 4번의 await() 메서드가 호출되고 마지막 5번째를 호출해줘야할 마지막 task 는 workQueue 의 크기가 1이므로 큐에만 들어가고 실행되지 않기 때문에 CyclicBarrier 를 넘어갈 수 없기 때문이다.

var executor = new ThreadPoolExecutor(2, 5, 600, TimeUnit.SECONDS, new SynchronousQueue<>());

위와 같이 다른 설정은 모두 동일하고 workQueue 만 바꿔보도록 하자. SynchronousQueue 는 capacity 가 0 인 큐라고 생각하면 된다. 크기가 0 이므로 task 들을 바로바로 쓰레드에 할당한다. 실행해보면 알겠지만 정상적으로 잘 실행되고 애플리케이션이 정상 종료되는걸 확인할 수 있다.

 

# Spring 에서는?

위에서 ThreadPoolExecutor 에 대해서 알아봤지만 사실 엔터프라이즈 환경에서 ThreadPoolExecutor 를 직접 이용하는 경우는 흔치 않다. Spring Framework 가 거의 표준화되다시피 했기 때문에 추가 쓰레드를 이용할때도 ThreadPoolExecutor 를 직접 이용하기보다는 Spring 에서 제공하는 ThreadPoolTaskExecutor 를 이용하게 된다. 사실 ThreadPoolExecutor 를 보면서 corePoolSize -> workQueue -> maximumPoolSize 순서로 동작하는게 조금은 비직관적이지 않나 라고 생각했었다. 그래서 Spring 에서 인터페이스를 구현하면서 혹시나 ThreadPoolTaskExecutor 는 저 순서를 변경할 수도 있지 않을까 라는 생각에 한번 확인을 해봤었다.

 

하지만 ThreadPoolTaskExecutor 도 내부에서 ThreadPoolExecutor 를 사용하고 있기 때문에 위 스펙은 유의하면서 사용해야할 것 같다.

댓글
댓글쓰기 폼