티스토리 뷰

Java 1.5 에 추가된 concurrent 패키지엔 유용한 동기화 클래스들을 제공해준다. 이 클래스들을 이용하면 손쉽게 멀티 스레드에 안전한 코드를 작성할 수 있다. 이번 포스팅에서는 concurrent 패키지 내에 있는 3개의 동기화 클래스를 간략하게 알아보고자 한다.

 

1. CountDownLatch

Latch 의 사전적 의미는 '걸쇠' 이다. 원하는 지점에서 await() 메서드를 호출해서(걸쇠를 걸어서) 코드의 진행을 중단시키고, 다른 스레드들에서 원하는 횟수만큼 countDown() 메서드를 호출해주면 그때 비로소 코드가 진행되게 된다. 코드로 확인해보자.

 

원하는 횟수를 지정한 인스턴스를 생성한다. 해당 예제코드에서는 5를 지정했다.

CountDownLatch countDownLatch = new CountDownLatch(5);

조건이 충족되기전까지 코드 진행을 중단해야하는 지점에서 걸쇠를 걸어준다.

countDownLatch.await();

다른 스레드를 이용해서 카운트 다운을 시작한다.

countDownLatch.countDown();

당연한 얘기겠지만 countDown() 메서드를 호출하는 코드가 await() 메서드보다 아래에 있으면 원하는대로 작동하지않는다. 풀 예제코드는 아래와 같다.

public static void main(String[] args) throws Exception {              
    CountDownLatch countDownLatch = new CountDownLatch(5);             
                                                                       
    ExecutorService es = Executors.newFixedThreadPool(5);              
    for(int i = 0; i < 5; i++) {                                       
        int n = i;                                                     
        es.execute(() -> {                                             
            countDownLatch.countDown();                                
            System.out.println("order :: " + n);                       
        });                                                            
    }                                                                  
                                                                       
    countDownLatch.await();                                            
    es.shutdown();                                                     
    System.out.println("finish");
}                                                                      

메인 스레드에서 await() 메서드를 호출해서 코드를 중단시키고 5개의 스레드를 생성해 각각 countDown() 메서드를 호출한다. countDown() 메서드가 5번 호출되지않으면 await() 아래의 코드는 실행되지않는다. 이 코드는 간단한 예제코드라서 의구심을 가질수도있다. for 문의 조건을 5에서 4로 바꾸면 countDown() 이 1번 모자르게 호출되기때문에 await() 메서드 아래가 실행되지않는걸 확인할 수 있다. await() 메서드는 타임아웃을 인자로 받는데 이 타임아웃을 설정할 시에는 해당 시간동안 기다리다가 아래 코드를 실행한다.

 

2. Semaphore

세마포어는 미리 설정한 만큼의 permits를 제공한다. acquire() 메서드를 통해 permits 를 줄여나간다. permits size 를 초과해서 acquire() 메서드를 호출하면 해당 스레드는 대기상태에 들어간다. tryAcquire() 메서드를 이용하면 permits 을 획득했는지 boolean 여부로 리턴을 해주는데 이로인해 permits size 를 초과하게되면 대기상태에 들어가지않고 false 를 리턴하게된다. permits 을 반환할때는 release() 메서드를 이용한다. 세마포어를 이용해서 컬렉션의 사이즈를 제한할 수 있다.

 

class SemaphoreList<T> {
    private int permitsSize = 3;
    private Semaphore semaphore = new Semaphore(permitsSize);
    private List<T> ts = new CopyOnWriteArrayList<>();

    public boolean add(T t) throws InterruptedException {
        ts.add(t);
        semaphore.acquire();
        return true;
    }

    public boolean remove(T t) {
        var result = ts.remove(t);
        if(result) {
            semaphore.release();
        }

        return result;
    }
}

내부 세마포어의 permits 을 3으로 설정했다. 그리고 add() 메서드가 호출될때마다 acquire() 를 호출하고있고, remove() 가 실행되어야 release() 를 호출하고있다.

 

public static void main(String[] args) throws Exception {       
    SemaphoreList<String> strings = new SemaphoreList<>();      
                                                                
    strings.add("a");                                           
    strings.add("a");                                           
    strings.add("a");                                           
    strings.add("a");                                           
}                                                               

해당 세마포어 리스트에 대해 위처럼 add() 메서드를 4번 호출하게되면 스레드는 4번째에서 대기상태에 빠지고 애플리케이션은 멈추게된다.(종료가 아니다.)

public static void main(String[] args) throws Exception {     
    SemaphoreList<String> strings = new SemaphoreList<>();    
    ExecutorService es = Executors.newCachedThreadPool();     
    es.submit(() -> {                                         
        Thread.sleep(5000);                                   
        return strings.remove("a");                           
    });                                                       
                                                              
    strings.add("a");                                         
    strings.add("a");                                         
    strings.add("a");                                         
    strings.add("a");                                         
                                                              
    es.shutdown();                                            
}                                                             

 

코드를 좀 수정했다. 스레드를 새로 만들고 해당 스레드는 5초 뒤에 remove() 메서드를 호출하도록했다. 4번째 add() 메서드에서 메인스레드는 대기상태에 빠지게되지만 약 5초뒤 다른 스레드에서 remove() 가 호출된 뒤에 대기상태에서 해제되어 애플리케이션은 정상 종료된다.

 

3. CyclicBarrier

CyclicBarrier 는 어떻게 보면 CountDownLatch 랑 비슷하다. CountDownLatch 는 await() 을 호출한 스레드만 대기상태가 되고, countDown() 을 호출하는 다른 스레드들은 countDown() 메서드와 무관하게 대기상태에 빠지지않는다. await() 을 호출한 스레드가 countDown 조건을 충족하지못해 영영 대기상태에 빠지더라도 countDown() 을 호출하는 스레드들은 자기할일을 다 하게된다. CountDownLatch 예제코드의 for 문 조건을 5에서 4로 변경해서 메인 스레드가 영영 대기상태에 빠져 "finish" 를 출력하지못해도 다른 스레드들이 출력하는 "order ::" 라는 메세지는 정상 출력되는 이유가 이때문이다.

 

CyclicBarrier 는 해당 장벽에 참여하는 모든 스레드들이 대기상태에 빠진다. 그리고 조건을 충족할때 모든 스레드들이 대기에서 해제되게된다.

 

public static void main(String[] args) throws Exception {           
    CyclicBarrier cyclicBarrier = new CyclicBarrier(5);             
                                                                    
    ExecutorService es = Executors.newFixedThreadPool(4);           
    for(int i = 0; i < 4; i++) {                                    
        int n = i;                                                  
        es.submit(() -> {                                           
            cyclicBarrier.await();                                  
            System.out.println("order :: " + n);                    
            return 1;                                               
        });                                                         
    }                                                               
                                                                    
    Thread.sleep(5000);                                             
    cyclicBarrier.await();                                          
                                                                    
    es.shutdown();                                                  
    System.out.println("finish");                                   
}                                                                   

인스턴스 생성은 CountDownLatch 와 같다. 이번에도 카운트는 5로 지정했다. 스레드를 4개 만들어 4개의 스레드가 각각 출력을 하도록했다. 그리고 장벽이 해제되기까지 기다리도록했다.(await()) CyclicBarrier 의 parties 는 5로 설정했는데 스레드풀의 스레드는 4개를 만든건 CyclicBarrier 의 장벽에 메인스레드도 참여하고있기때문이다. 메인스레드가 await() 을 호출하기위해선 5초간 기다려야하는데 이 사이에 다른 스레드들의 출력은 이루어지지않는다. 메인스레드까지 await() 을 호출하고나서야 모든 출력이 정상적으로 이루어지는걸 볼 수있다. for 문의 조건을 4에서 3으로 변경한다면 그 어떤 출력문도 출력되지않는다.

 

CyclicBarrier 는 barrierAction 이라는 Runnable 타입의 인자를 하나 더 받고있는데, 쉽게 생각해볼수있듯이 장벽이 해제됐을때 실행될 코드를 전달하는 것이다.

// 이런식으로..
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("barrier breaking"));

 

마치며

하는일에 따라 다르겠지만 나같은 경우 이 3개의 클래스를 실제 비즈니스 로직에 작성할 일은 거의 없을것 같다. 하지만 동시성 이슈가 발생했을때 예제 코드로 문제를 단순화 한다거나 할때는 유용하게 사용할 수 있을 것 같다.

 

참고자료

- 자바 병렬 프로그래밍

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함