Stage 5: Java Concurrency & Multithreading

Mr.Manager 2025. 12. 11. 20:22

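Multithreading is one of Java's most powerful features, and one of its most dangerous. This post collects the many concurrency bugs I ran into over four years of production work, how each was resolved, and the essentials of safe multithreaded programming.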


5.1 Thread Basics

Introduction to Java Multithreading

Thread vs Runnable

Extending the Thread class

public class ThreadExample {
    // Approach 1: extend the Thread class
    static class MyThread extends Thread {
        private String name;

        public MyThread(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                System.out.println(name + ": " + i);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;  // stop instead of looping on with the interrupt flag set
                }
            }
        }
    }

    public static void main(String[] args) {
        MyThread thread1 = new MyThread("Thread-1");
        MyThread thread2 = new MyThread("Thread-2");

        thread1.start();  // run() executes on a new thread
        thread2.start();

        // ❌ thread1.run();  // runs on the current thread (no multithreading)
    }
}
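
To make the difference between start() and run() concrete, here is a minimal sketch that records which thread actually executes the task. The class name `StartVsRunDemo`, the helper `executingThread`, and the thread name "worker" are illustrative assumptions, not part of the example above.

```java
import java.util.concurrent.atomic.AtomicReference;

class StartVsRunDemo {
    // Returns the name of the thread that executed the task body.
    static String executingThread(boolean useStart) throws InterruptedException {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread t = new Thread(() -> seen.set(Thread.currentThread().getName()), "worker");
        if (useStart) {
            t.start();  // schedules run() on the new thread
            t.join();   // wait for it so the recorded name is available
        } else {
            t.run();    // plain method call on the calling thread
        }
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("start(): " + executingThread(true));
        System.out.println("run():   " + executingThread(false));
    }
}
```

With start() the recorded name is "worker"; with run() it is the name of whichever thread made the call.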

Implementing the Runnable interface

public class RunnableExample {
    // Approach 2: implement the Runnable interface
    static class MyRunnable implements Runnable {
        private String name;

        public MyRunnable(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                System.out.println(name + ": " + i);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;  // stop instead of looping on with the interrupt flag set
                }
            }
        }
    }

    public static void main(String[] args) {
        Thread thread1 = new Thread(new MyRunnable("Runnable-1"));
        Thread thread2 = new Thread(new MyRunnable("Runnable-2"));

        thread1.start();
        thread2.start();
    }
}

Using a lambda expression

public class LambdaThreadExample {
    public static void main(String[] args) {
        // Approach 3: lambda expression (Java 8+)
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Lambda-1: " + i);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;  // stop instead of looping on with the interrupt flag set
                }
            }
        });

        // Approach 4: method reference
        Thread thread2 = new Thread(LambdaThreadExample::doWork);

        thread1.start();
        thread2.start();
    }

    private static void doWork() {
        System.out.println("Method reference thread");
    }
}

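Thread vs Runnable comparison

/**
 * Choosing between Thread and Runnable:
 *
 * Prefer Runnable (recommended) when:
 * ✅ the class must extend another class
 * ✅ the task should be reusable
 * ✅ the task should stay decoupled from how it is executed
 * ✅ you want to use a lambda expression
 *
 * Extend Thread when:
 * ✅ you need to override methods of Thread itself
 * ✅ you are writing quick test code
 */

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadVsRunnable {
    // ❌ Extending Thread: no other superclass is possible
    class Worker extends Thread {
        // Thread is already the superclass, so nothing else can be extended
    }

    // ✅ Implementing Runnable: free to extend another class
    class Worker2 extends BaseWorker implements Runnable {
        @Override
        public void run() {
            // do the work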
        }
    }

    // ✅ Reusable task
    class DataProcessor implements Runnable {
        private String data;

        public DataProcessor(String data) {
            this.data = data;
        }

        @Override
        public void run() {
            processData(data);
        }

        private void processData(String data) {
            // processing logic
        }
    }

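    public void useProcessor() {
        DataProcessor processor = new DataProcessor("test");

        // Run on a dedicated thread
        new Thread(processor).start();

        // Run through an ExecutorService
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(processor);
        executor.shutdown();  // otherwise the pool's non-daemon thread keeps the JVM alive

        // Calling it directly also works (runs on the current thread)
        processor.run();
    }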

    static class BaseWorker {
        // shared functionality
    }
}

Thread lifecycle

Thread states

public class ThreadLifecycle {
    /**
     * Thread states (Thread.State):
     *
     * NEW          : created but not yet started
     * RUNNABLE     : running or ready to run
     * BLOCKED      : waiting to acquire a monitor lock
     * WAITING      : waiting indefinitely
     * TIMED_WAITING: waiting with a timeout
     * TERMINATED   : finished executing
     */

    public void demonstrateStates() throws InterruptedException {
        Thread thread = new Thread(() -> {
            try {
                // RUNNABLE
                System.out.println("Thread running");

                // TIMED_WAITING
                Thread.sleep(1000);

                synchronized (this) {
                    // WAITING until the caller notifies (demo only; real code waits in a condition loop)
                    wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        System.out.println("1. State: " + thread.getState());  // NEW

        thread.start();
        Thread.sleep(100);

        // By now the worker is inside sleep(1000), so it reports TIMED_WAITING rather than RUNNABLE
        System.out.println("2. State: " + thread.getState());  // typically TIMED_WAITING

        Thread.sleep(1200);

        // The sleep has finished and the worker is parked in wait()
        System.out.println("3. State: " + thread.getState());  // typically WAITING

        synchronized (this) {
            notifyAll();  // without this, join() below would block forever
        }
        thread.join();

        System.out.println("4. State: " + thread.getState());  // TERMINATED
    }
}

State transition diagram

/**
 * Thread state transitions:
 *
 * NEW
 *  ↓ start()
 * RUNNABLE ←─────────┐
 *  │                 │
 *  ├→ BLOCKED ──────→┤  (lock acquired)
 *  ├→ WAITING ──────→┤  (notify/notifyAll)
 *  ├→ TIMED_WAITING →┤  (timeout elapsed)
 *  ↓ run() returns
 * TERMINATED
 */

public class StateTransitions {
    private final Object lock = new Object();

    public void demonstrateTransitions() {
        // RUNNABLE → BLOCKED
        Thread t1 = new Thread(() -> {
            synchronized (lock) {
                System.out.println("T1: Lock acquired");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        Thread t2 = new Thread(() -> {
            synchronized (lock) {  // waits in BLOCKED state while t1 holds the lock
                System.out.println("T2: Lock acquired");
            }
        });

        t1.start();
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        t2.start();

        try {
            Thread.sleep(200);
            System.out.println("T2 State: " + t2.getState());  // BLOCKED
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

join, sleep, interrupt

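join: waiting for a thread to finish

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class JoinExample {
    // Basic join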
    public void basicJoin() throws InterruptedException {
        Thread worker = new Thread(() -> {
            System.out.println("Worker: Starting work...");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Worker: Work completed");
        });

        worker.start();

        System.out.println("Main: Waiting for worker...");
        worker.join();  // blocks until worker finishes
        System.out.println("Main: Worker finished");
    }

    // join with a timeout
    public void timeoutJoin() throws InterruptedException {
        Thread longTask = new Thread(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        longTask.start();

        // wait at most 2 seconds
        longTask.join(2000);

        if (longTask.isAlive()) {
            System.out.println("Task still running, interrupting...");
            longTask.interrupt();
        }
    }

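    // Practical example: process items in parallel, then collect the results
    // (results are added in completion order, not input order)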
    public List<String> parallelProcess(List<String> data) throws InterruptedException {
        List<String> results = new CopyOnWriteArrayList<>();
        List<Thread> threads = new ArrayList<>();

        for (String item : data) {
            Thread thread = new Thread(() -> {
                String result = processItem(item);
                results.add(result);
            });
            thread.start();
            threads.add(thread);
        }

        // wait for every thread to finish
        for (Thread thread : threads) {
            thread.join();
        }

        return results;
    }

    private String processItem(String item) {
        // processing logic
        return item.toUpperCase();
    }
}

sleep: pausing a thread

public class SleepExample {
    // Basic sleep
    public void basicSleep() {
        System.out.println("Before sleep: " + System.currentTimeMillis());

        try {
            Thread.sleep(1000);  // pause for 1 second
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("Sleep interrupted");
        }

        System.out.println("After sleep: " + System.currentTimeMillis());
    }

    // sleep vs wait
    public void sleepVsWait() {
        Object lock = new Object();

        // sleep: keeps holding the lock while paused
        Thread sleepThread = new Thread(() -> {
            synchronized (lock) {
                System.out.println("Sleep thread: acquired lock");
                try {
                    Thread.sleep(2000);  // lock is still held
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Sleep thread: releasing lock");
            }
        });

        // wait: releases the lock while waiting
        Thread waitThread = new Thread(() -> {
            synchronized (lock) {
                System.out.println("Wait thread: acquired lock");
                try {
                    lock.wait(2000);  // lock is released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Wait thread: reacquired lock");
            }
        });

        sleepThread.start();
        waitThread.start();
    }

    // Practical example: retry logic
    public String fetchWithRetry(String url, int maxRetries) {
        for (int i = 0; i < maxRetries; i++) {
            try {
                return fetch(url);
            } catch (Exception e) {
                if (i == maxRetries - 1) {
                    throw new RuntimeException("Max retries exceeded", e);
                }

                // exponential backoff
                long delay = (long) Math.pow(2, i) * 1000;
                System.out.println("Retry " + (i + 1) + " after " + delay + "ms");

                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted", ie);
                }
            }
        }
        throw new RuntimeException("Unreachable");
    }

    private String fetch(String url) throws Exception {
        // HTTP request
        return "";
    }
}
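
The backoff arithmetic in the retry example can be looked at on its own. The sketch below computes the delay for each attempt as base × 2^attempt. The upper bound `maxMs` is an added assumption that the retry code above does not have, and the names `BackoffSchedule` and `delayMs` are hypothetical.

```java
class BackoffSchedule {
    // Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs.
    // The shift is limited to 20 so the multiplication cannot overflow for small bases.
    static long delayMs(int attempt, long baseMs, long maxMs) {
        long delay = baseMs << Math.min(attempt, 20);
        return Math.min(delay, maxMs);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println("attempt " + attempt + " -> " + delayMs(attempt, 1000, 30_000) + "ms");
        }
    }
}
```

With a 1000 ms base and a 30000 ms cap the delays are 1000, 2000, 4000, 8000, 16000, and then 30000 for every later attempt. Without a cap, the wait doubles indefinitely as the retry count grows.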

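interrupt: stopping a thread

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class InterruptExample {
    // Basic interrupt handling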
    public void basicInterrupt() {
        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    System.out.println("Working...");
                    Thread.sleep(500);
                }
                System.out.println("Interrupted, cleaning up...");
            } catch (InterruptedException e) {
                System.out.println("Interrupted during sleep");
                Thread.currentThread().interrupt();  // restore the interrupt status
            }
        });

        worker.start();

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        worker.interrupt();  // request cancellation
    }

    // Ignoring the interrupt (anti-pattern)
    public void ignoreInterrupt() {
        Thread thread = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // ❌ swallowing the interrupt (bad!)
                    System.out.println("Interrupted but continuing...");
                }
            }
        });

        thread.start();
        thread.interrupt();  // no effect: the loop swallows it and keeps running
    }

    // Correct interrupt handling
    public void correctInterruptHandling() {
        Thread thread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // do the work
                    doWork();
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                // ✅ clean up, then restore the interrupt status
                cleanup();
                Thread.currentThread().interrupt();
            }
        });

        thread.start();
    }

    // Practical example: a task with a timeout
    public String executeWithTimeout(Callable<String> task, long timeoutMs) 
            throws TimeoutException, InterruptedException {
        FutureTask<String> future = new FutureTask<>(task);
        Thread thread = new Thread(future);
        thread.start();

        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } catch (TimeoutException e) {
            future.cancel(true);  // interrupts the thread running the task
            thread.interrupt();
            throw e;
        }
    }

    private void doWork() {
        // work
    }

    private void cleanup() {
        // cleanup
    }
}
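
Why restore the interrupt status inside the catch block? When a blocking call such as Thread.sleep throws InterruptedException, the thread's interrupt status is cleared. The minimal sketch below observes the flag before and after restoring it. The class name `InterruptFlagDemo` and the helper `observeFlag` are illustrative assumptions.

```java
class InterruptFlagDemo {
    // Returns {flag seen right after catching, flag seen after restoring}.
    static boolean[] observeFlag() throws InterruptedException {
        boolean[] observed = new boolean[2];
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                // Throwing InterruptedException cleared the interrupt status
                observed[0] = Thread.currentThread().isInterrupted();
                Thread.currentThread().interrupt();  // restore it for code further up the stack
                observed[1] = Thread.currentThread().isInterrupted();
            }
        });
        worker.start();
        worker.interrupt();
        worker.join();  // join() makes the worker's writes visible to this thread
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] flags = observeFlag();
        System.out.println("in catch: " + flags[0] + ", after restore: " + flags[1]);
    }
}
```

The first value is false and the second is true. If the catch block skips the restore, callers that check isInterrupted() never learn that cancellation was requested.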

Understanding synchronization in depth

The synchronized keyword

Method synchronization

import java.util.ArrayList;
import java.util.List;

public class MethodSynchronization {
    private int count = 0;

    // ❌ Not synchronized: race condition!
    public void incrementUnsafe() {
        count++;  // not an atomic operation!
        // 1. read count
        // 2. add 1
        // 3. write count
    }

    // ✅ Synchronize the whole method
    public synchronized void incrementSafe() {
        count++;
    }

    // ✅ Block synchronization (finer-grained control)
    public void incrementWithBlock() {
        synchronized (this) {
            count++;
        }
    }

    // Test: reproduce the race condition
    public void demonstrateRaceCondition() throws InterruptedException {
        MethodSynchronization counter = new MethodSynchronization();

        // 1000 threads, each incrementing 1000 times
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            Thread thread = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.incrementUnsafe();
                }
            });
            thread.start();
            threads.add(thread);
        }

        for (Thread thread : threads) {
            thread.join();
        }

        System.out.println("Expected: 1000000");
        System.out.println("Actual: " + counter.count);  // usually less than 1000000!
    }
}
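
As a counterpart to the race-condition test above, this minimal sketch runs the same kind of workload against a synchronized counter and an AtomicInteger and checks that no increments are lost. The class name `CounterComparison` and the thread and iteration counts are assumptions chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CounterComparison {
    private int syncCount = 0;
    private final AtomicInteger atomicCount = new AtomicInteger();

    private synchronized void incrementSync() {
        syncCount++;
    }

    // Runs `threads` workers that each increment both counters `perThread` times.
    // Returns {synchronized count, atomic count}.
    static int[] run(int threads, int perThread) throws InterruptedException {
        CounterComparison c = new CounterComparison();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    c.incrementSync();
                    c.atomicCount.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();  // join() also makes the workers' writes visible here
        }
        return new int[] {c.syncCount, c.atomicCount.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(8, 10_000);
        System.out.println("synchronized: " + r[0] + ", atomic: " + r[1]);
    }
}
```

Both counters end at exactly threads × perThread, whereas the unsynchronized version typically falls short.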

Block synchronization

public class BlockSynchronization {
    private final Object lock1 = new Object();
    private final Object lock2 = new Object();

    private int balance1 = 1000;
    private int balance2 = 2000;

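    // ❌ Synchronizing the whole method: the critical section is wider than needed
    public synchronized void transferBad(int amount) {
        // preparation (no synchronization needed)
        System.out.println("Preparing transfer...");

        // the actual transfer (needs synchronization)
        balance1 -= amount;
        balance2 += amount;

        // logging (no synchronization needed)
        System.out.println("Transfer completed");
    }

    // ✅ Synchronize only the part that needs it
    public void transferGood(int amount) {
        // preparation
        System.out.println("Preparing transfer...");

        // synchronized block
        synchronized (this) {
            balance1 -= amount;
            balance2 += amount;
        }

        // logging
        System.out.println("Transfer completed");
    }

    // ✅ Separate locks for independent state (better concurrency)
    // Caution: a field must always be guarded by the same lock. The two methods below do not
    // exclude transferGood(), which locks `this`, so do not mix both styles on the same fields.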
    public void updateBalance1(int amount) {
        synchronized (lock1) {
            balance1 += amount;
        }
    }

    public void updateBalance2(int amount) {
        synchronized (lock2) {
            balance2 += amount;
        }
    }
}

static synchronization

public class StaticSynchronization {
    private static int counter = 0;
    private int instanceCounter = 0;

    // static synchronized method: the Class object is the lock
    public static synchronized void incrementStatic() {
        counter++;
    }

    // Equivalent
    public static void incrementStaticBlock() {
        synchronized (StaticSynchronization.class) {
            counter++;
        }
    }

    // synchronized instance method: `this` is the lock
    public synchronized void incrementInstance() {
        instanceCounter++;
    }

    // Equivalent
    public void incrementInstanceBlock() {
        synchronized (this) {
            instanceCounter++;
        }
    }

    // ⚠️ static and instance synchronization use different locks!
    public void demonstrateDifferentLocks() {
        StaticSynchronization obj1 = new StaticSynchronization();
        StaticSynchronization obj2 = new StaticSynchronization();

        // Different instances: different locks
        new Thread(obj1::incrementInstance).start();
        new Thread(obj2::incrementInstance).start();  // can run concurrently

        // static methods: same lock (the Class object)
        new Thread(StaticSynchronization::incrementStatic).start();
        new Thread(StaticSynchronization::incrementStatic).start();  // serialized
    }
}

Monitors and locks

The monitor concept

import java.util.LinkedList;
import java.util.Queue;

public class MonitorConcept {
    /**
     * Monitor:
     * - mutual exclusion
     * - condition variables
     *
     * Every Java object has a monitor
     */

    private final Object monitor = new Object();
    private int value = 0;

    // Producer-consumer pattern
    public void produce() throws InterruptedException {
        synchronized (monitor) {
            while (value > 0) {
                // buffer is full
                monitor.wait();  // releases the lock and waits
            }

            value++;
            System.out.println("Produced: " + value);

            monitor.notify();  // wakes one waiter; prefer notifyAll() with several producers/consumers
        }
    }

    public void consume() throws InterruptedException {
        synchronized (monitor) {
            while (value == 0) {
                // buffer is empty
                monitor.wait();
            }

            System.out.println("Consumed: " + value);
            value--;

            monitor.notify();
        }
    }

    // Practical example: a blocking queue implementation
    static class SimpleBlockingQueue<T> {
        private final Queue<T> queue = new LinkedList<>();
        private final int capacity;

        public SimpleBlockingQueue(int capacity) {
            this.capacity = capacity;
        }

        public synchronized void put(T item) throws InterruptedException {
            while (queue.size() == capacity) {
                wait();  // wait while the queue is full
            }

            queue.offer(item);
            notifyAll();  // wake waiting consumers
        }

        public synchronized T take() throws InterruptedException {
            while (queue.isEmpty()) {
                wait();  // wait while the queue is empty
            }

            T item = queue.poll();
            notifyAll();  // wake waiting producers
            return item;
        }
    }
}
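
To see the wait/notifyAll queue under load, the sketch below pairs one producer with one consumer over a buffer of capacity 2, so both sides are forced to block repeatedly. `BoundedQueue` is a compact restatement of the queue above so that the example is self-contained. The class name `BlockingQueueUsage`, the capacity, and the item count are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class BlockingQueueUsage {
    // Compact restatement of the wait/notifyAll queue shown above
    static class BoundedQueue<T> {
        private final Queue<T> queue = new ArrayDeque<>();
        private final int capacity;

        BoundedQueue(int capacity) {
            this.capacity = capacity;
        }

        synchronized void put(T item) throws InterruptedException {
            while (queue.size() == capacity) {
                wait();
            }
            queue.add(item);
            notifyAll();
        }

        synchronized T take() throws InterruptedException {
            while (queue.isEmpty()) {
                wait();
            }
            T item = queue.remove();
            notifyAll();
            return item;
        }
    }

    // One producer puts 1..n, one consumer takes n items and sums them.
    static int produceAndConsume(int n) throws InterruptedException {
        BoundedQueue<Integer> q = new BoundedQueue<>(2);
        int[] sum = new int[1];  // written only by the consumer, read after join()
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) q.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) sum[0] += q.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("sum = " + produceAndConsume(100));
    }
}
```

For n = 100 the sum is 5050, which confirms that no item was lost or delivered twice. In production code, java.util.concurrent.ArrayBlockingQueue provides the same behavior without hand-written wait/notify.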

wait, notify, notifyAll

public class WaitNotifyExample {
    private final Object lock = new Object();
    private boolean ready = false;

    // notify vs notifyAll
    public void demonstrateNotify() {
        // worker threads
        for (int i = 0; i < 3; i++) {
            final int id = i;
            new Thread(() -> {
                synchronized (lock) {
                    try {
                        while (!ready) {
                            System.out.println("Worker " + id + ": Waiting...");
                            lock.wait();
                        }
                        System.out.println("Worker " + id + ": Working!");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }).start();
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // notify(): wakes a single waiting thread
        synchronized (lock) {
            ready = true;
            lock.notify();  // only one worker wakes up
        }

        // notifyAll(): wakes every waiting thread
        synchronized (lock) {
            ready = true;
            lock.notifyAll();  // the remaining two workers wake up
        }
        }
    }

    // Note: always call wait() inside a while loop
    public void correctWaitUsage() {
        synchronized (lock) {
            // ✅ while: re-checks the condition after every wakeup
            while (!ready) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }

            // ❌ if: proceeds without re-checking, so a spurious wakeup breaks it
            if (!ready) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

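Deadlock: causes and fixes

Conditions for deadlock

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class DeadlockExample {
    /**
     * The four conditions required for deadlock:
     * 1. Mutual exclusion
     * 2. Hold and wait
     * 3. No preemption
     * 4. Circular wait
     */

    private final Object lock1 = new Object();
    private final Object lock2 = new Object();

    // ❌ Deadlocks!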
    public void causeDeadlock() {
        Thread t1 = new Thread(() -> {
            synchronized (lock1) {
                System.out.println("T1: Lock1 acquired");
                sleep(100);

                synchronized (lock2) {  // waits for lock2
                    System.out.println("T1: Lock2 acquired");
                }
            }
        });

        Thread t2 = new Thread(() -> {
            synchronized (lock2) {
                System.out.println("T2: Lock2 acquired");
                sleep(100);

                synchronized (lock1) {  // waits for lock1
                    System.out.println("T2: Lock1 acquired");
                }
            }
        });

        t1.start();
        t2.start();

        // T1: holds lock1, waits for lock2
        // T2: holds lock2, waits for lock1
        // → deadlock!
    }

    // ✅ Fix 1: fixed lock ordering
    public void fixedLockOrder() {
        Thread t1 = new Thread(() -> {
            synchronized (lock1) {  // always lock1 first
                System.out.println("T1: Lock1 acquired");
                synchronized (lock2) {
                    System.out.println("T1: Lock2 acquired");
                }
            }
        });

        Thread t2 = new Thread(() -> {
            synchronized (lock1) {  // lock1 first here too
                System.out.println("T2: Lock1 acquired");
                synchronized (lock2) {
                    System.out.println("T2: Lock2 acquired");
                }
            }
        });

        t1.start();
        t2.start();
    }

    // ✅ Fix 2: tryLock with a timeout
    public void tryLockWithTimeout() {
        Lock lock1 = new ReentrantLock();
        Lock lock2 = new ReentrantLock();

        Thread t1 = new Thread(() -> {
            try {
                if (lock1.tryLock(1, TimeUnit.SECONDS)) {
                    try {
                        if (lock2.tryLock(1, TimeUnit.SECONDS)) {
                            try {
                                System.out.println("T1: Both locks acquired");
                            } finally {
                                lock2.unlock();
                            }
                        }
                    } finally {
                        lock1.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        t1.start();
    }

    // ✅ Fix 3: design without locks
    public void lockFreeDesign() {
        // use AtomicInteger
        AtomicInteger counter = new AtomicInteger(0);

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.incrementAndGet();
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.incrementAndGet();
            }
        });

        t1.start();
        t2.start();
    }

    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
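
A deadlock like the one in causeDeadlock() can also be confirmed from inside the JVM. The sketch below is an addition, not part of the original walkthrough: it uses the standard `ThreadMXBean.findMonitorDeadlockedThreads()` call to detect the cycle, and a `CountDownLatch` (an assumption made here so the deadlock is deterministic rather than timing-dependent) to ensure that both threads hold their first lock before requesting the second. The class and method names are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

class DeadlockDetectionDemo {
    // Creates two daemon threads that lock in opposite order, then polls the JVM
    // for monitor deadlocks. Returns the number of deadlocked threads found.
    static int detectDeadlockedThreads() throws InterruptedException {
        Object lock1 = new Object();
        Object lock2 = new Object();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);

        lockInOrder(lock1, lock2, bothHoldFirst).start();
        lockInOrder(lock2, lock1, bothHoldFirst).start();

        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        for (int i = 0; i < 50; i++) {  // poll for up to about 5 seconds
            long[] ids = bean.findMonitorDeadlockedThreads();  // null when no deadlock exists
            if (ids != null) {
                return ids.length;
            }
            Thread.sleep(100);
        }
        return 0;
    }

    private static Thread lockInOrder(Object first, Object second, CountDownLatch latch) {
        Thread t = new Thread(() -> {
            synchronized (first) {
                latch.countDown();
                try {
                    latch.await();  // proceed only once the other thread holds its first lock
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                synchronized (second) {
                    // never reached: the other thread holds `second`
                }
            }
        });
        t.setDaemon(true);  // daemon threads do not keep the JVM alive after main returns
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Deadlocked threads: " + detectDeadlockedThreads());
    }
}
```

The same information appears in a thread dump (for example via jstack), which reports detected Java-level deadlocks along with the locks each thread is waiting for.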

Practical deadlock example: bank transfers

public class BankDeadlock {
    static class Account {
        private final long id;
        private int balance;

        public Account(long id, int balance) {
            this.id = id;
            this.balance = balance;
        }

        public long getId() {
            return id;
        }
    }

    // ❌ Deadlock risk
    public void transferUnsafe(Account from, Account to, int amount) {
        synchronized (from) {
            synchronized (to) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }

    // ✅ Acquire locks in ID order
    public void transferSafe(Account from, Account to, int amount) {
        Account first = from.getId() < to.getId() ? from : to;
        Account second = from.getId() < to.getId() ? to : from;

        synchronized (first) {
            synchronized (second) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }

    // Test: reproduce the deadlock
    public void demonstrateDeadlock() {
        Account account1 = new Account(1, 1000);
        Account account2 = new Account(2, 2000);

        // T1: account1 → account2
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                transferUnsafe(account1, account2, 10);
            }
        });

        // T2: account2 → account1
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                transferUnsafe(account2, account1, 10);
            }
        });

        t1.start();
        t2.start();

        // deadlock is possible!
    }
}
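
The ID-ordering fix can be exercised the same way as the deadlock test: two threads transfer in opposite directions, but every transfer locks the lower-ID account first. This is a minimal sketch in which the class name `OrderedTransferDemo`, the starting balances, and the iteration count are assumptions.

```java
class OrderedTransferDemo {
    static final class Account {
        final long id;
        int balance;

        Account(long id, int balance) {
            this.id = id;
            this.balance = balance;
        }
    }

    // Always locks the account with the smaller ID first, whatever the transfer direction.
    static void transfer(Account from, Account to, int amount) {
        Account first = from.id < to.id ? from : to;
        Account second = first == from ? to : from;
        synchronized (first) {
            synchronized (second) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }

    // Runs opposing transfers concurrently and returns the final balances {a, b}.
    static int[] run(int iterations) throws InterruptedException {
        Account a = new Account(1, 1000);
        Account b = new Account(2, 2000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < iterations; i++) transfer(a, b, 10);
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < iterations; i++) transfer(b, a, 10);
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return new int[] {a.balance, b.balance};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] balances = run(100_000);
        System.out.println("a=" + balances[0] + ", b=" + balances[1]);
    }
}
```

Because both directions run the same number of times, the balances return to 1000 and 2000, and the run completes instead of hanging.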

The volatile keyword

The visibility problem

import java.util.concurrent.atomic.AtomicInteger;

public class VolatileExample {
    // ❌ No volatile: visibility problem
    private boolean running = true;

    public void demonstrateVisibilityProblem() {
        Thread worker = new Thread(() -> {
            System.out.println("Worker started");

            while (running) {  // may keep reading a stale value
                // do the work
            }

            System.out.println("Worker stopped");
        });

        worker.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        running = false;  // changed by the main thread
        System.out.println("Set running to false");

        // The worker may never stop!
    }

    // ✅ With volatile
    private volatile boolean runningVolatile = true;

    public void withVolatile() {
        Thread worker = new Thread(() -> {
            while (runningVolatile) {  // every read sees the latest write
                // work
            }
        });

        worker.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

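        runningVolatile = false;  // visible to the worker's next read
    }

    /**
     * How volatile behaves (simplified mental model):
     *
     * Plain field:
     * a thread may keep using a value held in a register, or the JIT may hoist the read out of a loop
     *
     * volatile field:
     * every read observes the most recent write, and that write happens-before the read
     * (this is a Java Memory Model guarantee; the hardware caches themselves stay coherent)
     */

    // volatile use case: double-checked locking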
    private static volatile VolatileExample instance;

    public static VolatileExample getInstance() {
        if (instance == null) {
            synchronized (VolatileExample.class) {
                if (instance == null) {
                    instance = new VolatileExample();
                }
            }
        }
        return instance;
    }

    // Limit of volatile: compound operations
    private volatile int counter = 0;

    // ❌ volatile alone is not enough!
    public void incrementVolatile() {
        counter++;  // read + write (not atomic)
    }

    // ✅ Use synchronized or an atomic class
    public synchronized void incrementSafe() {
        counter++;
    }

    // Alternatively
    private final AtomicInteger atomicCounter = new AtomicInteger(0);

    public void incrementAtomic() {
        atomicCounter.incrementAndGet();
    }
}
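
For the singleton case specifically, double-checked locking is not the only option. A commonly used alternative, shown here as a side-by-side sketch rather than something the original example uses, is the initialization-on-demand holder idiom. It relies on the JVM's class initialization guarantees instead of volatile. The class name `HolderSingleton` is hypothetical.

```java
class HolderSingleton {
    private HolderSingleton() {
    }

    // The nested class is initialized on first access to INSTANCE,
    // and the JVM performs class initialization exactly once, thread-safely.
    private static class Holder {
        static final HolderSingleton INSTANCE = new HolderSingleton();
    }

    static HolderSingleton getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance());
    }
}
```

Every call returns the same instance, with no explicit lock and no volatile field. Double-checked locking remains useful when the lazily initialized value is an instance field rather than a static one.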

volatile vs synchronized

public class VolatileVsSynchronized {
    /**
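     * volatile:
     * ✅ guarantees visibility
     * ❌ does not guarantee atomicity
     * ✅ no lock (cheap)
     * ✅ suited to reads/writes of a single variable
     *
     * synchronized:
     * ✅ guarantees visibility
     * ✅ guarantees atomicity
     * ❌ takes a lock (slower under contention)
     * ✅ suited to compound operations
     */

    // volatile: for flags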
    private volatile boolean shutdown = false;

    public void checkStatus() {
        if (shutdown) {
            // handle shutdown
        }
    }

    public void shutdown() {
        shutdown = true;
    }

    // synchronized: compound operations
    private int count = 0;

    public synchronized void increment() {
        count++;  // read + increment + write
    }

    public synchronized int getCount() {
        return count;
    }
}

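5.2 Advanced Concurrency

Managing thread pools with the Executor framework

Using ExecutorService

Basic usage

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceBasics {
    // 1. Single Thread Executor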
    public void singleThreadExample() {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        executor.submit(() -> {
            System.out.println("Task 1");
        });

        executor.submit(() -> {
            System.out.println("Task 2");
        });

        executor.shutdown();  // rejects new tasks, lets submitted tasks finish

        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();  // force shutdown
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    // 2. Fixed Thread Pool
    public void fixedThreadPoolExample() {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + 
                    " executed by " + Thread.currentThread().getName());
            });
        }

        executor.shutdown();
    }

    // 3. Cached Thread Pool
    public void cachedThreadPoolExample() {
        // Creates threads on demand; threads idle for 60 seconds are removed
        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 100; i++) {
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName());
            });
        }

        executor.shutdown();
    }

    // 4. Scheduled Thread Pool
    public void scheduledExample() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // Run once after 2 seconds
        scheduler.schedule(() -> {
            System.out.println("Delayed task");
        }, 2, TimeUnit.SECONDS);

        // Start after 1 second, then repeat every 3 seconds
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("Periodic task");
        }, 1, 3, TimeUnit.SECONDS);

        // Start after 1 second, then run again 3 seconds after the previous run completes
        scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("Task with delay");
        }, 1, 3, TimeUnit.SECONDS);

        // The periodic tasks keep running until scheduler.shutdown() is called
    }
}

Callable과 Future

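import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CallableAndFuture {
    // Callable: returns a value and may throw a checked exception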
    public void callableExample() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Callable<Integer> task = () -> {
            Thread.sleep(1000);
            return 42;
        };

        Future<Integer> future = executor.submit(task);

        System.out.println("Waiting for result...");
        Integer result = future.get();  // blocks until the result is ready
        System.out.println("Result: " + result);

        executor.shutdown();
    }

    // Future with a timeout
    public void futureWithTimeout() {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Future<String> future = executor.submit(() -> {
            Thread.sleep(5000);
            return "Done";
        });

        try {
            String result = future.get(2, TimeUnit.SECONDS);
            System.out.println(result);
        } catch (TimeoutException e) {
            System.out.println("Task timed out");
            future.cancel(true);  // cancel the task, interrupting it if it is running
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }

    // Handling multiple futures
    public List<String> processMultipleTasks() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        List<Callable<String>> tasks = Arrays.asList(
            () -> { Thread.sleep(100); return "Task 1"; },
            () -> { Thread.sleep(200); return "Task 2"; },
            () -> { Thread.sleep(150); return "Task 3"; }
        );

        // invokeAll: blocks until every task has completed
        List<Future<String>> futures = executor.invokeAll(tasks);

        List<String> results = new ArrayList<>();
        for (Future<String> future : futures) {
            results.add(future.get());
        }

        executor.shutdown();
        return results;
    }

    // invokeAny: returns the result of one task that completed successfully
    public String getFirstResult() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        List<Callable<String>> tasks = Arrays.asList(
            () -> { Thread.sleep(300); return "Slow"; },
            () -> { Thread.sleep(100); return "Fast"; },
            () -> { Thread.sleep(200); return "Medium"; }
        );

        String result = executor.invokeAny(tasks);  // usually "Fast"; unfinished tasks are cancelled

        executor.shutdown();
        return result;
    }
}
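One detail the examples above do not show is what happens when the task itself fails: Future.get() wraps the task's exception in an ExecutionException, and the original exception is available through getCause(). The sketch below illustrates this; the class name FutureFailureDemo and the "boom" message are made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureFailureDemo {
    // Hypothetical helper: submits a failing task and returns the message
    // of the exception that was thrown inside it.
    static String causeMessage() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> failing = () -> {
                throw new IllegalStateException("boom");
            };
            Future<Integer> future = executor.submit(failing);
            try {
                future.get();
                return "no failure";
            } catch (ExecutionException e) {
                // The task's own exception is the cause, not the ExecutionException
                return e.getCause().getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeMessage());
    }
}
```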

Configuring ThreadPoolExecutor

Custom thread pool

public class CustomThreadPool {
    public void createCustomPool() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,              // corePoolSize: number of core threads
            10,             // maximumPoolSize: upper bound on the number of threads
            60L,            // keepAliveTime: how long idle non-core threads survive
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),  // work queue (bounded to 100 tasks)
            new ThreadFactory() {
                private final AtomicInteger counter = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("MyPool-" + counter.incrementAndGet());
                    thread.setDaemon(false);
                    return thread;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy()  // rejection policy
        );

        // Usage
        for (int i = 0; i < 200; i++) {
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName());
            });
        }

        executor.shutdown();
    }

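    /**
     * How the thread pool handles a submitted task:
     *
     * 1. Task submitted
     *    ↓
     * 2. Fewer than corePoolSize threads running?
     *    Yes → start a new thread
     *    No → go to step 3
     *    ↓
     * 3. Room in the queue?
     *    Yes → enqueue the task
     *    No → go to step 4
     *    ↓
     * 4. Fewer than maximumPoolSize threads running?
     *    Yes → start a new thread
     *    No → go to step 5
     *    ↓
     * 5. Apply the rejection policy
     */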

    // Rejection policies
    public void demonstrateRejectionPolicies() {
        // 1. AbortPolicy (default): throws RejectedExecutionException
        ThreadPoolExecutor executor1 = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.AbortPolicy()
        );

        // 2. CallerRunsPolicy: the submitting thread runs the task itself
        ThreadPoolExecutor executor2 = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 3. DiscardPolicy: silently drops the task
        ThreadPoolExecutor executor3 = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.DiscardPolicy()
        );

        // 4. DiscardOldestPolicy: drops the oldest queued task, then retries
        ThreadPoolExecutor executor4 = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.DiscardOldestPolicy()
        );
    }

    // Monitoring
    public void monitorThreadPool() {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);

        // Report the pool status once per second
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        monitor.scheduleAtFixedRate(() -> {
            System.out.println("=== Thread Pool Status ===");
            System.out.println("Active threads: " + executor.getActiveCount());
            System.out.println("Pool size: " + executor.getPoolSize());
            System.out.println("Queue size: " + executor.getQueue().size());
            System.out.println("Completed tasks: " + executor.getCompletedTaskCount());
        }, 0, 1, TimeUnit.SECONDS);

        // Submit tasks
        for (int i = 0; i < 100; i++) {
            executor.submit(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }
}
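The order described in the flow above (core threads first, then the queue, then extra threads up to the maximum, then rejection) can be observed directly with a deliberately tiny pool. The following is a minimal sketch under assumed sizes of core 1, max 2, and queue capacity 1; the class name PoolGrowthDemo and the observe method are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    // Returns {poolSize, queueSize, rejected (1 or 0)} after four submissions.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 2, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);

        // Each task blocks until released, so threads stay busy during the test
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        int rejected = 0;
        try {
            pool.execute(blocker);  // 1st: starts the core thread
            pool.execute(blocker);  // 2nd: core thread is busy, so the task is queued
            pool.execute(blocker);  // 3rd: queue is full, so a second thread starts
            try {
                pool.execute(blocker);  // 4th: queue full and pool at maximum
            } catch (RejectedExecutionException e) {
                rejected = 1;  // default AbortPolicy rejects the task
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size(), rejected };
        } finally {
            release.countDown();  // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println("pool=" + r[0] + " queue=" + r[1] + " rejected=" + r[2]);
    }
}
```

A practical consequence is that a pool with an unbounded queue never grows beyond corePoolSize, because step 3 always succeeds.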

The Complete Guide to CompletableFuture

Basic usage

public class CompletableFutureBasics {
    // 1. Creating asynchronous tasks
    public void createAsync() {
        // supplyAsync: returns a value
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "Hello";
        });

        // runAsync: returns no value
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            sleep(1000);
            System.out.println("Task completed");
        });

        // Retrieve the result
        try {
            String result = future1.get();  // blocks until the result is ready
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 2. Chaining
    public void chaining() {
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("1. Fetching user");
                return "User123";
            })
            .thenApply(userId -> {
                System.out.println("2. Fetching orders for " + userId);
                return Arrays.asList("Order1", "Order2");
            })
            .thenApply(orders -> {
                System.out.println("3. Processing " + orders.size() + " orders");
                return orders.size();
            })
            .thenApply(count -> {
                System.out.println("4. Generating report");
                return "Report: " + count + " orders";
            });

        try {
            String result = future.get();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 3. Combining
    public void combining() {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "Hello";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            sleep(1500);
            return "World";
        });

        // thenCombine: combines the two results once both are available
        CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> {
            return s1 + " " + s2;
        });

        try {
            System.out.println(combined.get());  // "Hello World"
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 4. Handling multiple futures
    public void multipleFutures() {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Task 1");
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Task 2");
        CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "Task 3");

        // allOf: completes when all of the futures have completed
        CompletableFuture<Void> allOf = CompletableFuture.allOf(f1, f2, f3);

        allOf.thenRun(() -> {
            try {
                System.out.println(f1.get());
                System.out.println(f2.get());
                System.out.println(f3.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // anyOf: completes with the result of whichever future finishes first
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(f1, f2, f3);

        try {
            Object result = anyOf.get();
            System.out.println("First completed: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 5. Exception handling
    public void exceptionHandling() {
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("Error!");
                }
                return "Success";
            })
            .exceptionally(ex -> {
                System.out.println("Exception: " + ex.getMessage());
                return "Default Value";
            })
            .thenApply(result -> {
                return result.toUpperCase();
            });

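        // Alternatively, use handle, which receives both the result and the exception.
        // The explicit <String> is needed because the lambda only throws,
        // so the result type cannot be inferred from it.
        CompletableFuture<String> future2 = CompletableFuture
            .<String>supplyAsync(() -> {
                throw new RuntimeException("Error!");
            })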
            .handle((result, ex) -> {
                if (ex != null) {
                    return "Error: " + ex.getMessage();
                }
                return result;
            });
    }

    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
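Because allOf returns CompletableFuture<Void>, the individual results have to be read back from the original futures, as the multiple-futures example above does by hand. A small helper can package that step. This is a sketch only: the names AllOfResults and sequence are hypothetical and not part of the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfResults {
    // Hypothetical helper: turns a list of futures into one future of a list,
    // keeping the results in the same order as the input futures.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                // Every future is already complete here, so join() does not block
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
            CompletableFuture.supplyAsync(() -> "Task 1"),
            CompletableFuture.supplyAsync(() -> "Task 2"));
        System.out.println(sequence(futures).join());
    }
}
```

join() is used instead of get() because it throws only unchecked exceptions, which keeps the lambda free of try/catch.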

Real-world examples

public class CompletableFutureRealWorld {
    // 1. Parallel API calls
    public CompletableFuture<Dashboard> loadDashboard(String userId) {
        CompletableFuture<User> userFuture = 
            CompletableFuture.supplyAsync(() -> fetchUser(userId));

        CompletableFuture<List<Order>> ordersFuture = 
            CompletableFuture.supplyAsync(() -> fetchOrders(userId));

        CompletableFuture<List<Notification>> notificationsFuture = 
            CompletableFuture.supplyAsync(() -> fetchNotifications(userId));

        return CompletableFuture.allOf(userFuture, ordersFuture, notificationsFuture)
            .thenApply(v -> new Dashboard(
                // join() does not block here: allOf guarantees all three are complete
                userFuture.join(),
                ordersFuture.join(),
                notificationsFuture.join()
            ));
    }

    // 2. Timeout handling
    public CompletableFuture<String> fetchWithTimeout(String url) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return httpGet(url);
        });

        // Java 9+: orTimeout
        return future.orTimeout(5, TimeUnit.SECONDS)
            .exceptionally(ex -> {
                if (ex instanceof TimeoutException) {
                    return "Request timed out";
                }
                return "Error: " + ex.getMessage();
            });
    }

    // 3. Retry logic
    // Note: the nested get() blocks a pool thread while each retry runs
    public CompletableFuture<String> fetchWithRetry(String url, int maxRetries) {
        return CompletableFuture.supplyAsync(() -> httpGet(url))
            .exceptionally(ex -> {
                if (maxRetries > 0) {
                    try {
                        return fetchWithRetry(url, maxRetries - 1).get();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                throw new RuntimeException("Max retries exceeded", ex);
            });
    }

    // 4. Pipeline processing
    public CompletableFuture<Report> generateReport(String userId) {
        return CompletableFuture.supplyAsync(() -> fetchUserData(userId))
            .thenApply(this::validateData)
            .thenApply(this::transformData)
            .thenApply(this::aggregateData)
            .thenApply(this::formatReport)
            .exceptionally(ex -> {
                System.err.println("Report generation failed: " + ex.getMessage());
                return getDefaultReport();
            });
    }

    // Helper methods
    private User fetchUser(String userId) { return new User(); }
    private List<Order> fetchOrders(String userId) { return new ArrayList<>(); }
    private List<Notification> fetchNotifications(String userId) { return new ArrayList<>(); }
    private String httpGet(String url) { return ""; }
    private Object fetchUserData(String userId) { return new Object(); }
    private Object validateData(Object data) { return data; }
    private Object transformData(Object data) { return data; }
    private Object aggregateData(Object data) { return data; }
    private Report formatReport(Object data) { return new Report(); }
    private Report getDefaultReport() { return new Report(); }

    static class Dashboard {
        Dashboard(User user, List<Order> orders, List<Notification> notifications) {}
    }
    static class User {}
    static class Order {}
    static class Notification {}
    static class Report {}
}
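The retry example above waits for each retry with a blocking get() inside exceptionally. A non-blocking variant is possible with thenCompose; the sketch below is a different technique from the one in the class above, works on Java 8, and uses hypothetical names (AsyncRetry, retry). It takes a supplier of futures so that every attempt starts a fresh asynchronous call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

class AsyncRetry {
    // Hypothetical helper: runs the call and, on failure, retries it
    // up to maxRetries more times without blocking any thread.
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> call, int maxRetries) {
        return call.get()
            // Success: wrap the value so both branches yield a future
            .thenApply(CompletableFuture::completedFuture)
            // Failure: either start another attempt or pass the error on
            .exceptionally(ex -> {
                if (maxRetries > 0) {
                    return retry(call, maxRetries - 1);
                }
                CompletableFuture<T> failed = new CompletableFuture<>();
                failed.completeExceptionally(ex);
                return failed;
            })
            // Flatten CompletableFuture<CompletableFuture<T>> into CompletableFuture<T>
            .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        String result = AsyncRetry.<String>retry(
            () -> CompletableFuture.supplyAsync(() -> {
                if (attempts.incrementAndGet() < 3) {
                    throw new IllegalStateException("transient failure");
                }
                return "ok";
            }), 5).join();
        System.out.println(result + " after " + attempts.get() + " attempts");
    }
}
```

On Java 12 and later, exceptionallyCompose expresses the same idea more directly. Neither version adds a delay between attempts, which a production retry would normally need.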

Using the java.util.concurrent package

CountDownLatch, CyclicBarrier

CountDownLatch: one-shot synchronization

public class CountDownLatchExample {
    // Basic usage
    public void basicUsage() throws InterruptedException {
        int numWorkers = 3;
        CountDownLatch latch = new CountDownLatch(numWorkers);

        for (int i = 0; i < numWorkers; i++) {
            final int workerId = i;
            new Thread(() -> {
                System.out.println("Worker " + workerId + " starting");
                sleep(1000);
                System.out.println("Worker " + workerId + " done");
                latch.countDown();  // decrement the count
            }).start();
        }

        System.out.println("Waiting for workers...");
        latch.await();  // wait until the count reaches 0
        System.out.println("All workers done!");
    }

    // Real-world example: waiting for services to start
    public class ServiceStarter {
        private final CountDownLatch latch = new CountDownLatch(3);

        public void start() {
            new Thread(this::startDatabase).start();
            new Thread(this::startCache).start();
            new Thread(this::startWebServer).start();

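            try {
                // await returns false if the timeout elapses before the count reaches 0
                if (latch.await(30, TimeUnit.SECONDS)) {
                    System.out.println("All services started");
                } else {
                    System.err.println("Timed out waiting for services to start");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }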
        }

        private void startDatabase() {
            System.out.println("Starting database...");
            sleep(2000);
            System.out.println("Database started");
            latch.countDown();
        }

        private void startCache() {
            System.out.println("Starting cache...");
            sleep(1500);
            System.out.println("Cache started");
            latch.countDown();
        }

        private void startWebServer() {
            System.out.println("Starting web server...");
            sleep(3000);
            System.out.println("Web server started");
            latch.countDown();
        }
    }

    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

CyclicBarrier: reusable synchronization

public class CyclicBarrierExample {
    // Basic usage
    public void basicUsage() {
        int numThreads = 3;
        CyclicBarrier barrier = new CyclicBarrier(numThreads, () -> {
            System.out.println("All threads reached barrier!");
        });

        for (int i = 0; i < numThreads; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("Thread " + threadId + " working");
                    sleep(1000);

                    System.out.println("Thread " + threadId + " waiting at barrier");
                    barrier.await();  // wait for all threads

                    System.out.println("Thread " + threadId + " continuing");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    // Real-world example: parallel data processing
    public class ParallelDataProcessor {
        private final CyclicBarrier barrier;
        private final List<List<Integer>> chunks;
        private final List<Integer> results = new ArrayList<>();

        public ParallelDataProcessor(List<Integer> data, int numThreads) {
            this.chunks = splitData(data, numThreads);
            this.barrier = new CyclicBarrier(numThreads, () -> {
                System.out.println("All chunks processed, merging results...");
            });
        }

        public void process() {
            for (int i = 0; i < chunks.size(); i++) {
                final int chunkIndex = i;
                new Thread(() -> {
                    List<Integer> chunk = chunks.get(chunkIndex);

                    // Phase 1: Process
                    System.out.println("Processing chunk " + chunkIndex);
                    int sum = chunk.stream().mapToInt(Integer::intValue).sum();

                    synchronized (results) {
                        results.add(sum);
                    }

                    try {
                        barrier.await();  // wait for Phase 1 to finish on every thread

                        // Phase 2: Aggregate (after the barrier)
                        if (chunkIndex == 0) {
                            int total = results.stream().mapToInt(Integer::intValue).sum();
                            System.out.println("Total: " + total);
                        }

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }

        private List<List<Integer>> splitData(List<Integer> data, int numChunks) {
            List<List<Integer>> chunks = new ArrayList<>();
            int chunkSize = data.size() / numChunks;
            for (int i = 0; i < numChunks; i++) {
                int start = i * chunkSize;
                int end = (i == numChunks - 1) ? data.size() : (i + 1) * chunkSize;
                chunks.add(data.subList(start, end));
            }
            return chunks;
        }
    }

    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
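Both examples above cross the barrier only once, so the "reusable" part of the heading is not actually exercised. The sketch below makes the same group of threads meet at one barrier over several rounds and counts how often it trips. The names BarrierReuseDemo and run are hypothetical.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo {
    // Runs `rounds` phases on `parties` threads and returns how many times
    // the barrier tripped (its action runs once per trip).
    static int run(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        // The barrier resets itself after each trip,
                        // so the same instance serves every round
                        barrier.await();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party was interrupted or timed out; give up
                }
            });
            workers[i].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("Barrier tripped " + run(3, 4) + " times");
    }
}
```

A CountDownLatch cannot be used this way, because its count never goes back up once it reaches zero.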

Semaphore, BlockingQueue

Semaphore: limiting access to a resource

public class SemaphoreExample {
    // Basic usage
    public void basicUsage() {
        Semaphore semaphore = new Semaphore(3);  // at most 3 concurrent holders

        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    System.out.println("Task " + taskId + " waiting for permit");
                    semaphore.acquire();  // acquire a permit
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;  // no permit was acquired, so there is nothing to release
                }
                try {
                    System.out.println("Task " + taskId + " acquired permit");
                    sleep(1000);  // do the work

                    System.out.println("Task " + taskId + " releasing permit");
                } finally {
                    semaphore.release();  // return the permit
                }
            }).start();
        }
    }

    // Real-world example: DB connection pool
    public class DatabaseConnectionPool {
        private final Semaphore semaphore;
        private final Queue<Connection> connections = new LinkedList<>();

        public DatabaseConnectionPool(int maxConnections) {
            this.semaphore = new Semaphore(maxConnections);

            for (int i = 0; i < maxConnections; i++) {
                connections.offer(new Connection());
            }
        }

        public Connection acquire() throws InterruptedException {
            semaphore.acquire();
            synchronized (connections) {
                return connections.poll();
            }
        }

        public void release(Connection connection) {
            synchronized (connections) {
                connections.offer(connection);
            }
            semaphore.release();
        }

        public void useConnection() throws InterruptedException {
            Connection conn = acquire();
            try {
                // DB work
                System.out.println("Using connection");
                Thread.sleep(1000);
            } finally {
                release(conn);
            }
        }
    }

    // Real-world example: API rate limiting
    public class RateLimiter {
        private final Semaphore semaphore;
        private final ScheduledExecutorService scheduler;

        public RateLimiter(int requestsPerSecond) {
            this.semaphore = new Semaphore(requestsPerSecond);
            this.scheduler = Executors.newScheduledThreadPool(1);

            // Once per second, top the permits back up to the limit
            scheduler.scheduleAtFixedRate(() -> {
                semaphore.release(requestsPerSecond - semaphore.availablePermits());
            }, 1, 1, TimeUnit.SECONDS);
        }

        public boolean tryAcquire() {
            return semaphore.tryAcquire();
        }

        public void shutdown() {
            scheduler.shutdown();
        }
    }

    static class Connection {}

    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
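To check that a semaphore really caps concurrency, one option is to record the highest number of tasks that were ever inside the guarded section at the same time. The sketch below does that. The names BoundedConcurrencyDemo and maxObserved and the 20 ms sleep that stands in for real work are assumptions for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedConcurrencyDemo {
    // Runs `tasks` threads behind a semaphore with `permits` permits and
    // returns the highest number of tasks observed inside at once.
    static int maxObserved(int permits, int tasks) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();

        Thread[] threads = new Thread[tasks];
        for (int i = 0; i < tasks; i++) {
            threads[i] = new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;  // no permit was taken, so nothing to release
                }
                try {
                    int now = active.incrementAndGet();
                    max.accumulateAndGet(now, Math::max);  // remember the peak
                    Thread.sleep(20);  // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    semaphore.release();  // always paired with a successful acquire
                }
            });
            threads[i].start();
        }

        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrency: " + maxObserved(3, 10));
    }
}
```

The acquire sits outside the try/finally on purpose: release() must run only when a permit was actually obtained.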

BlockingQueue: the producer-consumer pattern

public class BlockingQueueExample {
    // Basic usage
    public void basicUsage() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        // Producer
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    String item = "Item " + i;
                    queue.put(item);  // blocks while the queue is full
                    System.out.println("Produced: " + item);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;  // stop producing once interrupted
                }
            }
        }).start();

        // Consumer
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                while (true) {
                    try {
                        String item = queue.take();  // blocks while the queue is empty
                        System.out.println(Thread.currentThread().getName() + 
                            " consumed: " + item);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }).start();
        }
    }

    // BlockingQueue implementations
    public void differentImplementations() {
        // 1. ArrayBlockingQueue: fixed capacity
        BlockingQueue<String> array = new ArrayBlockingQueue<>(100);

        // 2. LinkedBlockingQueue: optionally bounded (Integer.MAX_VALUE if no capacity is given)
        BlockingQueue<String> linked = new LinkedBlockingQueue<>();

        // 3. PriorityBlockingQueue: ordered by priority (unbounded)
        BlockingQueue<Task> priority = new PriorityBlockingQueue<>(
            10,
            Comparator.comparing(Task::getPriority)
        );

        // 4. DelayQueue: an element can be taken only after its delay has expired
        BlockingQueue<DelayedTask> delay = new DelayQueue<>();

        // 5. SynchronousQueue: zero capacity (direct hand-off between threads)
        BlockingQueue<String> sync = new SynchronousQueue<>();
    }

    // Real-world example: log processing system
    public class LogProcessor {
        private final BlockingQueue<LogEntry> queue = new LinkedBlockingQueue<>(1000);
        private final ExecutorService executor = Executors.newFixedThreadPool(3);
        private volatile boolean running = true;

        public void start() {
            // Consumer threads
            for (int i = 0; i < 3; i++) {
                executor.submit(() -> {
                    while (running) {
                        try {
                            LogEntry entry = queue.poll(1, TimeUnit.SECONDS);
                            if (entry != null) {
                                processLog(entry);
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                });
            }
        }

        public void log(LogEntry entry) {
            if (!queue.offer(entry)) {
                // The queue is full
                System.err.println("Log queue full, dropping entry");
            }
        }

        public void shutdown() {
            running = false;
            executor.shutdown();
        }

        private void processLog(LogEntry entry) {
            // Write to a file, save to a DB, etc.
            System.out.println("Processing log: " + entry);
        }
    }

    // Real-world example: task queue
    public class TaskQueue {
        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        private final List<Thread> workers = new ArrayList<>();

        public TaskQueue(int numWorkers) {
            for (int i = 0; i < numWorkers; i++) {
                Thread worker = new Thread(() -> {
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            Runnable task = queue.take();
                            task.run();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
                worker.start();
                workers.add(worker);
            }
        }

        public void submit(Runnable task) {
            queue.offer(task);
        }

        public void shutdown() {
            workers.forEach(Thread::interrupt);
        }
    }

    static class Task {
        int getPriority() { return 0; }
    }

    static class DelayedTask implements Delayed {
        @Override
        public long getDelay(TimeUnit unit) { return 0; }

        @Override
        public int compareTo(Delayed o) { return 0; }
    }

    static class LogEntry {
        @Override
        public String toString() { return "LogEntry"; }
    }
}
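The basic consumer above loops forever, and the other examples stop through a flag or an interrupt. A third common option is a "poison pill": the producer enqueues a sentinel value, and the consumer exits when it sees it. The sketch below assumes a single consumer; the names PoisonPillDemo and produceAndConsume and the sentinel string are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {
    // Hypothetical sentinel; it must never occur as a real item
    private static final String POISON = "__STOP__";

    // Sends the items through a queue to one consumer thread and
    // returns what the consumer received, in order.
    static List<String> produceAndConsume(List<String> items) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = new ArrayList<>();  // written only by the consumer

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take();
                    if (POISON.equals(item)) {
                        return;  // sentinel seen: stop cleanly
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (String item : items) {
                queue.put(item);
            }
            queue.put(POISON);  // signal the end of the stream
            consumer.join();    // join makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(Arrays.asList("a", "b", "c")));
    }
}
```

With several consumers, one sentinel per consumer is needed, since each take() removes the element it returns.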

ConcurrentHashMap vs synchronized Map

Advantages of ConcurrentHashMap

public class ConcurrentHashMapExample {
    // Performance comparison (JMH)
    @Benchmark
    public void synchronizedMapPerformance() {
        Map<String, Integer> map = Collections.synchronizedMap(new HashMap<>());

        // Every operation acquires a single lock on the whole map
        map.put("key1", 1);
        map.get("key1");
        map.remove("key1");
    }

    @Benchmark
    public void concurrentHashMapPerformance() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // Since Java 8: CAS plus per-bin locking (Java 7 and earlier locked per segment)
        map.put("key1", 1);
        map.get("key1");
        map.remove("key1");
    }

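    /**
     * Benchmark results (the author's figures; actual numbers depend on
     * hardware, JVM version, and thread contention):
     * - synchronizedMap: 100 µs
     * - ConcurrentHashMap: 10 µs  (about 10x faster)
     *
     * Note: the methods above use a fresh map on a single thread, so a gap of
     * this size would be expected only when many threads share one map.
     */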

    // Atomic operations
    public void atomicOperations() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // ❌ Not atomic (race condition between get and put)
        if (map.get("key") == null) {
            map.put("key", 1);
        }

        // ✅ Atomic
        map.putIfAbsent("key", 1);

        // ✅ compute: derive the new value from the existing one
        map.compute("key", (k, v) -> v == null ? 1 : v + 1);

        // ✅ merge: combine the given value with the existing one
        map.merge("key", 1, Integer::sum);
    }

    // Real-world example: cache
    public class Cache<K, V> {
        private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
        private final Function<K, V> loader;

        public Cache(Function<K, V> loader) {
            this.loader = loader;
        }

        public V get(K key) {
            return cache.computeIfAbsent(key, loader);
        }

        public void invalidate(K key) {
            cache.remove(key);
        }

        public void clear() {
            cache.clear();
        }
    }

    // Real-world example: concurrent counter
    public class ConcurrentCounter {
        private final ConcurrentHashMap<String, AtomicInteger> counters = 
            new ConcurrentHashMap<>();

        public void increment(String key) {
            counters.computeIfAbsent(key, k -> new AtomicInteger(0))
                .incrementAndGet();
        }

        public int getCount(String key) {
            AtomicInteger counter = counters.get(key);
            return counter != null ? counter.get() : 0;
        }

        public Map<String, Integer> getAll() {
            Map<String, Integer> result = new HashMap<>();
            counters.forEach((k, v) -> result.put(k, v.get()));
            return result;
        }
    }

    // Parallel bulk operations
    public void parallelForEach() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // Populate the map
        for (int i = 0; i < 1000; i++) {
            map.put("key" + i, i);
        }

        // Parallel forEach (the first argument is the parallelismThreshold)
        map.forEach(1, (key, value) -> {
            // Runs in parallel
            System.out.println(key + ": " + value);
        });

        // Parallel search
        String result = map.search(1, (key, value) -> {
            return value > 500 ? key : null;
        });

        // Parallel reduce
        Integer sum = map.reduce(1,
            (key, value) -> value,
            Integer::sum
        );
    }
}
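The value of the atomic operations is easiest to see under real contention. In the sketch below several threads increment one shared key with merge, and no update is lost because each merge call is atomic for that key. The names MergeCounterDemo and countWithMerge and the key "hits" are illustrative assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;

class MergeCounterDemo {
    // Starts `threads` threads that each add 1 to the same key
    // `incrementsPerThread` times, then returns the final count.
    static int countWithMerge(int threads, int incrementsPerThread) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    // Atomic read-modify-write for this key
                    map.merge("hits", 1, Integer::sum);
                }
            });
            workers[i].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return map.getOrDefault("hits", 0);
    }

    public static void main(String[] args) {
        System.out.println("Total: " + countWithMerge(4, 1000));
    }
}
```

Replacing the merge call with a separate get followed by put would typically lose updates under the same load, which is the race the ❌ case above describes.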

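5.3 Java Monitoring and Locks in Depth

JVM monitoring tools and concurrency analysis

Monitoring threads with JConsole

Running JConsole

# Add JMX options when starting the application
# (authentication and SSL are disabled here, so use this for local development only)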
java -Dcom.sun.management.jmxremote \
     -Dcom.sun.management.jmxremote.port=9999 \
     -Dcom.sun.management.jmxremote.authenticate=false \
     -Dcom.sun.management.jmxremote.ssl=false \
     -jar MyApplication.jar

# Launch JConsole
jconsole localhost:9999

Thread dump analysis

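import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ThreadDumpExample {
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

    public static void main(String[] args) {
        // Create a deadlock scenario: the two threads take the locks in opposite order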
        Thread thread1 = new Thread(() -> {
            synchronized (lock1) {
                System.out.println("Thread 1: Holding lock1");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                synchronized (lock2) {
                    System.out.println("Thread 1: Holding lock1 and lock2");
                }
            }
        });

        Thread thread2 = new Thread(() -> {
            synchronized (lock2) {
                System.out.println("Thread 2: Holding lock2");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                synchronized (lock1) {
                    System.out.println("Thread 2: Holding lock1 and lock2");
                }
            }
        });

        thread1.start();
        thread2.start();

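        // Give both threads time to block on each other before taking the dump
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // Take a thread dump programmatically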
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        ThreadInfo[] threadInfos = threadBean.dumpAllThreads(true, true);

        for (ThreadInfo info : threadInfos) {
            if (info.getThreadState() == Thread.State.BLOCKED) {
                System.out.println("BLOCKED Thread: " + info.getThreadName());
                System.out.println("Locked by: " + info.getLockOwnerName());
            }
        }
    }
}

Programmatic monitoring

import java.lang.management.*;

public class ThreadMonitoring {
    private final ThreadMXBean threadBean;
    private final MemoryMXBean memoryBean;
    private final RuntimeMXBean runtimeBean;

    public ThreadMonitoring() {
        this.threadBean = ManagementFactory.getThreadMXBean();
        this.memoryBean = ManagementFactory.getMemoryMXBean();
        this.runtimeBean = ManagementFactory.getRuntimeMXBean();
    }

    public void printThreadStats() {
        System.out.println("=== Thread Statistics ===");
        System.out.println("Total Threads: " + threadBean.getThreadCount());
        System.out.println("Peak Threads: " + threadBean.getPeakThreadCount());
        System.out.println("Daemon Threads: " + threadBean.getDaemonThreadCount());

        // Per-thread details (getThreadCpuTime returns -1 when CPU time measurement is disabled)
        ThreadInfo[] threadInfos = threadBean.dumpAllThreads(false, false);
        for (ThreadInfo info : threadInfos) {
            System.out.printf("Thread: %s, State: %s, CPU Time: %d ns%n",
                info.getThreadName(),
                info.getThreadState(),
                threadBean.getThreadCpuTime(info.getThreadId())
            );
        }
    }

    public void detectDeadlock() {
        long[] deadlockedThreads = threadBean.findDeadlockedThreads();
        if (deadlockedThreads != null) {
            System.out.println("🚨 Deadlock detected!");
            ThreadInfo[] threadInfos = threadBean.getThreadInfo(deadlockedThreads);
            for (ThreadInfo info : threadInfos) {
                System.out.println("Deadlocked Thread: " + info.getThreadName());
                System.out.println("Lock Info: " + info.getLockInfo());
            }
        } else {
            System.out.println("✅ No deadlock detected");
        }
    }

    public void monitorMemoryUsage() {
        MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
        System.out.println("=== Memory Statistics ===");
        System.out.printf("Used: %d MB%n", heapUsage.getUsed() / 1024 / 1024);
        System.out.printf("Max: %d MB%n", heapUsage.getMax() / 1024 / 1024);
        System.out.printf("Usage: %.2f%%%n", 
            (double) heapUsage.getUsed() / heapUsage.getMax() * 100);
    }
}

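Profiling with VisualVM

Installing and using VisualVM

# Download VisualVM and launch it
# Start the application with the JMX options below
# (authenticate=false and ssl=false are only suitable for local development)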
java -Dcom.sun.management.jmxremote \
     -Dcom.sun.management.jmxremote.port=9999 \
     -Dcom.sun.management.jmxremote.authenticate=false \
     -Dcom.sun.management.jmxremote.ssl=false \
     -jar MyApplication.jar

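Profiling example

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProfilingExample {
    private static final int THREAD_COUNT = 10;
    private static final int ITERATIONS = 1000000;

    public static void main(String[] args) throws InterruptedException {
        // CPU-intensive work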
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.submit(() -> {
                try {
                    cpuIntensiveTask();
                } finally {
                    latch.countDown();
                }
            });
        }

        latch.await();
        long endTime = System.currentTimeMillis();

        System.out.printf("Total time: %d ms%n", endTime - startTime);
        executor.shutdown();
    }

    private static void cpuIntensiveTask() {
        long sum = 0;
        for (int i = 0; i < ITERATIONS; i++) {
            sum += Math.sqrt(i);
        }
        System.out.println("Sum: " + sum);
    }
}

Advanced lock mechanisms

ReentrantLock in depth

Fairness setting

import java.util.concurrent.locks.ReentrantLock;

public class FairLockExample {
    private final ReentrantLock fairLock = new ReentrantLock(true); // fair: the longest-waiting thread acquires next
    private final ReentrantLock unfairLock = new ReentrantLock(false); // non-fair (default): barging is allowed
    private int sharedResource = 0;

    public void fairAccess(String threadName) {
        fairLock.lock();
        try {
            System.out.println(threadName + " acquired fair lock");
            Thread.sleep(100);
            sharedResource++;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            fairLock.unlock();
        }
    }

    public void unfairAccess(String threadName) {
        unfairLock.lock();
        try {
            System.out.println(threadName + " acquired unfair lock");
            Thread.sleep(100);
            sharedResource++;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            unfairLock.unlock();
        }
    }

    public static void main(String[] args) {
        FairLockExample example = new FairLockExample();

        // Fair lock test
        System.out.println("=== Fair Lock Test ===");
        for (int i = 0; i < 5; i++) {
            final int threadNum = i;
            new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    example.fairAccess("Fair-Thread-" + threadNum);
                }
            }).start();
        }

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // Non-fair lock test
        System.out.println("\n=== Unfair Lock Test ===");
        for (int i = 0; i < 5; i++) {
            final int threadNum = i;
            new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    example.unfairAccess("Unfair-Thread-" + threadNum);
                }
            }).start();
        }
    }
}

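Using condition variables (Condition)

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionExample {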
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    private final Object[] items = new Object[10];
    private int count = 0;
    private int putIndex = 0;
    private int takeIndex = 0;

    public void put(Object item) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                System.out.println("Buffer full, waiting...");
                notFull.await();
            }

            items[putIndex] = item;
            if (++putIndex == items.length) {
                putIndex = 0;
            }
            count++;
            System.out.println("Produced: " + item + ", count: " + count);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                System.out.println("Buffer empty, waiting...");
                notEmpty.await();
            }

            Object item = items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length) {
                takeIndex = 0;
            }
            count--;
            System.out.println("Consumed: " + item + ", count: " + count);
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ConditionExample buffer = new ConditionExample();

        // Producer
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    buffer.put("Item-" + i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();

        // Consumer
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    buffer.take();
                    Thread.sleep(150);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}

ReadWriteLock in depth

Performance comparison example

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockPerformance {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, String> data = new HashMap<>();

    // Using ReadWriteLock
    public String readWithRWLock(String key) {
        lock.readLock().lock();
        try {
            return data.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    public void writeWithRWLock(String key, String value) {
        lock.writeLock().lock();
        try {
            data.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Using plain synchronized
    public synchronized String readWithSync(String key) {
        return data.get(key);
    }

    public synchronized void writeWithSync(String key, String value) {
        data.put(key, value);
    }

    public static void main(String[] args) throws InterruptedException {
        ReadWriteLockPerformance example = new ReadWriteLockPerformance();

        // Populate the initial data
        for (int i = 0; i < 1000; i++) {
            example.writeWithRWLock("key" + i, "value" + i);
        }

        int readerCount = 10;
        int writerCount = 2;
        int operationsPerThread = 10000;

        // ReadWriteLock performance test
        System.out.println("=== ReadWriteLock Performance Test ===");
        long startTime = System.currentTimeMillis();

        CountDownLatch latch = new CountDownLatch(readerCount + writerCount);

        // Reader threads
        for (int i = 0; i < readerCount; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < operationsPerThread; j++) {
                        example.readWithRWLock("key" + (j % 1000));
                    }
                } finally {
                    latch.countDown();
                }
            }).start();
        }

        // Writer threads
        for (int i = 0; i < writerCount; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < operationsPerThread / 10; j++) {
                        example.writeWithRWLock("key" + (j % 1000), "newValue" + j);
                    }
                } finally {
                    latch.countDown();
                }
            }).start();
        }

        latch.await();
        long endTime = System.currentTimeMillis();

        System.out.printf("ReadWriteLock Time: %d ms%n", endTime - startTime);
    }
}
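
The example above defines synchronized variants but only times the ReadWriteLock path. To actually compare the two, the same workload has to run against both. The following is a rough sketch under stated assumptions: LockComparisonSketch and timeWorkload are hypothetical names, and wall-clock timing like this ignores JIT warm-up, so treat the numbers as indicative only (a harness such as JMH is the better tool for real measurements).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical harness: runs the same read-heavy workload against both locking strategies.
class LockComparisonSketch {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Map<String, String> data = new HashMap<>();

    String readWithRWLock(String key) {
        rwLock.readLock().lock();
        try { return data.get(key); } finally { rwLock.readLock().unlock(); }
    }

    void writeWithRWLock(String key, String value) {
        rwLock.writeLock().lock();
        try { data.put(key, value); } finally { rwLock.writeLock().unlock(); }
    }

    synchronized String readWithSync(String key) { return data.get(key); }

    synchronized void writeWithSync(String key, String value) { data.put(key, value); }

    // Starts reader and writer threads, waits for all of them, and returns elapsed milliseconds.
    static long timeWorkload(int readers, int writers, int opsPerThread,
                             Function<String, String> read,
                             BiConsumer<String, String> write) {
        CountDownLatch done = new CountDownLatch(readers + writers);
        long start = System.nanoTime();
        for (int i = 0; i < readers; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < opsPerThread; j++) read.apply("key" + (j % 1000));
                } finally {
                    done.countDown();
                }
            }).start();
        }
        for (int i = 0; i < writers; i++) {
            new Thread(() -> {
                try {
                    // Writers do a tenth of the operations, as in the example above
                    for (int j = 0; j < opsPerThread / 10; j++) write.accept("key" + (j % 1000), "v" + j);
                } finally {
                    done.countDown();
                }
            }).start();
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workload", e);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // Separate instances, so each map is only ever guarded by one kind of lock
        LockComparisonSketch rw = new LockComparisonSketch();
        LockComparisonSketch sync = new LockComparisonSketch();
        long rwMs = timeWorkload(10, 2, 10_000, rw::readWithRWLock, rw::writeWithRWLock);
        long syncMs = timeWorkload(10, 2, 10_000, sync::readWithSync, sync::writeWithSync);
        System.out.printf("ReadWriteLock: %d ms, synchronized: %d ms%n", rwMs, syncMs);
    }
}
```

With critical sections as short as a single HashMap lookup, ReadWriteLock does not necessarily win; its advantage tends to show up when reads dominate and each read holds the lock for longer.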

Advanced StampedLock usage

Optimistic read

import java.util.concurrent.locks.StampedLock;

public class StampedLockExample {
    private final StampedLock lock = new StampedLock();
    private double x, y;

    // Write operation
    public void write(double newX, double newY) {
        long stamp = lock.writeLock();
        try {
            x = newX;
            y = newY;
            System.out.println("Written: x=" + x + ", y=" + y);
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Optimistic read
    public double distanceFromOrigin() {
        long stamp = lock.tryOptimisticRead();
        double curX = x, curY = y;

        if (!lock.validate(stamp)) {
            // A write intervened, so the optimistic read is invalid; fall back to a regular read lock
            stamp = lock.readLock();
            try {
                curX = x;
                curY = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }

        return Math.sqrt(curX * curX + curY * curY);
    }

    // Upgrade a read lock to a write lock
    public void moveIfAtOrigin(double newX, double newY) {
        long stamp = lock.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                long ws = lock.tryConvertToWriteLock(stamp);
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    lock.unlockRead(stamp);
                    stamp = lock.writeLock();
                }
            }
        } finally {
            lock.unlock(stamp);
        }
    }

    public static void main(String[] args) {
        StampedLockExample example = new StampedLockExample();

        // Writer
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                example.write(i, i * 2);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();

        // Reader
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                double distance = example.distanceFromOrigin();
                System.out.println("Distance: " + distance);
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}

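Lock performance analysis and optimization

Analyzing lock contention

import java.lang.management.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class LockContentionAnalysis {
    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicInteger atomicCounter = new AtomicInteger();
    // Note: incrementWithLock and incrementWithSync guard counter with different locks,
    // so use only one of them in a given run
    private int counter = 0;

    public void incrementWithLock() {
        lock.lock();
        try {
            counter++;
        } finally {
            lock.unlock();
        }
    }

    public void incrementWithSync() {
        synchronized (this) {
            counter++;
        }
    }

    public void incrementWithAtomic() {
        // AtomicInteger: lock-free increment via CAS
        atomicCounter.incrementAndGet();
    }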

    public void analyzeLockContention() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();

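        // Collect lock information, including locked monitors and ownable synchronizers.
        // Blocked/waited times are -1 unless thread contention monitoring is enabled.
        ThreadInfo[] threadInfos = threadBean.dumpAllThreads(true, true);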

        for (ThreadInfo info : threadInfos) {
            if (info.getLockInfo() != null) {
                System.out.println("Thread: " + info.getThreadName());
                System.out.println("Lock: " + info.getLockInfo());
                System.out.println("Blocked Count: " + info.getBlockedCount());
                System.out.println("Blocked Time: " + info.getBlockedTime() + " ms");
                System.out.println("Waited Count: " + info.getWaitedCount());
                System.out.println("Waited Time: " + info.getWaitedTime() + " ms");
                System.out.println("---");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockContentionAnalysis analysis = new LockContentionAnalysis();

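        // Enable contention monitoring first; otherwise getBlockedTime()/getWaitedTime() return -1
        ThreadMXBean contentionBean = ManagementFactory.getThreadMXBean();
        if (contentionBean.isThreadContentionMonitoringSupported()) {
            contentionBean.setThreadContentionMonitoringEnabled(true);
        }

        // Create a high-contention workload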
        ExecutorService executor = Executors.newFixedThreadPool(20);
        CountDownLatch latch = new CountDownLatch(20);

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < 20; i++) {
            executor.submit(() -> {
                try {
                    for (int j = 0; j < 10000; j++) {
                        analysis.incrementWithLock();
                    }
                } finally {
                    latch.countDown();
                }
            });
        }

        latch.await();
        long endTime = System.currentTimeMillis();

        System.out.printf("Lock-based increment time: %d ms%n", endTime - startTime);

        // Analyze lock contention
        analysis.analyzeLockContention();

        executor.shutdown();
    }
}
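
Once the analysis shows that a single hot counter is the source of contention, one common optimization is to replace the lock with java.util.concurrent.atomic.LongAdder. It spreads updates across internal cells and only combines them when the value is read. The sketch below is illustrative rather than a benchmark; LongAdderSketch and countWithLongAdder are hypothetical names, and the thread and iteration counts simply mirror the example above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: a contended counter without any lock.
class LongAdderSketch {
    // Increments a shared LongAdder from `threads` workers and returns the final sum.
    static long countWithLongAdder(int threads, int incrementsPerThread) {
        LongAdder adder = new LongAdder();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        CountDownLatch latch = new CountDownLatch(threads);
        try {
            for (int i = 0; i < threads; i++) {
                executor.submit(() -> {
                    try {
                        for (int j = 0; j < incrementsPerThread; j++) {
                            adder.increment();
                        }
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        } finally {
            executor.shutdown();
        }
        // sum() is exact here because every worker has finished before it is called
        return adder.sum();
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        long total = countWithLongAdder(20, 10_000);
        System.out.printf("LongAdder total: %d, time: %d ms%n",
            total, System.currentTimeMillis() - start);
    }
}
```

LongAdder suits statistics-style counters that are written often and read rarely. When each update must observe the exact current value, AtomicInteger or AtomicLong remains the better fit.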

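Lock-free programming

import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.*;
import sun.misc.Unsafe;

public class LockFreeProgramming {
    // Using AtomicInteger
    private final AtomicInteger atomicCounter = new AtomicInteger(0);

    // Using CAS (compare-and-swap) directly.
    // sun.misc.Unsafe is an unsupported internal API; on Java 9+ prefer VarHandle
    // or AtomicIntegerFieldUpdater.
    private volatile int casCounter = 0;
    private static final Unsafe UNSAFE;
    private static final long COUNTER_OFFSET;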

    static {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            UNSAFE = (Unsafe) field.get(null);
            COUNTER_OFFSET = UNSAFE.objectFieldOffset(
                LockFreeProgramming.class.getDeclaredField("casCounter")
            );
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void incrementAtomic() {
        atomicCounter.incrementAndGet();
    }

    public void incrementCAS() {
        int current;
        do {
            current = casCounter;
        } while (!UNSAFE.compareAndSwapInt(this, COUNTER_OFFSET, current, current + 1));
    }

    // AtomicStampedReference guards against the ABA problem by pairing the value with a stamp
    private final AtomicStampedReference<String> stampedRef = 
        new AtomicStampedReference<>("initial", 0);

    public boolean updateWithStamp(String expectedValue, String newValue) {
        int[] stampHolder = new int[1];
        String currentValue = stampedRef.get(stampHolder);

        if (currentValue.equals(expectedValue)) {
            // compareAndSet compares references with ==, so pass the reference that was
            // actually read rather than the caller's equal-but-possibly-distinct String
            return stampedRef.compareAndSet(
                currentValue, newValue, 
                stampHolder[0], stampHolder[0] + 1
            );
        }
        return false;
    }

    // Lock-free stack implementation
    private static class LockFreeStack<T> {
        private final AtomicReference<Node<T>> head = new AtomicReference<>();

        private static class Node<T> {
            final T data;
            final AtomicReference<Node<T>> next;

            Node(T data) {
                this.data = data;
                this.next = new AtomicReference<>();
            }
        }

        public void push(T data) {
            Node<T> newNode = new Node<>(data);
            Node<T> currentHead;

            do {
                currentHead = head.get();
                newNode.next.set(currentHead);
            } while (!head.compareAndSet(currentHead, newNode));
        }

        public T pop() {
            Node<T> currentHead;
            Node<T> newHead;

            do {
                currentHead = head.get();
                if (currentHead == null) {
                    return null;
                }
                newHead = currentHead.next.get();
            } while (!head.compareAndSet(currentHead, newHead));

            return currentHead.data;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockFreeProgramming example = new LockFreeProgramming();

        // Performance comparison
        int threadCount = 10;
        int operationsPerThread = 100000;

        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);

        // AtomicInteger test
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                try {
                    for (int j = 0; j < operationsPerThread; j++) {
                        example.incrementAtomic();
                    }
                } finally {
                    latch.countDown();
                }
            });
        }

        latch.await();
        long atomicTime = System.currentTimeMillis() - startTime;

        System.out.printf("AtomicInteger time: %d ms%n", atomicTime);
        System.out.printf("AtomicInteger value: %d%n", example.atomicCounter.get());

        executor.shutdown();
    }
}
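
The incrementCAS method above reaches the CAS instruction through sun.misc.Unsafe, which is an unsupported internal API. On Java 9 and later the same retry loop can be written with the supported java.lang.invoke.VarHandle API. The following is a minimal sketch of that swap, not the original author's code; VarHandleCounter and runThreads are hypothetical names.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical sketch: the same CAS retry loop, using VarHandle instead of Unsafe (Java 9+).
class VarHandleCounter {
    private static final VarHandle VALUE;

    static {
        try {
            // Look up a handle to the private int field "value" of this class
            VALUE = MethodHandles.lookup()
                .findVarHandle(VarHandleCounter.class, "value", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private volatile int value = 0;

    // Reads the current value and retries until the CAS from current to current + 1 succeeds.
    int increment() {
        int current;
        do {
            current = value;
        } while (!VALUE.compareAndSet(this, current, current + 1));
        return current + 1;
    }

    int get() {
        return value;
    }

    // Increments one shared counter from several threads and returns the final value.
    static int runThreads(int threads, int incrementsPerThread) {
        VarHandleCounter counter = new VarHandleCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("Final value: " + runThreads(10, 100_000));
    }
}
```

For a plain counter, AtomicInteger is still the simpler choice. A hand-written CAS loop is mainly useful when a per-object Atomic wrapper is too costly or when the update rule is more involved than an increment.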

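Monitoring strategies in practice

APM (Application Performance Monitoring) integration

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.lang.management.*;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Uses Micrometer (micrometer-core) as the metrics facade
public class APMIntegration {
    private final MeterRegistry meterRegistry;

    public APMIntegration() {
        this.meterRegistry = Metrics.globalRegistry;
    }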

    public void recordThreadMetrics() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();

        // Thread count metric (Gauge.builder takes the observed object and its value function)
        Gauge.builder("jvm.threads.count", threadBean, ThreadMXBean::getThreadCount)
            .description("Number of threads")
            .register(meterRegistry);

        // Deadlock detection
        Gauge.builder("jvm.threads.deadlock", () -> {
                long[] deadlockedThreads = threadBean.findDeadlockedThreads();
                return deadlockedThreads != null ? deadlockedThreads.length : 0;
            })
            .description("Number of deadlocked threads")
            .register(meterRegistry);
    }

    public void recordLockMetrics() {
        // Measures lock wait time
        Timer lockWaitTimer = Timer.builder("lock.wait.time")
            .description("Lock wait time")
            .register(meterRegistry);

        // Counts lock acquisition attempts
        Counter lockAttempts = Counter.builder("lock.attempts")
            .description("Lock acquisition attempts")
            .register(meterRegistry);

        // Usage example
        ReentrantLock lock = new ReentrantLock();

        lockAttempts.increment();
        Timer.Sample lockSample = Timer.start(meterRegistry);

        lock.lock();  // acquire outside try so a failed acquire never reaches unlock()
        lockSample.stop(lockWaitTimer);  // stop here so only the wait is timed, not the work
        try {
            // Do the work
        } finally {
            lock.unlock();
        }
    }

    public void recordMemoryMetrics() {
        MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();

        // Heap memory usage
        Gauge.builder("jvm.memory.heap.used", memoryBean,
                bean -> bean.getHeapMemoryUsage().getUsed())
            .description("Used heap memory")
            .register(meterRegistry);

        // GC information
        List<GarbageCollectorMXBean> gcBeans = 
            ManagementFactory.getGarbageCollectorMXBeans();

        for (GarbageCollectorMXBean gcBean : gcBeans) {
            Gauge.builder("jvm.gc.collections", gcBean,
                    GarbageCollectorMXBean::getCollectionCount)
                .tag("gc", gcBean.getName())
                .description("GC collection count")
                .register(meterRegistry);
        }
    }
}

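Custom monitoring tool

import java.lang.management.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CustomThreadMonitor {
    private final ScheduledExecutorService scheduler = 
        Executors.newScheduledThreadPool(1);
    private final Map<String, ThreadStats> threadStats = 
        new ConcurrentHashMap<>();

    public static class ThreadStats {
        private long cpuTime;
        private long blockedTime;
        private long waitedTime;
        // long, not int: ThreadInfo.getBlockedCount() and getWaitedCount() return long
        private long blockedCount;
        private long waitedCount;

        // getters and setters
        public long getCpuTime() { return cpuTime; }
        public void setCpuTime(long cpuTime) { this.cpuTime = cpuTime; }
        public long getBlockedTime() { return blockedTime; }
        public void setBlockedTime(long blockedTime) { this.blockedTime = blockedTime; }
        public long getWaitedTime() { return waitedTime; }
        public void setWaitedTime(long waitedTime) { this.waitedTime = waitedTime; }
        public long getBlockedCount() { return blockedCount; }
        public void setBlockedCount(long blockedCount) { this.blockedCount = blockedCount; }
        public long getWaitedCount() { return waitedCount; }
        public void setWaitedCount(long waitedCount) { this.waitedCount = waitedCount; }
    }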

    public void startMonitoring() {
        scheduler.scheduleAtFixedRate(this::collectStats, 0, 1, TimeUnit.SECONDS);
    }

    private void collectStats() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        ThreadInfo[] threadInfos = threadBean.dumpAllThreads(false, false);

        for (ThreadInfo info : threadInfos) {
            String threadName = info.getThreadName();
            ThreadStats stats = threadStats.computeIfAbsent(threadName, 
                k -> new ThreadStats());

            stats.setCpuTime(threadBean.getThreadCpuTime(info.getThreadId()));
            stats.setBlockedTime(info.getBlockedTime());
            stats.setWaitedTime(info.getWaitedTime());
            stats.setBlockedCount(info.getBlockedCount());
            stats.setWaitedCount(info.getWaitedCount());
        }

        // Print the statistics
        printStats();
    }

    private void printStats() {
        System.out.println("\n=== Thread Statistics ===");
        threadStats.forEach((name, stats) -> {
            if (stats.getBlockedCount() > 0 || stats.getWaitedCount() > 0) {
                System.out.printf("Thread: %s%n", name);
                System.out.printf("  CPU Time: %d ms%n", stats.getCpuTime() / 1_000_000);
                System.out.printf("  Blocked: %d times, %d ms%n", 
                    stats.getBlockedCount(), stats.getBlockedTime());
                System.out.printf("  Waited: %d times, %d ms%n", 
                    stats.getWaitedCount(), stats.getWaitedTime());
            }
        });
    }

    public void stopMonitoring() {
        scheduler.shutdown();
    }
}

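Wrapping up

This post took a practical, in-depth look at Java concurrency, from thread basics through the Executor Framework and the java.util.concurrent package.

Key takeaways:

  1. Thread basics

    • Implement the Runnable interface (recommended)
    • Use join, sleep, and interrupt correctly
    • Understand the thread lifecycle
  2. Synchronization

    • synchronized for mutual exclusion
    • wait/notify for condition synchronization
    • volatile for visibility guarantees
    • Deadlock prevention (fixed lock ordering)
  3. Executor Framework

    • ExecutorService for managing thread pools
    • CompletableFuture for asynchronous programming
    • Customizing ThreadPoolExecutor
  4. java.util.concurrent

    • CountDownLatch: one-shot synchronization
    • CyclicBarrier: reusable synchronization
    • Semaphore: limiting access to a resource
    • BlockingQueue: Producer-Consumer
    • ConcurrentHashMap: high-performance concurrent map
  5. Advanced lock mechanisms

    • ReentrantLock: fairness setting and condition variables
    • ReadWriteLock: separates reads from writes to improve performance
    • StampedLock: optimistic reads and lock upgrades
    • Lock-free programming: CAS and the Atomic classes
  6. Monitoring and performance analysis

    • Using JConsole and VisualVM
    • Programmatic monitoring with ThreadMXBean
    • Lock contention analysis and optimization
    • APM integration and custom monitoring tools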

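Practical tips:

  • Prefer high-level APIs where possible (ExecutorService, CompletableFuture)
  • Prefer Runnable over extending Thread
  • Prefer java.util.concurrent over raw synchronized
  • Minimize shared state
  • Use immutable objects
  • Keep improving performance through monitoring
  • Remove bottlenecks by minimizing lock contention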
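
Several of these tips combine naturally: a high-level API, no shared mutable state, and immutable results. The sketch below shows one way that can look, under the assumption of a trivial squaring task; HighLevelApiSketch, Result, and squareAll are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical sketch: tasks share no mutable state and return immutable results.
class HighLevelApiSketch {
    // Immutable value object: final class, final fields, no setters
    static final class Result {
        final int input;
        final int square;

        Result(int input, int square) {
            this.input = input;
            this.square = square;
        }
    }

    // Squares each input on a thread pool and returns the results in input order.
    static List<Result> squareAll(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Result>> futures = inputs.stream()
                .map(n -> CompletableFuture.supplyAsync(() -> new Result(n, n * n), pool))
                .collect(Collectors.toList());
            // join() waits for each future; no locks or shared counters are needed
            return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (Result r : squareAll(Arrays.asList(1, 2, 3, 4))) {
            System.out.println(r.input + " squared is " + r.square);
        }
    }
}
```

Because each task only reads its own input and creates a new object, there is nothing to synchronize, which is usually the simplest way to avoid lock contention altogether.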

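Concurrency is hard, but used correctly it can deliver substantial performance gains. The key is to keep improving through monitoring and performance analysis.

The next stage will cover JVM & performance optimization.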
