| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | |||||
| 3 | 4 | 5 | 6 | 7 | 8 | 9 |
| 10 | 11 | 12 | 13 | 14 | 15 | 16 |
| 17 | 18 | 19 | 20 | 21 | 22 | 23 |
| 24 | 25 | 26 | 27 | 28 | 29 | 30 |
| 31 |
- 탐색
- 플러스 백엔드
- 삽입
- Spring
- JPA
- 코딩테스트
- docker
- Kafka
- 아키텍처
- jre
- EDA
- Java
- Gradle
- Unity
- 알고리즘
- 티스토리챌린지
- MSA
- redis
- code blocks
- 프로그래머스
- 이진트리
- 연습문제
- 트리
- stack
- 백준
- 오블완
- bean
- Kotlin
- jdk
- event
- Today
- Total
Repository
5단계: Java 동시성 & 멀티스레딩 본문
5단계: Java 동시성 & 멀티스레딩
멀티스레딩은 Java의 가장 강력하면서도 위험한 기능입니다. 4년간의 실무에서 겪은 수많은 동시성 버그와 그 해결 과정, 그리고 안전한 멀티스레드 프로그래밍의 핵심을 담았습니다.
5.1 스레드 기초
Java 멀티스레딩 입문
Thread vs Runnable
Thread 클래스 상속
public class ThreadExample {
// 방법 1: Thread 클래스 상속
static class MyThread extends Thread {
private String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(name + ": " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
public static void main(String[] args) {
MyThread thread1 = new MyThread("Thread-1");
MyThread thread2 = new MyThread("Thread-2");
thread1.start(); // 새 스레드에서 run() 실행
thread2.start();
// ❌ thread1.run(); // 현재 스레드에서 실행 (멀티스레딩 X)
}
}
Runnable 인터페이스 구현
public class RunnableExample {
// 방법 2: Runnable 인터페이스 구현
static class MyRunnable implements Runnable {
private String name;
public MyRunnable(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(name + ": " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
public static void main(String[] args) {
Thread thread1 = new Thread(new MyRunnable("Runnable-1"));
Thread thread2 = new Thread(new MyRunnable("Runnable-2"));
thread1.start();
thread2.start();
}
}
람다 표현식 사용
public class LambdaThreadExample {
public static void main(String[] args) {
// 방법 3: 람다 표현식 (Java 8+)
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("Lambda-1: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
// 방법 4: 메서드 레퍼런스
Thread thread2 = new Thread(LambdaThreadExample::doWork);
thread1.start();
thread2.start();
}
private static void doWork() {
System.out.println("Method reference thread");
}
}
Thread vs Runnable 비교
/**
* Thread vs Runnable 선택 가이드:
*
* Runnable 사용 (권장):
* ✅ 다른 클래스를 상속받아야 할 때
* ✅ 코드 재사용성
* ✅ 객체지향적 설계
* ✅ 람다 표현식 사용 가능
*
* Thread 사용:
* ✅ Thread 클래스의 메서드 오버라이드 필요
* ✅ 간단한 테스트 코드
*/
public class ThreadVsRunnable {
// ❌ Thread 상속: 다른 클래스 상속 불가
class Worker extends Thread {
// 이미 Thread를 상속했으므로 다른 클래스 상속 불가
}
// ✅ Runnable 구현: 다른 클래스 상속 가능
class Worker2 extends BaseWorker implements Runnable {
@Override
public void run() {
// 작업 수행
}
}
// ✅ 코드 재사용
class DataProcessor implements Runnable {
private String data;
public DataProcessor(String data) {
this.data = data;
}
@Override
public void run() {
processData(data);
}
private void processData(String data) {
// 처리 로직
}
}
public void useProcessor() {
DataProcessor processor = new DataProcessor("test");
// 스레드로 실행
new Thread(processor).start();
// ExecutorService로 실행
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(processor);
// 직접 호출도 가능
processor.run();
}
static class BaseWorker {
// 공통 기능
}
}
스레드 라이프사이클
스레드 상태
public class ThreadLifecycle {
/**
* 스레드 상태 (Thread.State):
*
* NEW : 생성됨, 아직 시작 안됨
* RUNNABLE : 실행 중 또는 실행 가능
* BLOCKED : 모니터 락 대기 중
* WAITING : 무한 대기 중
* TIMED_WAITING: 제한 시간 대기 중
* TERMINATED : 실행 완료
*/
public void demonstrateStates() throws InterruptedException {
Thread thread = new Thread(() -> {
try {
// RUNNABLE
System.out.println("Thread running");
// TIMED_WAITING
Thread.sleep(1000);
synchronized (this) {
// WAITING
wait();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// NEW
System.out.println("1. State: " + thread.getState()); // NEW
thread.start();
Thread.sleep(100);
// RUNNABLE
System.out.println("2. State: " + thread.getState()); // RUNNABLE
Thread.sleep(200);
// TIMED_WAITING
System.out.println("3. State: " + thread.getState()); // TIMED_WAITING
thread.join();
// TERMINATED
System.out.println("4. State: " + thread.getState()); // TERMINATED
}
}
상태 전환 다이어그램
/**
* 스레드 상태 전환:
*
* NEW
* ↓ start()
* RUNNABLE ←─────────┐
* ↓ │
* ├→ BLOCKED ──────→┤ (락 획득)
* ├→ WAITING ──────→┤ (notify/notifyAll)
* └→ TIMED_WAITING →┤ (시간 만료)
* ↓
* TERMINATED
*/
public class StateTransitions {
private final Object lock = new Object();
public void demonstrateTransitions() {
// RUNNABLE → BLOCKED
Thread t1 = new Thread(() -> {
synchronized (lock) {
System.out.println("T1: Lock acquired");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
Thread t2 = new Thread(() -> {
synchronized (lock) { // BLOCKED 상태로 대기
System.out.println("T2: Lock acquired");
}
});
t1.start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
t2.start();
try {
Thread.sleep(200);
System.out.println("T2 State: " + t2.getState()); // BLOCKED
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
join, sleep, interrupt
join: 스레드 종료 대기
public class JoinExample {
// 기본 join
public void basicJoin() throws InterruptedException {
Thread worker = new Thread(() -> {
System.out.println("Worker: Starting work...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Worker: Work completed");
});
worker.start();
System.out.println("Main: Waiting for worker...");
worker.join(); // worker 종료까지 대기
System.out.println("Main: Worker finished");
}
// 타임아웃 join
public void timeoutJoin() throws InterruptedException {
Thread longTask = new Thread(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
longTask.start();
// 최대 2초만 대기
longTask.join(2000);
if (longTask.isAlive()) {
System.out.println("Task still running, interrupting...");
longTask.interrupt();
}
}
// 실무 예제: 병렬 작업 후 결과 수집
public List<String> parallelProcess(List<String> data) throws InterruptedException {
List<String> results = new CopyOnWriteArrayList<>();
List<Thread> threads = new ArrayList<>();
for (String item : data) {
Thread thread = new Thread(() -> {
String result = processItem(item);
results.add(result);
});
thread.start();
threads.add(thread);
}
// 모든 스레드 종료 대기
for (Thread thread : threads) {
thread.join();
}
return results;
}
private String processItem(String item) {
// 처리 로직
return item.toUpperCase();
}
}
sleep: 스레드 일시 정지
public class SleepExample {
// 기본 sleep
public void basicSleep() {
System.out.println("Before sleep: " + System.currentTimeMillis());
try {
Thread.sleep(1000); // 1초 대기
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Sleep interrupted");
}
System.out.println("After sleep: " + System.currentTimeMillis());
}
// sleep vs wait 비교
public void sleepVsWait() {
Object lock = new Object();
// sleep: 락을 유지한 채 대기
Thread sleepThread = new Thread(() -> {
synchronized (lock) {
System.out.println("Sleep thread: acquired lock");
try {
Thread.sleep(2000); // 락 유지
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Sleep thread: releasing lock");
}
});
// wait: 락을 해제하고 대기
Thread waitThread = new Thread(() -> {
synchronized (lock) {
System.out.println("Wait thread: acquired lock");
try {
lock.wait(2000); // 락 해제
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Wait thread: reacquired lock");
}
});
}
// 실무 예제: 재시도 로직
public String fetchWithRetry(String url, int maxRetries) {
for (int i = 0; i < maxRetries; i++) {
try {
return fetch(url);
} catch (Exception e) {
if (i == maxRetries - 1) {
throw new RuntimeException("Max retries exceeded", e);
}
// 지수 백오프
long delay = (long) Math.pow(2, i) * 1000;
System.out.println("Retry " + (i + 1) + " after " + delay + "ms");
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted", ie);
}
}
}
throw new RuntimeException("Unreachable");
}
private String fetch(String url) throws Exception {
// HTTP 요청
return "";
}
}
interrupt: 스레드 중단
public class InterruptExample {
// 기본 interrupt 처리
public void basicInterrupt() {
Thread worker = new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("Working...");
Thread.sleep(500);
}
System.out.println("Interrupted, cleaning up...");
} catch (InterruptedException e) {
System.out.println("Interrupted during sleep");
Thread.currentThread().interrupt(); // 인터럽트 상태 복원
}
});
worker.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
worker.interrupt(); // 중단 요청
}
// 인터럽트 무시 (안티패턴)
public void ignoreInterrupt() {
Thread thread = new Thread(() -> {
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ❌ 인터럽트 무시 (나쁨!)
System.out.println("Interrupted but continuing...");
}
}
});
thread.start();
thread.interrupt(); // 효과 없음!
}
// 올바른 인터럽트 처리
public void correctInterruptHandling() {
Thread thread = new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
// 작업 수행
doWork();
Thread.sleep(100);
}
} catch (InterruptedException e) {
// ✅ 정리 작업 후 인터럽트 상태 복원
cleanup();
Thread.currentThread().interrupt();
}
});
thread.start();
}
// 실무 예제: 타임아웃이 있는 작업
public String executeWithTimeout(Callable<String> task, long timeoutMs)
throws TimeoutException, InterruptedException {
FutureTask<String> future = new FutureTask<>(task);
Thread thread = new Thread(future);
thread.start();
try {
return future.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
} catch (TimeoutException e) {
future.cancel(true); // 인터럽트
thread.interrupt();
throw e;
}
}
private void doWork() {
// 작업
}
private void cleanup() {
// 정리
}
}
동기화(Synchronization) 완벽 이해
synchronized 키워드
메서드 동기화
public class MethodSynchronization {
private int count = 0;
// ❌ 동기화 안됨: Race Condition 발생!
public void incrementUnsafe() {
count++; // 원자적 연산 아님!
// 1. count 읽기
// 2. 1 증가
// 3. count 쓰기
}
// ✅ 메서드 전체 동기화
public synchronized void incrementSafe() {
count++;
}
// ✅ 블록 동기화 (더 세밀한 제어)
public void incrementWithBlock() {
synchronized (this) {
count++;
}
}
// 테스트: Race Condition 재현
public void demonstrateRaceCondition() throws InterruptedException {
MethodSynchronization counter = new MethodSynchronization();
// 1000개 스레드가 각각 1000번 증가
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.incrementUnsafe();
}
});
thread.start();
threads.add(thread);
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("Expected: 1000000");
System.out.println("Actual: " + counter.count); // 1000000보다 작음!
}
}
블록 동기화
public class BlockSynchronization {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
private int balance1 = 1000;
private int balance2 = 2000;
// ❌ 메서드 전체 동기화: 불필요하게 넓은 범위
public synchronized void transferBad(int amount) {
// 준비 작업 (동기화 불필요)
System.out.println("Preparing transfer...");
// 실제 송금 (동기화 필요)
balance1 -= amount;
balance2 += amount;
// 로깅 (동기화 불필요)
System.out.println("Transfer completed");
}
// ✅ 필요한 부분만 동기화
public void transferGood(int amount) {
// 준비 작업
System.out.println("Preparing transfer...");
// 동기화 블록
synchronized (this) {
balance1 -= amount;
balance2 += amount;
}
// 로깅
System.out.println("Transfer completed");
}
// ✅ 서로 다른 락 사용 (동시성 향상)
public void updateBalance1(int amount) {
synchronized (lock1) {
balance1 += amount;
}
}
public void updateBalance2(int amount) {
synchronized (lock2) {
balance2 += amount;
}
}
}
static 동기화
public class StaticSynchronization {
private static int counter = 0;
private int instanceCounter = 0;
// static 메서드 동기화: Class 객체를 락으로 사용
public static synchronized void incrementStatic() {
counter++;
}
// 동일한 효과
public static void incrementStaticBlock() {
synchronized (StaticSynchronization.class) {
counter++;
}
}
// 인스턴스 메서드 동기화: this 객체를 락으로 사용
public synchronized void incrementInstance() {
instanceCounter++;
}
// 동일한 효과
public void incrementInstanceBlock() {
synchronized (this) {
instanceCounter++;
}
}
// ⚠️ static과 instance 동기화는 서로 다른 락!
public void demonstrateDifferentLocks() {
StaticSynchronization obj1 = new StaticSynchronization();
StaticSynchronization obj2 = new StaticSynchronization();
// 다른 인스턴스: 서로 다른 락
new Thread(obj1::incrementInstance).start();
new Thread(obj2::incrementInstance).start(); // 동시 실행 가능
// static 메서드: 같은 락 (Class 객체)
new Thread(StaticSynchronization::incrementStatic).start();
new Thread(StaticSynchronization::incrementStatic).start(); // 순차 실행
}
}
모니터와 락
모니터 개념
public class MonitorConcept {
/**
* 모니터 (Monitor):
* - 상호 배제 (Mutual Exclusion)
* - 조건 변수 (Condition Variable)
*
* Java의 모든 객체는 모니터를 가짐
*/
private final Object monitor = new Object();
private int value = 0;
// Producer-Consumer 패턴
public void produce() throws InterruptedException {
synchronized (monitor) {
while (value > 0) {
// 버퍼가 가득 참
monitor.wait(); // 락 해제 후 대기
}
value++;
System.out.println("Produced: " + value);
monitor.notify(); // 대기 중인 스레드 깨우기
}
}
public void consume() throws InterruptedException {
synchronized (monitor) {
while (value == 0) {
// 버퍼가 비어 있음
monitor.wait();
}
System.out.println("Consumed: " + value);
value--;
monitor.notify();
}
}
// 실무 예제: Blocking Queue 구현
static class SimpleBlockingQueue<T> {
private final Queue<T> queue = new LinkedList<>();
private final int capacity;
public SimpleBlockingQueue(int capacity) {
this.capacity = capacity;
}
public synchronized void put(T item) throws InterruptedException {
while (queue.size() == capacity) {
wait(); // 큐가 가득 차면 대기
}
queue.offer(item);
notifyAll(); // 대기 중인 소비자 깨우기
}
public synchronized T take() throws InterruptedException {
while (queue.isEmpty()) {
wait(); // 큐가 비어 있으면 대기
}
T item = queue.poll();
notifyAll(); // 대기 중인 생산자 깨우기
return item;
}
}
}
wait, notify, notifyAll
public class WaitNotifyExample {
private final Object lock = new Object();
private boolean ready = false;
// notify vs notifyAll
public void demonstrateNotify() {
// Worker 스레드들
for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
synchronized (lock) {
try {
while (!ready) {
System.out.println("Worker " + id + ": Waiting...");
lock.wait();
}
System.out.println("Worker " + id + ": Working!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// notify(): 하나의 스레드만 깨움
synchronized (lock) {
ready = true;
lock.notify(); // 1개 스레드만 깨어남
}
// notifyAll(): 모든 스레드 깨움
synchronized (lock) {
ready = true;
lock.notifyAll(); // 3개 스레드 모두 깨어남
}
}
// 주의: wait()는 while 루프에서 사용
public void correctWaitUsage() {
synchronized (lock) {
// ✅ while 사용 (재확인)
while (!ready) {
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
// ❌ if 사용 (Spurious Wakeup 문제)
if (!ready) {
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
데드락 원인과 해결
데드락 발생 조건
public class DeadlockExample {
/**
* 데드락 발생 4가지 조건:
* 1. 상호 배제 (Mutual Exclusion)
* 2. 점유 대기 (Hold and Wait)
* 3. 비선점 (No Preemption)
* 4. 순환 대기 (Circular Wait)
*/
private final Object lock1 = new Object();
private final Object lock2 = new Object();
// ❌ 데드락 발생!
public void causeDeadlock() {
Thread t1 = new Thread(() -> {
synchronized (lock1) {
System.out.println("T1: Lock1 acquired");
sleep(100);
synchronized (lock2) { // lock2 대기
System.out.println("T1: Lock2 acquired");
}
}
});
Thread t2 = new Thread(() -> {
synchronized (lock2) {
System.out.println("T2: Lock2 acquired");
sleep(100);
synchronized (lock1) { // lock1 대기
System.out.println("T2: Lock1 acquired");
}
}
});
t1.start();
t2.start();
// T1: lock1 보유, lock2 대기
// T2: lock2 보유, lock1 대기
// → 데드락!
}
// ✅ 해결책 1: 락 순서 고정
public void fixedLockOrder() {
Thread t1 = new Thread(() -> {
synchronized (lock1) { // 항상 lock1 먼저
System.out.println("T1: Lock1 acquired");
synchronized (lock2) {
System.out.println("T1: Lock2 acquired");
}
}
});
Thread t2 = new Thread(() -> {
synchronized (lock1) { // lock1 먼저
System.out.println("T2: Lock1 acquired");
synchronized (lock2) {
System.out.println("T2: Lock2 acquired");
}
}
});
t1.start();
t2.start();
}
// ✅ 해결책 2: tryLock (타임아웃)
public void tryLockWithTimeout() {
Lock lock1 = new ReentrantLock();
Lock lock2 = new ReentrantLock();
Thread t1 = new Thread(() -> {
try {
if (lock1.tryLock(1, TimeUnit.SECONDS)) {
try {
if (lock2.tryLock(1, TimeUnit.SECONDS)) {
try {
System.out.println("T1: Both locks acquired");
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
t1.start();
}
// ✅ 해결책 3: 락 없이 설계
public void lockFreeDesign() {
// AtomicInteger 사용
AtomicInteger counter = new AtomicInteger(0);
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.incrementAndGet();
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.incrementAndGet();
}
});
t1.start();
t2.start();
}
private void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
실무 데드락 예제: 은행 송금
public class BankDeadlock {
static class Account {
private final long id;
private int balance;
public Account(long id, int balance) {
this.id = id;
this.balance = balance;
}
public long getId() {
return id;
}
}
// ❌ 데드락 위험
public void transferUnsafe(Account from, Account to, int amount) {
synchronized (from) {
synchronized (to) {
from.balance -= amount;
to.balance += amount;
}
}
}
// ✅ ID 순서로 락 획득
public void transferSafe(Account from, Account to, int amount) {
Account first = from.getId() < to.getId() ? from : to;
Account second = from.getId() < to.getId() ? to : from;
synchronized (first) {
synchronized (second) {
from.balance -= amount;
to.balance += amount;
}
}
}
// 테스트: 데드락 재현
public void demonstrateDeadlock() {
Account account1 = new Account(1, 1000);
Account account2 = new Account(2, 2000);
// T1: account1 → account2
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
transferUnsafe(account1, account2, 10);
}
});
// T2: account2 → account1
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
transferUnsafe(account2, account1, 10);
}
});
t1.start();
t2.start();
// 데드락 발생 가능!
}
}
volatile 키워드
가시성 문제
public class VolatileExample {
// ❌ volatile 없음: 가시성 문제
private boolean running = true;
public void demonstrateVisibilityProblem() {
Thread worker = new Thread(() -> {
System.out.println("Worker started");
while (running) { // 캐시된 값을 계속 읽음
// 작업 수행
}
System.out.println("Worker stopped");
});
worker.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
running = false; // 메인 스레드가 변경
System.out.println("Set running to false");
// Worker가 멈추지 않을 수 있음!
}
// ✅ volatile 사용
private volatile boolean runningVolatile = true;
public void withVolatile() {
Thread worker = new Thread(() -> {
while (runningVolatile) { // 항상 메모리에서 읽음
// 작업
}
});
worker.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
runningVolatile = false; // 즉시 보임
}
/**
* volatile의 동작:
*
* 일반 변수:
* [메모리] ← [CPU 캐시] ← [CPU 레지스터]
*
* volatile 변수:
* [메모리] ←→ [CPU] (캐시 우회)
*/
// volatile 사용 예: Double-Checked Locking
private static volatile VolatileExample instance;
public static VolatileExample getInstance() {
if (instance == null) {
synchronized (VolatileExample.class) {
if (instance == null) {
instance = new VolatileExample();
}
}
}
return instance;
}
// volatile의 한계: 복합 연산
private volatile int counter = 0;
// ❌ volatile만으로는 안전하지 않음!
public void incrementVolatile() {
counter++; // 읽기 + 쓰기 (원자적이지 않음)
}
// ✅ synchronized 또는 Atomic 사용
public synchronized void incrementSafe() {
counter++;
}
// 또는
private AtomicInteger atomicCounter = new AtomicInteger(0);
public void incrementAtomic() {
atomicCounter.incrementAndGet();
}
}
volatile vs synchronized
public class VolatileVsSynchronized {
/**
* volatile:
* ✅ 가시성 보장
* ❌ 원자성 보장 안함
* ✅ 락 없음 (빠름)
* ✅ 단일 변수 읽기/쓰기
*
* synchronized:
* ✅ 가시성 보장
* ✅ 원자성 보장
* ❌ 락 사용 (느림)
* ✅ 복합 연산
*/
// volatile: 플래그용
private volatile boolean shutdown = false;
public void checkStatus() {
if (shutdown) {
// 종료 처리
}
}
public void shutdown() {
shutdown = true;
}
// synchronized: 복합 연산
private int count = 0;
public synchronized void increment() {
count++; // 읽기 + 증가 + 쓰기
}
public synchronized int getCount() {
return count;
}
}
5.2 고급 동시성
Executor Framework로 스레드 풀 관리
ExecutorService 사용법
기본 사용
public class ExecutorServiceBasics {
// 1. Single Thread Executor
public void singleThreadExample() {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
System.out.println("Task 1");
});
executor.submit(() -> {
System.out.println("Task 2");
});
executor.shutdown(); // 새 작업 거부, 기존 작업 완료 대기
try {
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 강제 종료
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
// 2. Fixed Thread Pool
public void fixedThreadPoolExample() {
ExecutorService executor = Executors.newFixedThreadPool(4);
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId +
" executed by " + Thread.currentThread().getName());
});
}
executor.shutdown();
}
// 3. Cached Thread Pool
public void cachedThreadPoolExample() {
// 필요에 따라 스레드 생성, 60초 후 제거
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
System.out.println(Thread.currentThread().getName());
});
}
executor.shutdown();
}
// 4. Scheduled Thread Pool
public void scheduledExample() {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// 2초 후 실행
scheduler.schedule(() -> {
System.out.println("Delayed task");
}, 2, TimeUnit.SECONDS);
// 1초 후 시작, 3초마다 반복
scheduler.scheduleAtFixedRate(() -> {
System.out.println("Periodic task");
}, 1, 3, TimeUnit.SECONDS);
// 1초 후 시작, 이전 작업 완료 후 3초 뒤 실행
scheduler.scheduleWithFixedDelay(() -> {
System.out.println("Task with delay");
}, 1, 3, TimeUnit.SECONDS);
}
}
Callable과 Future
public class CallableAndFuture {
// Callable: 값을 반환하고 예외를 던질 수 있음
public void callableExample() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<Integer> task = () -> {
Thread.sleep(1000);
return 42;
};
Future<Integer> future = executor.submit(task);
System.out.println("Waiting for result...");
Integer result = future.get(); // 블로킹
System.out.println("Result: " + result);
executor.shutdown();
}
// Future 타임아웃
public void futureWithTimeout() {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
Thread.sleep(5000);
return "Done";
});
try {
String result = future.get(2, TimeUnit.SECONDS);
System.out.println(result);
} catch (TimeoutException e) {
System.out.println("Task timed out");
future.cancel(true); // 작업 취소
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
// 여러 Future 처리
public List<String> processMultipleTasks() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(4);
List<Callable<String>> tasks = Arrays.asList(
() -> { Thread.sleep(100); return "Task 1"; },
() -> { Thread.sleep(200); return "Task 2"; },
() -> { Thread.sleep(150); return "Task 3"; }
);
// invokeAll: 모든 작업 완료 대기
List<Future<String>> futures = executor.invokeAll(tasks);
List<String> results = new ArrayList<>();
for (Future<String> future : futures) {
results.add(future.get());
}
executor.shutdown();
return results;
}
// invokeAny: 가장 빨리 완료된 작업의 결과 반환
public String getFirstResult() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Callable<String>> tasks = Arrays.asList(
() -> { Thread.sleep(300); return "Slow"; },
() -> { Thread.sleep(100); return "Fast"; },
() -> { Thread.sleep(200); return "Medium"; }
);
String result = executor.invokeAny(tasks); // "Fast" 반환
executor.shutdown();
return result;
}
}
ThreadPoolExecutor 설정
커스텀 스레드 풀
public class CustomThreadPool {
public void createCustomPool() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize: 기본 스레드 수
10, // maximumPoolSize: 최대 스레드 수
60L, // keepAliveTime: 유휴 스레드 생존 시간
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), // 작업 큐
new ThreadFactory() {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("MyPool-" + counter.incrementAndGet());
thread.setDaemon(false);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 거부 정책
);
// 사용
for (int i = 0; i < 200; i++) {
executor.submit(() -> {
System.out.println(Thread.currentThread().getName());
});
}
executor.shutdown();
}
/**
* 스레드 풀 동작 방식:
*
* 1. 작업 제출
* ↓
* 2. corePoolSize 미만?
* Yes → 새 스레드 생성
* No → 3번으로
* ↓
* 3. 큐에 여유?
* Yes → 큐에 추가
* No → 4번으로
* ↓
* 4. maximumPoolSize 미만?
* Yes → 새 스레드 생성
* No → 5번으로
* ↓
* 5. 거부 정책 실행
*/
// 거부 정책들
public void demonstrateRejectionPolicies() {
// 1. AbortPolicy (기본): RejectedExecutionException 발생
ThreadPoolExecutor executor1 = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.AbortPolicy()
);
// 2. CallerRunsPolicy: 호출 스레드에서 실행
ThreadPoolExecutor executor2 = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 3. DiscardPolicy: 조용히 무시
ThreadPoolExecutor executor3 = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.DiscardPolicy()
);
// 4. DiscardOldestPolicy: 가장 오래된 작업 제거 후 재시도
ThreadPoolExecutor executor4 = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
}
// 모니터링
public void monitorThreadPool() {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
// 주기적으로 상태 확인
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
System.out.println("=== Thread Pool Status ===");
System.out.println("Active threads: " + executor.getActiveCount());
System.out.println("Pool size: " + executor.getPoolSize());
System.out.println("Queue size: " + executor.getQueue().size());
System.out.println("Completed tasks: " + executor.getCompletedTaskCount());
}, 0, 1, TimeUnit.SECONDS);
// 작업 제출
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
}
CompletableFuture 완벽 가이드
기본 사용
public class CompletableFutureBasics {
// 1. 비동기 작업 생성
public void createAsync() {
// supplyAsync: 값 반환
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Hello";
});
// runAsync: 값 반환 안함
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
sleep(1000);
System.out.println("Task completed");
});
// 결과 가져오기
try {
String result = future1.get(); // 블로킹
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
// 2. 체이닝
public void chaining() {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("1. Fetching user");
return "User123";
})
.thenApply(userId -> {
System.out.println("2. Fetching orders for " + userId);
return Arrays.asList("Order1", "Order2");
})
.thenApply(orders -> {
System.out.println("3. Processing " + orders.size() + " orders");
return orders.size();
})
.thenApply(count -> {
System.out.println("4. Generating report");
return "Report: " + count + " orders";
});
try {
String result = future.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
// 3. 조합
public void combining() {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(1500);
return "World";
});
// thenCombine: 두 결과 조합
CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> {
return s1 + " " + s2;
});
try {
System.out.println(combined.get()); // "Hello World"
} catch (Exception e) {
e.printStackTrace();
}
}
// 4. 여러 Future 처리
public void multipleF utures() {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Task 1");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Task 2");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "Task 3");
// allOf: 모든 작업 완료 대기
CompletableFuture<Void> allOf = CompletableFuture.allOf(f1, f2, f3);
allOf.thenRun(() -> {
try {
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
} catch (Exception e) {
e.printStackTrace();
}
});
// anyOf: 가장 빨리 완료된 작업
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(f1, f2, f3);
try {
Object result = anyOf.get();
System.out.println("First completed: " + result);
} catch (Exception e) {
e.printStackTrace();
}
}
// 5. 예외 처리
public void exceptionHandling() {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Error!");
}
return "Success";
})
.exceptionally(ex -> {
System.out.println("Exception: " + ex.getMessage());
return "Default Value";
})
.thenApply(result -> {
return result.toUpperCase();
});
// 또는 handle
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> {
throw new RuntimeException("Error!");
})
.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
}
return result;
});
}
private void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
실무 예제
public class CompletableFutureRealWorld {
// 1. 병렬 API 호출
public CompletableFuture<Dashboard> loadDashboard(String userId) {
CompletableFuture<User> userFuture =
CompletableFuture.supplyAsync(() -> fetchUser(userId));
CompletableFuture<List<Order>> ordersFuture =
CompletableFuture.supplyAsync(() -> fetchOrders(userId));
CompletableFuture<List<Notification>> notificationsFuture =
CompletableFuture.supplyAsync(() -> fetchNotifications(userId));
return CompletableFuture.allOf(userFuture, ordersFuture, notificationsFuture)
.thenApply(v -> {
try {
return new Dashboard(
userFuture.get(),
ordersFuture.get(),
notificationsFuture.get()
);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
// 2. 타임아웃 처리
public CompletableFuture<String> fetchWithTimeout(String url) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return httpGet(url);
});
// Java 9+: orTimeout
return future.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
return "Request timed out";
}
return "Error: " + ex.getMessage();
});
}
// 3. 재시도 로직
public CompletableFuture<String> fetchWithRetry(String url, int maxRetries) {
return CompletableFuture.supplyAsync(() -> httpGet(url))
.exceptionally(ex -> {
if (maxRetries > 0) {
try {
return fetchWithRetry(url, maxRetries - 1).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
throw new RuntimeException("Max retries exceeded", ex);
});
}
// 4. 파이프라인 처리
public CompletableFuture<Report> generateReport(String userId) {
return CompletableFuture.supplyAsync(() -> fetchUserData(userId))
.thenApply(this::validateData)
.thenApply(this::transformData)
.thenApply(this::aggregateData)
.thenApply(this::formatReport)
.exceptionally(ex -> {
System.err.println("Report generation failed: " + ex.getMessage());
return getDefaultReport();
});
}
// Helper methods
private User fetchUser(String userId) { return new User(); }
private List<Order> fetchOrders(String userId) { return new ArrayList<>(); }
private List<Notification> fetchNotifications(String userId) { return new ArrayList<>(); }
private String httpGet(String url) { return ""; }
private Object fetchUserData(String userId) { return new Object(); }
private Object validateData(Object data) { return data; }
private Object transformData(Object data) { return data; }
private Object aggregateData(Object data) { return data; }
private Report formatReport(Object data) { return new Report(); }
private Report getDefaultReport() { return new Report(); }
static class Dashboard {
Dashboard(User user, List<Order> orders, List<Notification> notifications) {}
}
static class User {}
static class Order {}
static class Notification {}
static class Report {}
}
java.util.concurrent 패키지 활용
CountDownLatch, CyclicBarrier
CountDownLatch: 일회용 동기화
public class CountDownLatchExample {
// 기본 사용
public void basicUsage() throws InterruptedException {
int numWorkers = 3;
CountDownLatch latch = new CountDownLatch(numWorkers);
for (int i = 0; i < numWorkers; i++) {
final int workerId = i;
new Thread(() -> {
System.out.println("Worker " + workerId + " starting");
sleep(1000);
System.out.println("Worker " + workerId + " done");
latch.countDown(); // 카운트 감소
}).start();
}
System.out.println("Waiting for workers...");
latch.await(); // 카운트가 0이 될 때까지 대기
System.out.println("All workers done!");
}
// 실무 예제: 서비스 시작 대기
public class ServiceStarter {
private final CountDownLatch latch = new CountDownLatch(3);
public void start() {
new Thread(this::startDatabase).start();
new Thread(this::startCache).start();
new Thread(this::startWebServer).start();
try {
latch.await(30, TimeUnit.SECONDS);
System.out.println("All services started");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void startDatabase() {
System.out.println("Starting database...");
sleep(2000);
System.out.println("Database started");
latch.countDown();
}
private void startCache() {
System.out.println("Starting cache...");
sleep(1500);
System.out.println("Cache started");
latch.countDown();
}
private void startWebServer() {
System.out.println("Starting web server...");
sleep(3000);
System.out.println("Web server started");
latch.countDown();
}
}
private void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
CyclicBarrier: 재사용 가능한 동기화
public class CyclicBarrierExample {
// 기본 사용
public void basicUsage() {
int numThreads = 3;
CyclicBarrier barrier = new CyclicBarrier(numThreads, () -> {
System.out.println("All threads reached barrier!");
});
for (int i = 0; i < numThreads; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread " + threadId + " working");
sleep(1000);
System.out.println("Thread " + threadId + " waiting at barrier");
barrier.await(); // 모든 스레드 대기
System.out.println("Thread " + threadId + " continuing");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
// 실무 예제: 병렬 데이터 처리
public class ParallelDataProcessor {
private final CyclicBarrier barrier;
private final List<List<Integer>> chunks;
private final List<Integer> results = new ArrayList<>();
public ParallelDataProcessor(List<Integer> data, int numThreads) {
this.chunks = splitData(data, numThreads);
this.barrier = new CyclicBarrier(numThreads, () -> {
System.out.println("All chunks processed, merging results...");
});
}
public void process() {
for (int i = 0; i < chunks.size(); i++) {
final int chunkIndex = i;
new Thread(() -> {
List<Integer> chunk = chunks.get(chunkIndex);
// Phase 1: Process
System.out.println("Processing chunk " + chunkIndex);
int sum = chunk.stream().mapToInt(Integer::intValue).sum();
synchronized (results) {
results.add(sum);
}
try {
barrier.await(); // Phase 1 완료 대기
// Phase 2: Aggregate (barrier 이후)
if (chunkIndex == 0) {
int total = results.stream().mapToInt(Integer::intValue).sum();
System.out.println("Total: " + total);
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
private List<List<Integer>> splitData(List<Integer> data, int numChunks) {
List<List<Integer>> chunks = new ArrayList<>();
int chunkSize = data.size() / numChunks;
for (int i = 0; i < numChunks; i++) {
int start = i * chunkSize;
int end = (i == numChunks - 1) ? data.size() : (i + 1) * chunkSize;
chunks.add(data.subList(start, end));
}
return chunks;
}
}
private void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Semaphore, BlockingQueue
Semaphore: 리소스 접근 제한
public class SemaphoreExample {
// 기본 사용
public void basicUsage() {
Semaphore semaphore = new Semaphore(3); // 동시에 3개까지 허용
for (int i = 0; i < 10; i++) {
final int taskId = i;
new Thread(() -> {
try {
System.out.println("Task " + taskId + " waiting for permit");
semaphore.acquire(); // 허가 획득
System.out.println("Task " + taskId + " acquired permit");
sleep(1000); // 작업 수행
System.out.println("Task " + taskId + " releasing permit");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 허가 반환
}
}).start();
}
}
// 실무 예제: DB 커넥션 풀
public class DatabaseConnectionPool {
private final Semaphore semaphore;
private final Queue<Connection> connections = new LinkedList<>();
public DatabaseConnectionPool(int maxConnections) {
this.semaphore = new Semaphore(maxConnections);
for (int i = 0; i < maxConnections; i++) {
connections.offer(new Connection());
}
}
public Connection acquire() throws InterruptedException {
semaphore.acquire();
synchronized (connections) {
return connections.poll();
}
}
public void release(Connection connection) {
synchronized (connections) {
connections.offer(connection);
}
semaphore.release();
}
public void useConnection() throws InterruptedException {
Connection conn = acquire();
try {
// DB 작업
System.out.println("Using connection");
Thread.sleep(1000);
} finally {
release(conn);
}
}
}
// 실무 예제: API Rate Limiting
public class RateLimiter {
private final Semaphore semaphore;
private final ScheduledExecutorService scheduler;
public RateLimiter(int requestsPerSecond) {
this.semaphore = new Semaphore(requestsPerSecond);
this.scheduler = Executors.newScheduledThreadPool(1);
// 1초마다 모든 허가 복원
scheduler.scheduleAtFixedRate(() -> {
semaphore.release(requestsPerSecond - semaphore.availablePermits());
}, 1, 1, TimeUnit.SECONDS);
}
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
public void shutdown() {
scheduler.shutdown();
}
}
static class Connection {}
private void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
BlockingQueue: Producer-Consumer 패턴
public class BlockingQueueExample {
// 기본 사용
public void basicUsage() {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// Producer
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
String item = "Item " + i;
queue.put(item); // 큐가 가득 차면 대기
System.out.println("Produced: " + item);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
// Consumer
for (int i = 0; i < 3; i++) {
new Thread(() -> {
while (true) {
try {
String item = queue.take(); // 큐가 비어있으면 대기
System.out.println(Thread.currentThread().getName() +
" consumed: " + item);
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}).start();
}
}
// 다양한 BlockingQueue 구현체
public void differentImplementations() {
// 1. ArrayBlockingQueue: 고정 크기
BlockingQueue<String> array = new ArrayBlockingQueue<>(100);
// 2. LinkedBlockingQueue: 가변 크기 (옵션)
BlockingQueue<String> linked = new LinkedBlockingQueue<>();
// 3. PriorityBlockingQueue: 우선순위
BlockingQueue<Task> priority = new PriorityBlockingQueue<>(
10,
Comparator.comparing(Task::getPriority)
);
// 4. DelayQueue: 지연 큐
BlockingQueue<DelayedTask> delay = new DelayQueue<>();
// 5. SynchronousQueue: 용량 0 (직접 전달)
BlockingQueue<String> sync = new SynchronousQueue<>();
}
// 실무 예제: 로그 처리 시스템
public class LogProcessor {
private final BlockingQueue<LogEntry> queue = new LinkedBlockingQueue<>(1000);
private final ExecutorService executor = Executors.newFixedThreadPool(3);
private volatile boolean running = true;
public void start() {
// Consumer 스레드들
for (int i = 0; i < 3; i++) {
executor.submit(() -> {
while (running) {
try {
LogEntry entry = queue.poll(1, TimeUnit.SECONDS);
if (entry != null) {
processLog(entry);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
}
public void log(LogEntry entry) {
if (!queue.offer(entry)) {
// 큐가 가득 참
System.err.println("Log queue full, dropping entry");
}
}
public void shutdown() {
running = false;
executor.shutdown();
}
private void processLog(LogEntry entry) {
// 파일에 쓰기, DB 저장 등
System.out.println("Processing log: " + entry);
}
}
// 실무 예제: 작업 큐
public class TaskQueue {
private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
private final List<Thread> workers = new ArrayList<>();
public TaskQueue(int numWorkers) {
for (int i = 0; i < numWorkers; i++) {
Thread worker = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Runnable task = queue.take();
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
worker.start();
workers.add(worker);
}
}
public void submit(Runnable task) {
queue.offer(task);
}
public void shutdown() {
workers.forEach(Thread::interrupt);
}
}
static class Task {
int getPriority() { return 0; }
}
static class DelayedTask implements Delayed {
@Override
public long getDelay(TimeUnit unit) { return 0; }
@Override
public int compareTo(Delayed o) { return 0; }
}
static class LogEntry {
@Override
public String toString() { return "LogEntry"; }
}
}
ConcurrentHashMap vs synchronized Map
ConcurrentHashMap의 우수성
public class ConcurrentHashMapExample {
// 성능 비교
@Benchmark
public void synchronizedMapPerformance() {
Map<String, Integer> map = Collections.synchronizedMap(new HashMap<>());
// 모든 연산이 전체 맵을 락
map.put("key1", 1);
map.get("key1");
map.remove("key1");
}
@Benchmark
public void concurrentHashMapPerformance() {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 세그먼트 단위로 락 (더 세밀한 동기화)
map.put("key1", 1);
map.get("key1");
map.remove("key1");
}
/**
* 벤치마크 결과:
* - synchronizedMap: 100 µs
* - ConcurrentHashMap: 10 µs (10배 빠름!)
*/
// 원자적 연산
public void atomicOperations() {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// ❌ 비원자적 (Race Condition)
if (map.get("key") == null) {
map.put("key", 1);
}
// ✅ 원자적
map.putIfAbsent("key", 1);
// ✅ compute: 기존 값 기반 계산
map.compute("key", (k, v) -> v == null ? 1 : v + 1);
// ✅ merge: 값 병합
map.merge("key", 1, Integer::sum);
}
// 실무 예제: 캐시
public class Cache<K, V> {
private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
private final Function<K, V> loader;
public Cache(Function<K, V> loader) {
this.loader = loader;
}
public V get(K key) {
return cache.computeIfAbsent(key, loader);
}
public void invalidate(K key) {
cache.remove(key);
}
public void clear() {
cache.clear();
}
}
// 실무 예제: 동시성 카운터
public class ConcurrentCounter {
private final ConcurrentHashMap<String, AtomicInteger> counters =
new ConcurrentHashMap<>();
public void increment(String key) {
counters.computeIfAbsent(key, k -> new AtomicInteger(0))
.incrementAndGet();
}
public int getCount(String key) {
AtomicInteger counter = counters.get(key);
return counter != null ? counter.get() : 0;
}
public Map<String, Integer> getAll() {
Map<String, Integer> result = new HashMap<>();
counters.forEach((k, v) -> result.put(k, v.get()));
return result;
}
}
// forEach 병렬 처리
public void parallelForEach() {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 데이터 채우기
for (int i = 0; i < 1000; i++) {
map.put("key" + i, i);
}
// 병렬 forEach (parallelismThreshold)
map.forEach(1, (key, value) -> {
// 병렬로 처리
System.out.println(key + ": " + value);
});
// 병렬 search
String result = map.search(1, (key, value) -> {
return value > 500 ? key : null;
});
// 병렬 reduce
Integer sum = map.reduce(1,
(key, value) -> value,
Integer::sum
);
}
}
5.3 Java 모니터링과 락(Lock) 심화
JVM 모니터링 도구와 동시성 분석
JConsole을 활용한 스레드 모니터링
JConsole 실행
# 애플리케이션 실행 시 JMX 옵션 추가
java -Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.port=9999 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-jar MyApplication.jar
# JConsole 실행
jconsole localhost:9999
스레드 덤프 분석
public class ThreadDumpExample {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
public static void main(String[] args) {
// 데드락 시나리오 생성
Thread thread1 = new Thread(() -> {
synchronized (lock1) {
System.out.println("Thread 1: Holding lock1");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lock2) {
System.out.println("Thread 1: Holding lock1 and lock2");
}
}
});
Thread thread2 = new Thread(() -> {
synchronized (lock2) {
System.out.println("Thread 2: Holding lock2");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lock1) {
System.out.println("Thread 2: Holding lock1 and lock2");
}
}
});
thread1.start();
thread2.start();
// 스레드 덤프 생성 (프로그래밍 방식)
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadBean.dumpAllThreads(true, true);
for (ThreadInfo info : threadInfos) {
if (info.getThreadState() == Thread.State.BLOCKED) {
System.out.println("BLOCKED Thread: " + info.getThreadName());
System.out.println("Locked by: " + info.getLockOwnerName());
}
}
}
}
프로그래밍 방식 모니터링
import java.lang.management.*;
public class ThreadMonitoring {
private final ThreadMXBean threadBean;
private final MemoryMXBean memoryBean;
private final RuntimeMXBean runtimeBean;
public ThreadMonitoring() {
this.threadBean = ManagementFactory.getThreadMXBean();
this.memoryBean = ManagementFactory.getMemoryMXBean();
this.runtimeBean = ManagementFactory.getRuntimeMXBean();
}
public void printThreadStats() {
System.out.println("=== Thread Statistics ===");
System.out.println("Total Threads: " + threadBean.getThreadCount());
System.out.println("Peak Threads: " + threadBean.getPeakThreadCount());
System.out.println("Daemon Threads: " + threadBean.getDaemonThreadCount());
// 스레드별 상세 정보
ThreadInfo[] threadInfos = threadBean.dumpAllThreads(false, false);
for (ThreadInfo info : threadInfos) {
System.out.printf("Thread: %s, State: %s, CPU Time: %d ns%n",
info.getThreadName(),
info.getThreadState(),
threadBean.getThreadCpuTime(info.getThreadId())
);
}
}
public void detectDeadlock() {
long[] deadlockedThreads = threadBean.findDeadlockedThreads();
if (deadlockedThreads != null) {
System.out.println("🚨 Deadlock detected!");
ThreadInfo[] threadInfos = threadBean.getThreadInfo(deadlockedThreads);
for (ThreadInfo info : threadInfos) {
System.out.println("Deadlocked Thread: " + info.getThreadName());
System.out.println("Lock Info: " + info.getLockInfo());
}
} else {
System.out.println("✅ No deadlock detected");
}
}
public void monitorMemoryUsage() {
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
System.out.println("=== Memory Statistics ===");
System.out.printf("Used: %d MB%n", heapUsage.getUsed() / 1024 / 1024);
System.out.printf("Max: %d MB%n", heapUsage.getMax() / 1024 / 1024);
System.out.printf("Usage: %.2f%%%n",
(double) heapUsage.getUsed() / heapUsage.getMax() * 100);
}
}
VisualVM을 활용한 프로파일링
VisualVM 설치 및 사용
# VisualVM 다운로드 후 실행
# 애플리케이션 실행 시 JMX 옵션 추가
java -Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.port=9999 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-jar MyApplication.jar
프로파일링 예제
public class ProfilingExample {
private static final int THREAD_COUNT = 10;
private static final int ITERATIONS = 1000000;
public static void main(String[] args) throws InterruptedException {
// CPU 집약적 작업
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
long startTime = System.currentTimeMillis();
for (int i = 0; i < THREAD_COUNT; i++) {
executor.submit(() -> {
try {
cpuIntensiveTask();
} finally {
latch.countDown();
}
});
}
latch.await();
long endTime = System.currentTimeMillis();
System.out.printf("Total time: %d ms%n", endTime - startTime);
executor.shutdown();
}
private static void cpuIntensiveTask() {
long sum = 0;
for (int i = 0; i < ITERATIONS; i++) {
sum += Math.sqrt(i);
}
System.out.println("Sum: " + sum);
}
}
고급 락(Lock) 메커니즘
ReentrantLock 심화
공정성(Fairness) 설정
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
public class FairLockExample {
private final ReentrantLock fairLock = new ReentrantLock(true); // 공정한 락
private final ReentrantLock unfairLock = new ReentrantLock(false); // 비공정한 락
private int sharedResource = 0;
public void fairAccess(String threadName) {
fairLock.lock();
try {
System.out.println(threadName + " acquired fair lock");
Thread.sleep(100);
sharedResource++;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
fairLock.unlock();
}
}
public void unfairAccess(String threadName) {
unfairLock.lock();
try {
System.out.println(threadName + " acquired unfair lock");
Thread.sleep(100);
sharedResource++;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
unfairLock.unlock();
}
}
public static void main(String[] args) {
FairLockExample example = new FairLockExample();
// 공정한 락 테스트
System.out.println("=== Fair Lock Test ===");
for (int i = 0; i < 5; i++) {
final int threadNum = i;
new Thread(() -> {
for (int j = 0; j < 3; j++) {
example.fairAccess("Fair-Thread-" + threadNum);
}
}).start();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 비공정한 락 테스트
System.out.println("\n=== Unfair Lock Test ===");
for (int i = 0; i < 5; i++) {
final int threadNum = i;
new Thread(() -> {
for (int j = 0; j < 3; j++) {
example.unfairAccess("Unfair-Thread-" + threadNum);
}
}).start();
}
}
}
조건 변수(Condition) 활용
public class ConditionExample {
private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
private final Object[] items = new Object[10];
private int count = 0;
private int putIndex = 0;
private int takeIndex = 0;
public void put(Object item) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
System.out.println("Buffer full, waiting...");
notFull.await();
}
items[putIndex] = item;
if (++putIndex == items.length) {
putIndex = 0;
}
count++;
System.out.println("Produced: " + item + ", count: " + count);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
System.out.println("Buffer empty, waiting...");
notEmpty.await();
}
Object item = items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) {
takeIndex = 0;
}
count--;
System.out.println("Consumed: " + item + ", count: " + count);
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ConditionExample buffer = new ConditionExample();
// Producer
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
buffer.put("Item-" + i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
// Consumer
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
buffer.take();
Thread.sleep(150);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}
ReadWriteLock 심화
성능 비교 예제
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockPerformance {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<String, String> data = new HashMap<>();
// ReadWriteLock 사용
public String readWithRWLock(String key) {
lock.readLock().lock();
try {
return data.get(key);
} finally {
lock.readLock().unlock();
}
}
public void writeWithRWLock(String key, String value) {
lock.writeLock().lock();
try {
data.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
// 일반 synchronized 사용
public synchronized String readWithSync(String key) {
return data.get(key);
}
public synchronized void writeWithSync(String key, String value) {
data.put(key, value);
}
public static void main(String[] args) throws InterruptedException {
ReadWriteLockPerformance example = new ReadWriteLockPerformance();
// 초기 데이터 설정
for (int i = 0; i < 1000; i++) {
example.writeWithRWLock("key" + i, "value" + i);
}
int readerCount = 10;
int writerCount = 2;
int operationsPerThread = 10000;
// ReadWriteLock 성능 테스트
System.out.println("=== ReadWriteLock Performance Test ===");
long startTime = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(readerCount + writerCount);
// Reader threads
for (int i = 0; i < readerCount; i++) {
new Thread(() -> {
try {
for (int j = 0; j < operationsPerThread; j++) {
example.readWithRWLock("key" + (j % 1000));
}
} finally {
latch.countDown();
}
}).start();
}
// Writer threads
for (int i = 0; i < writerCount; i++) {
new Thread(() -> {
try {
for (int j = 0; j < operationsPerThread / 10; j++) {
example.writeWithRWLock("key" + (j % 1000), "newValue" + j);
}
} finally {
latch.countDown();
}
}).start();
}
latch.await();
long endTime = System.currentTimeMillis();
System.out.printf("ReadWriteLock Time: %d ms%n", endTime - startTime);
}
}
StampedLock 고급 활용
낙관적 읽기(Optimistic Read)
import java.util.concurrent.locks.StampedLock;
public class StampedLockExample {
private final StampedLock lock = new StampedLock();
private double x, y;
// 쓰기 작업
public void write(double newX, double newY) {
long stamp = lock.writeLock();
try {
x = newX;
y = newY;
System.out.println("Written: x=" + x + ", y=" + y);
} finally {
lock.unlockWrite(stamp);
}
}
// 낙관적 읽기
public double distanceFromOrigin() {
long stamp = lock.tryOptimisticRead();
double curX = x, curY = y;
if (!lock.validate(stamp)) {
// 낙관적 읽기 실패, 일반 읽기 락 획득
stamp = lock.readLock();
try {
curX = x;
curY = y;
} finally {
lock.unlockRead(stamp);
}
}
return Math.sqrt(curX * curX + curY * curY);
}
// 읽기 락을 쓰기 락으로 업그레이드
public void moveIfAtOrigin(double newX, double newY) {
long stamp = lock.readLock();
try {
while (x == 0.0 && y == 0.0) {
long ws = lock.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
} else {
lock.unlockRead(stamp);
stamp = lock.writeLock();
}
}
} finally {
lock.unlock(stamp);
}
}
public static void main(String[] args) {
StampedLockExample example = new StampedLockExample();
// Writer
new Thread(() -> {
for (int i = 0; i < 10; i++) {
example.write(i, i * 2);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
// Reader
new Thread(() -> {
for (int i = 0; i < 20; i++) {
double distance = example.distanceFromOrigin();
System.out.println("Distance: " + distance);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}
락 성능 분석과 최적화
락 경합(Lock Contention) 분석
public class LockContentionAnalysis {
private final ReentrantLock lock = new ReentrantLock();
private int counter = 0;
public void incrementWithLock() {
lock.lock();
try {
counter++;
} finally {
lock.unlock();
}
}
public void incrementWithSync() {
synchronized (this) {
counter++;
}
}
public void incrementWithAtomic() {
// AtomicInteger 사용 (락 없음)
// atomicCounter.incrementAndGet();
}
public void analyzeLockContention() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
// 락 정보 수집
ThreadInfo[] threadInfos = threadBean.dumpAllThreads(true, true);
for (ThreadInfo info : threadInfos) {
if (info.getLockInfo() != null) {
System.out.println("Thread: " + info.getThreadName());
System.out.println("Lock: " + info.getLockInfo());
System.out.println("Blocked Count: " + info.getBlockedCount());
System.out.println("Blocked Time: " + info.getBlockedTime() + " ms");
System.out.println("Waited Count: " + info.getWaitedCount());
System.out.println("Waited Time: " + info.getWaitedTime() + " ms");
System.out.println("---");
}
}
}
public static void main(String[] args) throws InterruptedException {
LockContentionAnalysis analysis = new LockContentionAnalysis();
// 고경합 상황 생성
ExecutorService executor = Executors.newFixedThreadPool(20);
CountDownLatch latch = new CountDownLatch(20);
long startTime = System.currentTimeMillis();
for (int i = 0; i < 20; i++) {
executor.submit(() -> {
try {
for (int j = 0; j < 10000; j++) {
analysis.incrementWithLock();
}
} finally {
latch.countDown();
}
});
}
latch.await();
long endTime = System.currentTimeMillis();
System.out.printf("Lock-based increment time: %d ms%n", endTime - startTime);
// 락 경합 분석
analysis.analyzeLockContention();
executor.shutdown();
}
}
락 프리(Lock-Free) 프로그래밍
import java.util.concurrent.atomic.*;
public class LockFreeProgramming {
// AtomicInteger 사용
private final AtomicInteger atomicCounter = new AtomicInteger(0);
// CAS(Compare-And-Swap) 직접 사용
private volatile int casCounter = 0;
private static final Unsafe UNSAFE;
private static final long COUNTER_OFFSET;
static {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
UNSAFE = (Unsafe) field.get(null);
COUNTER_OFFSET = UNSAFE.objectFieldOffset(
LockFreeProgramming.class.getDeclaredField("casCounter")
);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void incrementAtomic() {
atomicCounter.incrementAndGet();
}
public void incrementCAS() {
int current;
do {
current = casCounter;
} while (!UNSAFE.compareAndSwapInt(this, COUNTER_OFFSET, current, current + 1));
}
// ABA 문제 해결을 위한 AtomicStampedReference
private final AtomicStampedReference<String> stampedRef =
new AtomicStampedReference<>("initial", 0);
public boolean updateWithStamp(String expectedValue, String newValue) {
int[] stampHolder = new int[1];
String currentValue = stampedRef.get(stampHolder);
if (currentValue.equals(expectedValue)) {
return stampedRef.compareAndSet(
expectedValue, newValue,
stampHolder[0], stampHolder[0] + 1
);
}
return false;
}
// Lock-Free Stack 구현
private static class LockFreeStack<T> {
private final AtomicReference<Node<T>> head = new AtomicReference<>();
private static class Node<T> {
final T data;
final AtomicReference<Node<T>> next;
Node(T data) {
this.data = data;
this.next = new AtomicReference<>();
}
}
public void push(T data) {
Node<T> newNode = new Node<>(data);
Node<T> currentHead;
do {
currentHead = head.get();
newNode.next.set(currentHead);
} while (!head.compareAndSet(currentHead, newNode));
}
public T pop() {
Node<T> currentHead;
Node<T> newHead;
do {
currentHead = head.get();
if (currentHead == null) {
return null;
}
newHead = currentHead.next.get();
} while (!head.compareAndSet(currentHead, newHead));
return currentHead.data;
}
}
public static void main(String[] args) throws InterruptedException {
LockFreeProgramming example = new LockFreeProgramming();
// 성능 비교
int threadCount = 10;
int operationsPerThread = 100000;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(threadCount);
// AtomicInteger 테스트
long startTime = System.currentTimeMillis();
for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
try {
for (int j = 0; j < operationsPerThread; j++) {
example.incrementAtomic();
}
} finally {
latch.countDown();
}
});
}
latch.await();
long atomicTime = System.currentTimeMillis() - startTime;
System.out.printf("AtomicInteger time: %d ms%n", atomicTime);
System.out.printf("AtomicInteger value: %d%n", example.atomicCounter.get());
executor.shutdown();
}
}
실무 모니터링 전략
APM(Application Performance Monitoring) 통합
public class APMIntegration {
private final MeterRegistry meterRegistry;
private final Timer.Sample sample;
public APMIntegration() {
this.meterRegistry = Metrics.globalRegistry;
this.sample = Timer.start(meterRegistry);
}
public void recordThreadMetrics() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
// 스레드 수 메트릭
Gauge.builder("jvm.threads.count")
.description("Number of threads")
.register(meterRegistry, threadBean, ThreadMXBean::getThreadCount);
// 데드락 감지
Gauge.builder("jvm.threads.deadlock")
.description("Number of deadlocked threads")
.register(meterRegistry, () -> {
long[] deadlockedThreads = threadBean.findDeadlockedThreads();
return deadlockedThreads != null ? deadlockedThreads.length : 0;
});
}
public void recordLockMetrics() {
// 락 대기 시간 측정
Timer lockWaitTimer = Timer.builder("lock.wait.time")
.description("Lock wait time")
.register(meterRegistry);
// 락 획득 시도 횟수
Counter lockAttempts = Counter.builder("lock.attempts")
.description("Lock acquisition attempts")
.register(meterRegistry);
// 사용 예제
ReentrantLock lock = new ReentrantLock();
lockAttempts.increment();
Timer.Sample lockSample = Timer.start(meterRegistry);
try {
lock.lock();
// 작업 수행
} finally {
lock.unlock();
lockSample.stop(lockWaitTimer);
}
}
public void recordMemoryMetrics() {
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
// 힙 메모리 사용량
Gauge.builder("jvm.memory.heap.used")
.description("Used heap memory")
.register(meterRegistry, memoryBean, bean ->
bean.getHeapMemoryUsage().getUsed());
// GC 정보
List<GarbageCollectorMXBean> gcBeans =
ManagementFactory.getGarbageCollectorMXBeans();
for (GarbageCollectorMXBean gcBean : gcBeans) {
Gauge.builder("jvm.gc.collections")
.tag("gc", gcBean.getName())
.description("GC collection count")
.register(meterRegistry, gcBean, GarbageCollectorMXBean::getCollectionCount);
}
}
}
커스텀 모니터링 도구
public class CustomThreadMonitor {
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
private final Map<String, ThreadStats> threadStats =
new ConcurrentHashMap<>();
public static class ThreadStats {
private long cpuTime;
private long blockedTime;
private long waitedTime;
private int blockedCount;
private int waitedCount;
// getters and setters
public long getCpuTime() { return cpuTime; }
public void setCpuTime(long cpuTime) { this.cpuTime = cpuTime; }
// ... 다른 getter/setter들
}
public void startMonitoring() {
scheduler.scheduleAtFixedRate(this::collectStats, 0, 1, TimeUnit.SECONDS);
}
private void collectStats() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadBean.dumpAllThreads(false, false);
for (ThreadInfo info : threadInfos) {
String threadName = info.getThreadName();
ThreadStats stats = threadStats.computeIfAbsent(threadName,
k -> new ThreadStats());
stats.setCpuTime(threadBean.getThreadCpuTime(info.getThreadId()));
stats.setBlockedTime(info.getBlockedTime());
stats.setWaitedTime(info.getWaitedTime());
stats.setBlockedCount(info.getBlockedCount());
stats.setWaitedCount(info.getWaitedCount());
}
// 통계 출력
printStats();
}
private void printStats() {
System.out.println("\n=== Thread Statistics ===");
threadStats.forEach((name, stats) -> {
if (stats.getBlockedCount() > 0 || stats.getWaitedCount() > 0) {
System.out.printf("Thread: %s%n", name);
System.out.printf(" CPU Time: %d ms%n", stats.getCpuTime() / 1_000_000);
System.out.printf(" Blocked: %d times, %d ms%n",
stats.getBlockedCount(), stats.getBlockedTime());
System.out.printf(" Waited: %d times, %d ms%n",
stats.getWaitedCount(), stats.getWaitedTime());
}
});
}
public void stopMonitoring() {
scheduler.shutdown();
}
}
마치며
이번 글에서는 Java의 동시성 프로그래밍을 Thread 기초부터 Executor Framework, java.util.concurrent 패키지까지 실무 관점에서 깊이 있게 다뤄보았습니다.
핵심 요약:
스레드 기초
- Runnable 인터페이스 구현 (권장)
- join, sleep, interrupt 올바른 사용
- 스레드 라이프사이클 이해
동기화
- synchronized로 상호 배제
- wait/notify로 조건 동기화
- volatile로 가시성 보장
- 데드락 예방 (락 순서 고정)
Executor Framework
- ExecutorService로 스레드 풀 관리
- CompletableFuture로 비동기 프로그래밍
- ThreadPoolExecutor 커스터마이징
java.util.concurrent
- CountDownLatch: 일회용 동기화
- CyclicBarrier: 재사용 동기화
- Semaphore: 리소스 제한
- BlockingQueue: Producer-Consumer
- ConcurrentHashMap: 고성능 동시성 맵
고급 락 메커니즘
- ReentrantLock: 공정성 설정과 조건 변수
- ReadWriteLock: 읽기/쓰기 분리로 성능 향상
- StampedLock: 낙관적 읽기와 락 업그레이드
- Lock-Free 프로그래밍: CAS와 Atomic 클래스
모니터링과 성능 분석
- JConsole과 VisualVM 활용
- ThreadMXBean으로 프로그래밍 방식 모니터링
- 락 경합 분석과 최적화
- APM 통합과 커스텀 모니터링 도구
실무 팁:
- 가능하면 고수준 API 사용 (ExecutorService, CompletableFuture)
- Thread보다 Runnable
- synchronized보다 java.util.concurrent
- 공유 상태 최소화
- 불변 객체 활용
- 모니터링을 통한 지속적인 성능 개선
- 락 경합 최소화로 병목 제거
동시성은 어렵지만, 올바르게 사용하면 강력한 성능 향상을 가져올 수 있습니다. 특히 모니터링과 성능 분석을 통해 지속적으로 개선하는 것이 핵심입니다.
다음 단계에서는 JVM & 성능 최적화를 다룰 예정입니다.
'Java' 카테고리의 다른 글
| 7단계: 디자인 패턴 & 아키텍처 (0) | 2025.12.11 |
|---|---|
| 6단계: JVM & 성능 최적화 (0) | 2025.12.11 |
| 4단계: Java 함수형 프로그래밍 (Java 8+) (0) | 2025.12.11 |
| 3단계: Java 컬렉션 & 제네릭 (0) | 2025.12.11 |
| 2단계: Java 객체지향 프로그래밍 (0) | 2025.12.11 |