CountDownLatch와 사이드 프로젝트에서 활용-지선학
CountDownLatch란?
CountDownLatch는 지정된 횟수만큼 countDown() 메서드가 호출될 때까지 현재 스레드를 대기 상태로 만들고, 카운트가 0에 도달하면 대기 중인 스레드를 다시 실행 가능하게 만드는 동기화 도구입니다. 이를 주로 여러 개의 작업 스레드가 완료될 때까지 대기하고, 모든 작업이 끝나면 특정 스레드를 다시 실행하게 할 때 사용합니다.
Method Summary
예시코드
쇼핑몰 사이트에서 특정 사용자가 컴퓨터 부품을 주문하는 상황임
사용자가 물건 주문 요청을 하면 서버가 각각 부품의 재고 상황을 파악하고, 구매 가능하지 파악하는 로직임
사용자가 부품의 재고를 순차적으로 0초, 5초 *4(재고없는 물건) 대략 20초의 시간이 아닌, 다중쓰레드를 이용해서 대략 5초 이내로 처리 속도를 개선할 수 있음
import com.google.gson.Gson;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class ComputerPartsPurchaseSimulation {
public static void main(String[] args) throws InterruptedException {
List<String> productOrderRequests = Arrays.asList("CPU", "RAM", "Motherboard", "SSD", "HDD");
int threadCount = productOrderRequests.size();
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
log.info("서버: 구매 요청 대기 중...");
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
Queue<PurchaseResult> productOrderRequestResults = new ConcurrentLinkedQueue<>();
for (String product : productOrderRequests) {
executorService.submit(() -> {
try {
PurchaseResult response = processPurchase(product);
productOrderRequestResults.add(response);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("모든 요청 처리 완료!");
log.info("구매 요청 처리 결과 : ");
productOrderRequestResults.forEach(result -> log.info(new Gson().toJson(result)));
executorService.shutdown();
}
private static PurchaseResult processPurchase(String product) {
boolean isAvailable = checkStockAvailability(product);
return PurchaseResult.builder()
.product(product)
.status(isAvailable ? "success" : "failure")
.message(product + (isAvailable ? " 구매 성공!" : " 재고 부족으로 구매 실패"))
.build();
}
/**
* CPU만 재고가 있음, 나머지는 재고 없음 재고 없어서 오래 걸림
* @param product
* @return
*/
private static boolean checkStockAvailability(String product) {
if ("CPU".equals(product)) {
log.info("CPU 구매 처리 중... 0초 대기-{}", Thread.currentThread().getName());
return true;
}
try {
log.info("{} 구매 처리 중... 5초 대기-{}", product, Thread.currentThread().getName());
Thread.sleep(5000); // 5초 대기
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
return false;
}
@Setter
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class PurchaseResult {
private String product;
private String status;
private String message;
}
}
15:45:42.833 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- 서버: 구매 요청 대기 중...
15:45:42.841 [pool-1-thread-1] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- CPU 구매 처리 중... 0초 대기-pool-1-thread-1
15:45:42.841 [pool-1-thread-2] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- RAM 구매 처리 중... 5초 대기-pool-1-thread-2
15:45:42.841 [pool-1-thread-3] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- Motherboard 구매 처리 중... 5초 대기-pool-1-thread-3
15:45:42.842 [pool-1-thread-5] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- HDD 구매 처리 중... 5초 대기-pool-1-thread-5
15:45:42.841 [pool-1-thread-4] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- SSD 구매 처리 중... 5초 대기-pool-1-thread-4
15:45:47.847 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- 서버: 모든 요청 처리 완료!
15:45:47.848 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- 구매 결과 :
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"CPU","status":"success","message":"CPU 구매 성공!"}
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"RAM","status":"failure","message":"RAM 재고 부족으로 구매 실패"}
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"Motherboard","status":"failure","message":"Motherboard 재고 부족으로 구매 실패"}
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"HDD","status":"failure","message":"HDD 재고 부족으로 구매 실패"}
15:45:47.914 [main] INFO com.dodam.dicegame.dicegame.util.ComputerPartsPurchaseSimulation -- {"product":"SSD","status":"failure","message":"SSD 재고 부족으로 구매 실패"}
이 밖에 쓰레드 협업용 동기화 도구
CyclicBarrier
- 목적: 지정된 수의 스레드가 모두 도달할 때까지 기다린 후, 동시에 작업을 진행합니다.
- CountDownLatch와 달리, 반복적으로 사용 가능.
- 사용 사례:
- 여러 스레드가 서로 독립적인 작업을 완료한 후, 다음 단계로 함께 진행해야 할 때.
- 스레드풀에서 일정 개수의 스레드가 모였을 때 새로운 작업을 시작해야 하는 경우.
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("All threads reached the barrier. Proceeding...");
});
Runnable task = () -> {
try {
System.out.println(Thread.currentThread().getName() + " waiting at barrier");
barrier.await();
System.out.println(Thread.currentThread().getName() + " proceeding");
} catch (Exception e) {
e.printStackTrace();
}
};
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executor.submit(task);
}
executor.shutdown();
Semaphore
- 목적: 스레드의 접근을 제어하는 데 사용되며, 허용된 작업의 개수를 제한.
- 스레드 협업에 사용할 수 있으며, 신호 용도로 활용 가능.
- 사용 사례:
- 리소스에 대한 동시 접근을 제한하고 싶을 때.
- 일정한 스레드 수가 접근해야 작업을 완료하고 신호를 보낼 때.
Semaphore semaphore = new Semaphore(3); // 허용된 동시 작업 수
Runnable task = () -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " acquired permit");
Thread.sleep(1000); // 작업 수행
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " releasing permit");
semaphore.release();
}
};
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executor.submit(task);
}
executor.shutdown();
Phaser
- 목적: CyclicBarrier와 유사하게 단계별 동기화를 제공하며, 단계마다 참여 스레드 수를 동적으로 조정할 수 있습니다.
- CountDownLatch와 달리 단계를 명시적으로 관리하고, 재사용 가능.
- 사용 사례:
- 여러 단계로 나뉜 스레드 작업을 동기화해야 할 때.
- 단계별로 스레드의 참여를 동적으로 변경할 수 있어야 할 때.
import java.util.concurrent.Phaser;
public class PhaserTest {
/**
* 3개의 작업을 3단계로 걸쳐서 함
* @param args
*/
public static void main(String[] args) {
// Phaser 생성, 3개의 쓰레드를 등록
Phaser phaser = new Phaser(3);
// Runnable 작업 정의
Runnable task = () -> {
String threadName = Thread.currentThread().getName();
// 1단계 작업
System.out.println(threadName + " completed Phase 1");
phaser.arriveAndAwaitAdvance(); // 1단계 완료 후 대기
// 2단계 작업
System.out.println(threadName + " completed Phase 2");
phaser.arriveAndAwaitAdvance(); // 2단계 완료 후 대기
// 3단계 작업
System.out.println(threadName + " completed Phase 3");
phaser.arriveAndAwaitAdvance(); // 3단계 완료 후 대기*/
System.out.println(threadName + " finished all phases");
};
// 3개의 쓰레드 생성 및 실행
Thread t1 = new Thread(task, "Thread-1");
Thread t2 = new Thread(task, "Thread-2");
Thread t3 = new Thread(task, "Thread-3");
t1.start();
t2.start();
t3.start();
}
}
Exchanger
- 목적: 두 스레드가 데이터를 교환하는 데 사용.
- 사용 사례:
- 한 스레드가 데이터를 생성하고 다른 스레드가 이를 소비하는 생산자-소비자 패턴.
- 두 스레드 간의 데이터 교환이 필요한 경우.
Exchanger<String> exchanger = new Exchanger<>();
Runnable producer = () -> {
try {
String data = "Data from producer";
System.out.println("Producer: Sending data");
String response = exchanger.exchange(data);
System.out.println("Producer: Received response - " + response);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable consumer = () -> {
try {
String data = exchanger.exchange(null);
System.out.println("Consumer: Received data - " + data);
exchanger.exchange("Acknowledgement from consumer");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(producer);
executor.submit(consumer);
executor.shutdown();
SynchronousQueue
- 목적: 한 번에 하나의 작업만 전송하고 수신되도록 동기화된 대기열.
- 생산자가 데이터를 큐에 넣으려면 소비자가 즉시 데이터를 받아야 함.
- 사용 사례:
- 스레드 간의 단방향 데이터 교환.
SynchronousQueue<String> queue = new SynchronousQueue<>();
Runnable producer = () -> {
try {
System.out.println("Producer: Sending data");
queue.put("Data from producer");
System.out.println("Producer: Data sent");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable consumer = () -> {
try {
System.out.println("Consumer: Waiting for data");
String data = queue.take();
System.out.println("Consumer: Received data - " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(producer);
executor.submit(consumer);
executor.shutdown();
'[개발관련] > JAVA' 카테고리의 다른 글
[이펙티브 자바] 아이템 12. toString을 항상 재정의하라 (0) | 2023.10.20 |
---|---|
[이펙티브 자바] 아이템 11. equals를 재정의하려거든 hashCode도 재정의하라 (0) | 2023.10.18 |
[이펙티브 자바] 아이템 09. try-finally보다는 try-with-resources를 사용하라. (0) | 2023.10.17 |
[이펙티브 자바] 아이템 08. finalizer와 cleaner 사용을 피하라 (2) | 2023.10.16 |
[이펙티브 자바] 아이템 07. 다 쓴 객체 참조를 해제하라 (0) | 2023.09.10 |