반응형

ForkJoinPool은 병렬 처리를 최적화하기 위해 Java 7에서 도입된 고급 멀티스레딩 도구로, 큰 작업을 작은 작업으로 분할(Fork)하고 여러 스레드에서 동시에 처리하여 그 결과를 합친(Join) 후 반환하는 방식으로 동작합니다. 주로 대규모 작업을 병렬로 처리할 때 성능을 최적화하는 데 사용됩니다.

1️⃣ 기본 개념

  • Fork: 큰 작업을 작은 작업으로 나누어 병렬 처리.
  • Join: 병렬로 처리된 작은 작업들을 다시 합쳐 최종 결과 생성.

ForkJoinPool은 분할-정복 알고리즘(divide and conquer)을 사용하여 대규모 작업을 효율적으로 처리합니다. 이를 통해 병렬성을 극대화하고, 여러 코어를 활용하여 성능을 향상시킬 수 있습니다.

2️⃣ ForkJoinPool 사용 이유

✅ 병렬 작업을 쉽게 처리

여러 개의 작은 작업들을 여러 스레드에서 동시에 처리할 수 있게 해줍니다. 일반적인 스레드 풀을 사용할 때보다 효율적으로 자원을 관리할 수 있습니다.

✅ 분할-정복 패턴

큰 작업을 여러 개의 작은 작업으로 나누어 동시에 처리하고, 그 결과를 합칩니다. 예: 병렬 계산, 대규모 데이터 처리 등.

3️⃣ ForkJoinPool 구조

ForkJoinPool은 기본적으로 두 가지 작업을 처리하는 방식으로 설계되었습니다:

  1. Work Stealing: 각 스레드는 자신의 작업 큐를 가집니다. 만약 스레드가 자신의 작업을 다 끝내면, 다른 스레드의 큐에서 작업을 빼와서 처리할 수 있습니다. 이 방법은 로드 밸런싱을 자연스럽게 제공합니다.
  2. RecursiveTask (결과를 반환하는 작업): 반환값이 있는 작업을 할 때 사용됩니다. 예: ForkJoinTask에서 fork()로 작업을 분할하고 join()으로 결과를 합칩니다.
  3. RecursiveAction (결과를 반환하지 않는 작업): 반환값이 없는 작업을 할 때 사용됩니다. 예: 데이터 변환, 파일 쓰기 등.

4️⃣ ForkJoinPool 클래스

ForkJoinPool은 java.util.concurrent 패키지의 클래스이며, 스레드를 동적으로 관리하고 작업을 분할하여 효율적으로 실행합니다. 기본적으로 fork와 join 작업을 관리하는 작업 큐를 사용합니다.

기본 사용 방법:


import java.util.concurrent.*;

public class ForkJoinExample {
    public static void main(String[] args) {
        // ForkJoinPool 생성
        ForkJoinPool pool = new ForkJoinPool();

        // 작업 제출
        RecursiveTask task = new MyRecursiveTask(10);
        Integer result = pool.invoke(task);  // 작업 실행 및 결과 대기

        // 결과 출력
        System.out.println("Result: " + result);
    }
}

class MyRecursiveTask extends RecursiveTask {
    private int n;

    public MyRecursiveTask(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return 1;
        }

        MyRecursiveTask task1 = new MyRecursiveTask(n - 1);
        task1.fork();  // task1을 병렬로 실행

        return n * task1.join();  // 결과를 합쳐서 반환
    }
}

설명:

  • ForkJoinPool pool = new ForkJoinPool(); - ForkJoinPool 인스턴스를 생성하여, 작업을 이 풀에 제출할 수 있습니다.
  • RecursiveTask task = new MyRecursiveTask(10); - RecursiveTask를 상속한 작업을 정의. 여기서는 1부터 10까지의 팩토리얼을 계산하는 예시입니다.
  • task.fork(); - fork() 메서드를 사용하여 작업을 병렬로 실행합니다.
  • task.join(); - join() 메서드를 사용하여 작업의 결과를 기다리며 받습니다.

5️⃣ ForkJoinPool 주요 메서드

  • fork(): 작업을 비동기적으로 실행시킵니다. 작업이 즉시 시작되며, 현재 스레드는 계속 실행됩니다.
  • join(): 작업이 완료될 때까지 결과를 기다립니다. fork()로 실행된 작업의 결과를 기다리며 받습니다.
  • invoke(): fork()와 join()을 결합한 메서드입니다. 작업을 제출하고, 결과가 반환될 때까지 기다립니다.

6️⃣ ForkJoinPool 장점

✅ 성능 최적화

CPU 코어를 효율적으로 활용하여 병렬 처리가 가능합니다. 작은 작업을 분할하고 동시에 처리하여 성능 향상.

✅ 동적 스레드 관리

동적 스레드 할당 및 작업 큐 관리가 효율적으로 이루어집니다. 필요시 다른 스레드가 대기 중인 작업을 처리할 수 있습니다.

✅ 스케일링 (Scale-out)

ForkJoinPool은 스레드 풀의 크기를 동적으로 조정하면서 여러 작업을 처리할 수 있기 때문에, 대규모 데이터 처리 및 병렬 작업에서 강력한 성능을 발휘합니다.

7️⃣ ForkJoinPool 사용 예시

팩토리얼 계산 예시 (재귀적으로 분할)


import java.util.concurrent.*;

public class ForkJoinExample {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        FactorialTask task = new FactorialTask(10);
        Integer result = pool.invoke(task);
        System.out.println("Factorial of 10 is: " + result);
    }
}

class FactorialTask extends RecursiveTask {
    private final int n;

    FactorialTask(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n == 1) {
            return 1;
        } else {
            FactorialTask subTask = new FactorialTask(n - 1);
            subTask.fork();  // 재귀적으로 작업 분할
            return n * subTask.join();  // 하위 작업의 결과를 합산
        }
    }
}

8️⃣ 결론

ForkJoinPool은 병렬 처리에 최적화된 Java 클래스입니다. 작은 작업들을 병렬로 처리하여 성능을 극대화합니다. 특히 대규모 데이터 처리나 분할-정복 알고리즘을 사용할 때 매우 유용하며, 동적 스레드 관리와 작업 큐 최적화 덕분에 효율적으로 멀티스레딩 작업을 처리할 수 있습니다.

728x90
반응형

자바에서 람다 표현식은 익명 함수를 간결하게 표현할 수 있는 기능입니다. 람다 표현식을 사용하면 코드가 더 직관적이고 간결해지며, 특히 함수형 인터페이스를 사용할 때 유용합니다.

1. 람다 표현식 기본 구조

(매개변수들) -> { 실행할 코드 }
    
  • 매개변수들: 람다식에 전달되는 입력 값들.
  • 실행할 코드: 람다식이 실행할 코드 블록.

2. 예시

1) 매개변수가 없는 람다

() -> System.out.println("Hello, World!");
    

이 예시는 매개변수가 없고, "Hello, World!"를 출력하는 람다식입니다.

2) 매개변수가 하나인 람다

x -> x * x
    

매개변수 x를 받아서 x의 제곱을 반환하는 람다식입니다.

3) 매개변수가 여러 개인 람다

(x, y) -> x + y
    

매개변수 x와 y를 받아서 두 값을 더하는 람다식입니다.

3. 람다 표현식 사용 예시

람다 표현식은 주로 함수형 인터페이스에서 사용됩니다. 예를 들어, Runnable 인터페이스를 사용한 예시는 다음과 같습니다:

Runnable task = () -> System.out.println("Hello from a thread!");
task.run();
    

4. 람다 표현식의 장점

  • 간결한 코드: 익명 클래스를 사용하는 것보다 코드가 훨씬 간결해집니다.
  • 가독성 향상: 불필요한 boilerplate 코드가 줄어들어 가독성이 향상됩니다.
  • 함수형 프로그래밍: 자바에서 함수형 프로그래밍 스타일을 사용할 수 있습니다.

5. 함수형 인터페이스

람다 표현식은 함수형 인터페이스와 함께 사용됩니다. 함수형 인터페이스는 하나의 추상 메서드만을 가지는 인터페이스를 말합니다. 예시로 Runnable, Callable, Comparator 등이 있습니다.

@FunctionalInterface
public interface MyFunction {
    void apply();
}
    

위 예시에서 MyFunction 인터페이스는 하나의 추상 메서드를 가지고 있으며, 람다로 구현할 수 있습니다.

MyFunction myFunc = () -> System.out.println("Hello from MyFunction!");
myFunc.apply();
    

6. 람다 표현식의 제한 사항

  • final 또는 effectively final 변수만 사용 가능: 람다식 내에서 참조되는 변수는 반드시 final이거나 사실상 final이어야 합니다.
  • 상태 변경이 어려움: 람다식 내부에서 외부 변수를 변경할 수 없으므로 상태 관리에 제한이 있을 수 있습니다.

7. 결론

람다 표현식은 자바에서 함수형 프로그래밍을 보다 쉽게 구현할 수 있게 해 주는 강력한 도구입니다. 간결한 코드 작성, 가독성 향상 및 함수형 프로그래밍 스타일을 지원하는 데 큰 장점이 있습니다.

728x90
반응형

CountDownLatch와 사이드 프로젝트에서 활용-지선학

CountDownLatch란?

CountDownLatch는 지정된 횟수만큼 countDown() 메서드가 호출될 때까지 현재 스레드를 대기 상태로 만들고, 카운트가 0에 도달하면 대기 중인 스레드를 다시 실행 가능하게 만드는 동기화 도구입니다. 이를 주로 여러 개의 작업 스레드가 완료될 때까지 대기하고, 모든 작업이 끝나면 특정 스레드를 다시 실행하게 할 때 사용합니다.

Method Summary

예시코드

쇼핑몰 사이트에서 특정 사용자가 컴퓨터 부품을 주문하는 상황임

사용자가 물건 주문 요청을 하면 서버가 각각 부품의 재고 상황을 파악하고, 구매 가능하지 파악하는 로직임

사용자가 부품의 재고를 순차적으로 0초, 5초 *4(재고없는 물건) 대략 20초의 시간이 아닌, 다중쓰레드를 이용해서 대략 5초 이내로 처리 속도를 개선할 수 있음

import com.google.gson.Gson;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class ComputerPartsPurchaseSimulation {

    public static void main(String[] args) throws InterruptedException {

        List<String> productOrderRequests = Arrays.asList("CPU", "RAM", "Motherboard", "SSD", "HDD");

        int threadCount = productOrderRequests.size();
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        log.info("서버: 구매 요청 대기 중...");

        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        Queue<PurchaseResult> productOrderRequestResults = new ConcurrentLinkedQueue<>();
        for (String product : productOrderRequests) {
            executorService.submit(() -> {
                try {
                    PurchaseResult response = processPurchase(product);
                    productOrderRequestResults.add(response);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }

        countDownLatch.await();

        log.info("모든 요청 처리 완료!");
        log.info("구매 요청 처리 결과 : ");
        productOrderRequestResults.forEach(result -> log.info(new Gson().toJson(result)));

        executorService.shutdown();
    }

    private static PurchaseResult processPurchase(String product) {
        boolean isAvailable = checkStockAvailability(product);
        return PurchaseResult.builder()
                .product(product)
                .status(isAvailable ? "success" : "failure")
                .message(product + (isAvailable ? " 구매 성공!" : " 재고 부족으로 구매 실패"))
                .build();
    }

    /**
     * CPU만 재고가 있음, 나머지는 재고 없음 재고 없어서 오래 걸림
     * @param product
     * @return
     */
    private static boolean checkStockAvailability(String product) {
        if ("CPU".equals(product)) {
            log.info("CPU 구매 처리 중... 0초 대기-{}", Thread.currentThread().getName());
            return true;
        }

        try {
            log.info("{} 구매 처리 중... 5초 대기-{}", product, Thread.currentThread().getName());
            Thread.sleep(5000);  // 5초 대기
        } catch (InterruptedException interruptedException) {
            interruptedException.printStackTrace();
        }
        return false;
    }

    @Setter
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public static class PurchaseResult {
        private String product;
        private String status;
        private String message;
    }

}
15:45:42.833 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- 서버: 구매 요청 대기 중...
15:45:42.841 [pool-1-thread-1] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- CPU 구매 처리 중... 0초 대기-pool-1-thread-1
15:45:42.841 [pool-1-thread-2] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- RAM 구매 처리 중... 5초 대기-pool-1-thread-2
15:45:42.841 [pool-1-thread-3] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- Motherboard 구매 처리 중... 5초 대기-pool-1-thread-3
15:45:42.842 [pool-1-thread-5] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- HDD 구매 처리 중... 5초 대기-pool-1-thread-5
15:45:42.841 [pool-1-thread-4] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- SSD 구매 처리 중... 5초 대기-pool-1-thread-4
15:45:47.847 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- 서버: 모든 요청 처리 완료!
15:45:47.848 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- 구매 결과 : 
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"CPU","status":"success","message":"CPU 구매 성공!"}
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"RAM","status":"failure","message":"RAM 재고 부족으로 구매 실패"}
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"Motherboard","status":"failure","message":"Motherboard 재고 부족으로 구매 실패"}
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"HDD","status":"failure","message":"HDD 재고 부족으로 구매 실패"}
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"SSD","status":"failure","message":"SSD 재고 부족으로 구매 실패"}

이 밖에 쓰레드 협업용 동기화 도구

CyclicBarrier

  • 목적: 지정된 수의 스레드가 모두 도달할 때까지 기다린 후, 동시에 작업을 진행합니다.
  • CountDownLatch와 달리, 반복적으로 사용 가능.
  • 사용 사례:
    • 여러 스레드가 서로 독립적인 작업을 완료한 후, 다음 단계로 함께 진행해야 할 때.
    • 스레드풀에서 일정 개수의 스레드가 모였을 때 새로운 작업을 시작해야 하는 경우.
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("All threads reached the barrier. Proceeding...");
});

Runnable task = () -> {
    try {
        System.out.println(Thread.currentThread().getName() + " waiting at barrier");
        barrier.await();
        System.out.println(Thread.currentThread().getName() + " proceeding");
    } catch (Exception e) {
        e.printStackTrace();
    }
};

ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
    executor.submit(task);
}
executor.shutdown();

Semaphore

  • 목적: 스레드의 접근을 제어하는 데 사용되며, 허용된 작업의 개수를 제한.
  • 스레드 협업에 사용할 수 있으며, 신호 용도로 활용 가능.
  • 사용 사례:
    • 리소스에 대한 동시 접근을 제한하고 싶을 때.
    • 일정한 스레드 수가 접근해야 작업을 완료하고 신호를 보낼 때.
Semaphore semaphore = new Semaphore(3); // 허용된 동시 작업 수

Runnable task = () -> {
    try {
        semaphore.acquire();
        System.out.println(Thread.currentThread().getName() + " acquired permit");
        Thread.sleep(1000); // 작업 수행
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println(Thread.currentThread().getName() + " releasing permit");
        semaphore.release();
    }
};

ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
    executor.submit(task);
}
executor.shutdown();

Phaser

  • 목적: CyclicBarrier와 유사하게 단계별 동기화를 제공하며, 단계마다 참여 스레드 수를 동적으로 조정할 수 있습니다.
  • CountDownLatch와 달리 단계를 명시적으로 관리하고, 재사용 가능.
  • 사용 사례:
    • 여러 단계로 나뉜 스레드 작업을 동기화해야 할 때.
    • 단계별로 스레드의 참여를 동적으로 변경할 수 있어야 할 때.
import java.util.concurrent.Phaser;

public class PhaserTest {

    /**
     * 3개의 작업을 3단계로 걸쳐서 함
     * @param args
     */
    public static void main(String[] args) {
        // Phaser 생성, 3개의 쓰레드를 등록
        Phaser phaser = new Phaser(3);

        // Runnable 작업 정의
        Runnable task = () -> {
            String threadName = Thread.currentThread().getName();

            // 1단계 작업
            System.out.println(threadName + " completed Phase 1");
            phaser.arriveAndAwaitAdvance(); // 1단계 완료 후 대기

            // 2단계 작업
            System.out.println(threadName + " completed Phase 2");
            phaser.arriveAndAwaitAdvance(); // 2단계 완료 후 대기

            // 3단계 작업
            System.out.println(threadName + " completed Phase 3");
            phaser.arriveAndAwaitAdvance(); // 3단계 완료 후 대기*/

            System.out.println(threadName + " finished all phases");
        };

        // 3개의 쓰레드 생성 및 실행
        Thread t1 = new Thread(task, "Thread-1");
        Thread t2 = new Thread(task, "Thread-2");
        Thread t3 = new Thread(task, "Thread-3");

        t1.start();
        t2.start();
        t3.start();
    }
}

Exchanger

  • 목적: 두 스레드가 데이터를 교환하는 데 사용.
  • 사용 사례:
    • 한 스레드가 데이터를 생성하고 다른 스레드가 이를 소비하는 생산자-소비자 패턴.
    • 두 스레드 간의 데이터 교환이 필요한 경우.
Exchanger<String> exchanger = new Exchanger<>();

Runnable producer = () -> {
    try {
        String data = "Data from producer";
        System.out.println("Producer: Sending data");
        String response = exchanger.exchange(data);
        System.out.println("Producer: Received response - " + response);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Runnable consumer = () -> {
    try {
        String data = exchanger.exchange(null);
        System.out.println("Consumer: Received data - " + data);
        exchanger.exchange("Acknowledgement from consumer");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(producer);
executor.submit(consumer);
executor.shutdown();

SynchronousQueue

  • 목적: 한 번에 하나의 작업만 전송하고 수신되도록 동기화된 대기열.
  • 생산자가 데이터를 큐에 넣으려면 소비자가 즉시 데이터를 받아야 함.
  • 사용 사례:
    • 스레드 간의 단방향 데이터 교환.
SynchronousQueue<String> queue = new SynchronousQueue<>();

Runnable producer = () -> {
    try {
        System.out.println("Producer: Sending data");
        queue.put("Data from producer");
        System.out.println("Producer: Data sent");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Runnable consumer = () -> {
    try {
        System.out.println("Consumer: Waiting for data");
        String data = queue.take();
        System.out.println("Consumer: Received data - " + data);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(producer);
executor.submit(consumer);
executor.shutdown();
728x90

+ Recent posts