ForkJoinPool은 병렬 처리를 최적화하기 위해 Java 7에서 도입된 고급 멀티스레딩 도구로, 큰 작업을 작은 작업으로 분할(Fork)하고 여러 스레드에서 동시에 처리하여 그 결과를 합친(Join) 후 반환하는 방식으로 동작합니다. 주로 대규모 작업을 병렬로 처리할 때 성능을 최적화하는 데 사용됩니다.
1️⃣ 기본 개념
Fork: 큰 작업을 작은 작업으로 나누어 병렬 처리.
Join: 병렬로 처리된 작은 작업들을 다시 합쳐 최종 결과 생성.
ForkJoinPool은 분할-정복 알고리즘(divide and conquer)을 사용하여 대규모 작업을 효율적으로 처리합니다. 이를 통해 병렬성을 극대화하고, 여러 코어를 활용하여 성능을 향상시킬 수 있습니다.
2️⃣ ForkJoinPool 사용 이유
✅ 병렬 작업을 쉽게 처리
여러 개의 작은 작업들을 여러 스레드에서 동시에 처리할 수 있게 해줍니다. 일반적인 스레드 풀을 사용할 때보다 효율적으로 자원을 관리할 수 있습니다.
✅ 분할-정복 패턴
큰 작업을 여러 개의 작은 작업으로 나누어 동시에 처리하고, 그 결과를 합칩니다. 예: 병렬 계산, 대규모 데이터 처리 등.
3️⃣ ForkJoinPool 구조
ForkJoinPool은 기본적으로 두 가지 작업을 처리하는 방식으로 설계되었습니다:
Work Stealing: 각 스레드는 자신의 작업 큐를 가집니다. 만약 스레드가 자신의 작업을 다 끝내면, 다른 스레드의 큐에서 작업을 빼와서 처리할 수 있습니다. 이 방법은 로드 밸런싱을 자연스럽게 제공합니다.
RecursiveTask (결과를 반환하는 작업): 반환값이 있는 작업을 할 때 사용됩니다. 예: ForkJoinTask에서 fork()로 작업을 분할하고 join()으로 결과를 합칩니다.
RecursiveAction (결과를 반환하지 않는 작업): 반환값이 없는 작업을 할 때 사용됩니다. 예: 데이터 변환, 파일 쓰기 등.
4️⃣ ForkJoinPool 클래스
ForkJoinPool은 java.util.concurrent 패키지의 클래스이며, 스레드를 동적으로 관리하고 작업을 분할하여 효율적으로 실행합니다. 기본적으로 fork와 join 작업을 관리하는 작업 큐를 사용합니다.
기본 사용 방법:
import java.util.concurrent.*;
public class ForkJoinExample {
public static void main(String[] args) {
// ForkJoinPool 생성
ForkJoinPool pool = new ForkJoinPool();
// 작업 제출
RecursiveTask task = new MyRecursiveTask(10);
Integer result = pool.invoke(task); // 작업 실행 및 결과 대기
// 결과 출력
System.out.println("Result: " + result);
}
}
class MyRecursiveTask extends RecursiveTask {
private int n;
public MyRecursiveTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return 1;
}
MyRecursiveTask task1 = new MyRecursiveTask(n - 1);
task1.fork(); // task1을 병렬로 실행
return n * task1.join(); // 결과를 합쳐서 반환
}
}
설명:
ForkJoinPool pool = new ForkJoinPool(); - ForkJoinPool 인스턴스를 생성하여, 작업을 이 풀에 제출할 수 있습니다.
RecursiveTask task = new MyRecursiveTask(10); - RecursiveTask를 상속한 작업을 정의. 여기서는 1부터 10까지의 팩토리얼을 계산하는 예시입니다.
task.fork(); - fork() 메서드를 사용하여 작업을 병렬로 실행합니다.
task.join(); - join() 메서드를 사용하여 작업의 결과를 기다리며 받습니다.
5️⃣ ForkJoinPool 주요 메서드
fork(): 작업을 비동기적으로 실행시킵니다. 작업이 즉시 시작되며, 현재 스레드는 계속 실행됩니다.
join(): 작업이 완료될 때까지 결과를 기다립니다. fork()로 실행된 작업의 결과를 기다리며 받습니다.
invoke(): fork()와 join()을 결합한 메서드입니다. 작업을 제출하고, 결과가 반환될 때까지 기다립니다.
6️⃣ ForkJoinPool 장점
✅ 성능 최적화
CPU 코어를 효율적으로 활용하여 병렬 처리가 가능합니다. 작은 작업을 분할하고 동시에 처리하여 성능 향상.
✅ 동적 스레드 관리
동적 스레드 할당 및 작업 큐 관리가 효율적으로 이루어집니다. 필요시 다른 스레드가 대기 중인 작업을 처리할 수 있습니다.
✅ 스케일링 (Scale-out)
ForkJoinPool은 스레드 풀의 크기를 동적으로 조정하면서 여러 작업을 처리할 수 있기 때문에, 대규모 데이터 처리 및 병렬 작업에서 강력한 성능을 발휘합니다.
7️⃣ ForkJoinPool 사용 예시
팩토리얼 계산 예시 (재귀적으로 분할)
import java.util.concurrent.*;
public class ForkJoinExample {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
FactorialTask task = new FactorialTask(10);
Integer result = pool.invoke(task);
System.out.println("Factorial of 10 is: " + result);
}
}
class FactorialTask extends RecursiveTask {
private final int n;
FactorialTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n == 1) {
return 1;
} else {
FactorialTask subTask = new FactorialTask(n - 1);
subTask.fork(); // 재귀적으로 작업 분할
return n * subTask.join(); // 하위 작업의 결과를 합산
}
}
}
8️⃣ 결론
ForkJoinPool은 병렬 처리에 최적화된 Java 클래스입니다. 작은 작업들을 병렬로 처리하여 성능을 극대화합니다. 특히 대규모 데이터 처리나 분할-정복 알고리즘을 사용할 때 매우 유용하며, 동적 스레드 관리와 작업 큐 최적화 덕분에 효율적으로 멀티스레딩 작업을 처리할 수 있습니다.
CountDownLatch는 지정된 횟수만큼 countDown() 메서드가 호출될 때까지 현재 스레드를 대기 상태로 만들고, 카운트가 0에 도달하면 대기 중인 스레드를 다시 실행 가능하게 만드는 동기화 도구입니다. 이를 주로 여러 개의 작업 스레드가 완료될 때까지 대기하고, 모든 작업이 끝나면 특정 스레드를 다시 실행하게 할 때 사용합니다.
Method Summary
예시코드
쇼핑몰 사이트에서 특정 사용자가 컴퓨터 부품을 주문하는 상황임
사용자가 물건 주문 요청을 하면 서버가 각각 부품의 재고 상황을 파악하고, 구매 가능하지 파악하는 로직임
사용자가 부품의 재고를 순차적으로 0초, 5초 *4(재고없는 물건) 대략 20초의 시간이 아닌, 다중쓰레드를 이용해서 대략 5초 이내로 처리 속도를 개선할 수 있음
15:45:42.833 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- 서버: 구매 요청 대기 중...
15:45:42.841 [pool-1-thread-1] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- CPU 구매 처리 중... 0초 대기-pool-1-thread-1
15:45:42.841 [pool-1-thread-2] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- RAM 구매 처리 중... 5초 대기-pool-1-thread-2
15:45:42.841 [pool-1-thread-3] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- Motherboard 구매 처리 중... 5초 대기-pool-1-thread-3
15:45:42.842 [pool-1-thread-5] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- HDD 구매 처리 중... 5초 대기-pool-1-thread-5
15:45:42.841 [pool-1-thread-4] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- SSD 구매 처리 중... 5초 대기-pool-1-thread-4
15:45:47.847 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- 서버: 모든 요청 처리 완료!
15:45:47.848 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- 구매 결과 :
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"CPU","status":"success","message":"CPU 구매 성공!"}
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"RAM","status":"failure","message":"RAM 재고 부족으로 구매 실패"}
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"Motherboard","status":"failure","message":"Motherboard 재고 부족으로 구매 실패"}
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"HDD","status":"failure","message":"HDD 재고 부족으로 구매 실패"}
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"SSD","status":"failure","message":"SSD 재고 부족으로 구매 실패"}
이 밖에 쓰레드 협업용 동기화 도구
CyclicBarrier
목적: 지정된 수의 스레드가 모두 도달할 때까지 기다린 후, 동시에 작업을 진행합니다.
CountDownLatch와 달리, 반복적으로 사용 가능.
사용 사례:
여러 스레드가 서로 독립적인 작업을 완료한 후, 다음 단계로 함께 진행해야 할 때.
스레드풀에서 일정 개수의 스레드가 모였을 때 새로운 작업을 시작해야 하는 경우.
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("All threads reached the barrier. Proceeding...");
});
Runnable task = () -> {
try {
System.out.println(Thread.currentThread().getName() + " waiting at barrier");
barrier.await();
System.out.println(Thread.currentThread().getName() + " proceeding");
} catch (Exception e) {
e.printStackTrace();
}
};
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executor.submit(task);
}
executor.shutdown();
Semaphore
목적: 스레드의 접근을 제어하는 데 사용되며, 허용된 작업의 개수를 제한.
스레드 협업에 사용할 수 있으며, 신호 용도로 활용 가능.
사용 사례:
리소스에 대한 동시 접근을 제한하고 싶을 때.
일정한 스레드 수가 접근해야 작업을 완료하고 신호를 보낼 때.
Semaphore semaphore = new Semaphore(3); // 허용된 동시 작업 수
Runnable task = () -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " acquired permit");
Thread.sleep(1000); // 작업 수행
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " releasing permit");
semaphore.release();
}
};
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executor.submit(task);
}
executor.shutdown();
Phaser
목적: CyclicBarrier와 유사하게 단계별 동기화를 제공하며, 단계마다 참여 스레드 수를 동적으로 조정할 수 있습니다.
CountDownLatch와 달리 단계를 명시적으로 관리하고, 재사용 가능.
사용 사례:
여러 단계로 나뉜 스레드 작업을 동기화해야 할 때.
단계별로 스레드의 참여를 동적으로 변경할 수 있어야 할 때.
import java.util.concurrent.Phaser;
public class PhaserTest {
/**
* 3개의 작업을 3단계로 걸쳐서 함
* @param args
*/
public static void main(String[] args) {
// Phaser 생성, 3개의 쓰레드를 등록
Phaser phaser = new Phaser(3);
// Runnable 작업 정의
Runnable task = () -> {
String threadName = Thread.currentThread().getName();
// 1단계 작업
System.out.println(threadName + " completed Phase 1");
phaser.arriveAndAwaitAdvance(); // 1단계 완료 후 대기
// 2단계 작업
System.out.println(threadName + " completed Phase 2");
phaser.arriveAndAwaitAdvance(); // 2단계 완료 후 대기
// 3단계 작업
System.out.println(threadName + " completed Phase 3");
phaser.arriveAndAwaitAdvance(); // 3단계 완료 후 대기*/
System.out.println(threadName + " finished all phases");
};
// 3개의 쓰레드 생성 및 실행
Thread t1 = new Thread(task, "Thread-1");
Thread t2 = new Thread(task, "Thread-2");
Thread t3 = new Thread(task, "Thread-3");
t1.start();
t2.start();
t3.start();
}
}
Exchanger
목적: 두 스레드가 데이터를 교환하는 데 사용.
사용 사례:
한 스레드가 데이터를 생성하고 다른 스레드가 이를 소비하는 생산자-소비자 패턴.
두 스레드 간의 데이터 교환이 필요한 경우.
Exchanger<String> exchanger = new Exchanger<>();
Runnable producer = () -> {
try {
String data = "Data from producer";
System.out.println("Producer: Sending data");
String response = exchanger.exchange(data);
System.out.println("Producer: Received response - " + response);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable consumer = () -> {
try {
String data = exchanger.exchange(null);
System.out.println("Consumer: Received data - " + data);
exchanger.exchange("Acknowledgement from consumer");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(producer);
executor.submit(consumer);
executor.shutdown();
SynchronousQueue
목적: 한 번에 하나의 작업만 전송하고 수신되도록 동기화된 대기열.
생산자가 데이터를 큐에 넣으려면 소비자가 즉시 데이터를 받아야 함.
사용 사례:
스레드 간의 단방향 데이터 교환.
SynchronousQueue<String> queue = new SynchronousQueue<>();
Runnable producer = () -> {
try {
System.out.println("Producer: Sending data");
queue.put("Data from producer");
System.out.println("Producer: Data sent");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable consumer = () -> {
try {
System.out.println("Consumer: Waiting for data");
String data = queue.take();
System.out.println("Consumer: Received data - " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(producer);
executor.submit(consumer);
executor.shutdown();