강화학습 완벽 가이드: 이론부터 실전까지

강화학습(Reinforcement Learning)은 인공지능(AI) 분야에서 기계가 스스로 학습하고 결정할 수 있는 능력을 부여하는 중요한 기술입니다. 이 방법론은 로봇 제어, 게임 플레이, 자율 주행 자동차 등 다양한 분야에서 혁신적인 결과를 보여주고 있습니다. 본 가이드에서는 강화학습의 역사, 이론적 배경, 주요 알고리즘, 그리고 실제 구현까지 심층적으로 다룹니다.

강화학습 완벽 가이드: 이론부터 실전까지

강화학습 완벽 가이드: 이론부터 실전까지

도입부

강화학습(Reinforcement Learning)은 인공지능(AI) 분야에서 기계가 스스로 학습하고 결정할 수 있는 능력을 부여하는 중요한 기술입니다. 이 방법론은 로봇 제어, 게임 플레이, 자율 주행 자동차, 추천 시스템, 금융 거래 등 다양한 분야에서 혁신적인 결과를 보여주고 있습니다.

강화학습은 에이전트(Agent)가 환경(Environment)과 상호작용하면서 최적의 정책(Policy)을 학습하는 과정입니다. 여기서 에이전트는 주어진 환경에서 행동을 선택하여 장기적인 보상을 최대화하려고 노력합니다. 이 과정은 주로 탐험(Exploration)과 활용(Exploitation)의 균형을 맞추는 데 중점을 둡니다.

본 가이드에서는 강화학습의 역사와 발전 과정, 수학적 기초, 주요 알고리즘들, 그리고 실제 구현까지 심층적으로 다룹니다.

강화학습의 역사와 주요 마일스톤

초기 발전 (1950s-1980s)

강화학습의 뿌리는 심리학의 행동주의(Behaviorism)와 동적 프로그래밍(Dynamic Programming)에서 찾을 수 있습니다.

  • 1950년대: 심리학자 B.F. Skinner의 조작적 조건화(Operant Conditioning) 이론이 강화학습의 기초를 제공했습니다.
  • 1957년: Richard Bellman이 동적 프로그래밍을 제안하며 벨만 방정식(Bellman Equation)을 소개했습니다.
  • 1972년: Klopf가 강화학습의 초기 형태를 제안했습니다.
  • 1989년: Chris Watkins가 Q-Learning 알고리즘을 개발하여 강화학습의 중요한 전환점을 마련했습니다.

현대 강화학습의 부상 (1990s-2010s)

  • 1992년: Gerald Tesauro가 TD-Gammon을 개발하여 백개먼 게임에서 인간 전문가 수준의 성능을 달성했습니다.
  • 1998년: Richard Sutton과 Andrew Barto의 "Reinforcement Learning: An Introduction" 출판으로 이론적 기반이 확립되었습니다.
  • 2013년: DeepMind가 DQN(Deep Q-Network)을 개발하여 Atari 게임에서 인간 수준의 성능을 달성했습니다.
  • 2015년: AlphaGo의 등장으로 바둑에서 인간 챔피언을 이겼습니다.
  • 2016년: OpenAI가 PPO(Proximal Policy Optimization)를 발표하며 정책 기반 방법의 안정성을 크게 향상시켰습니다.

최근의 혁신 (2020s-)

  • 2020년: AlphaFold가 단백질 구조 예측 문제를 해결하며 생물학 분야에 혁명을 일으켰습니다.
  • 2021-2023년: ChatGPT와 같은 대규모 언어 모델에서 RLHF(Reinforcement Learning from Human Feedback)가 핵심 기술로 자리잡았습니다.
  • 2024-현재: 멀티모달 AI, 로봇 공학, 자율 주행 등 다양한 분야에서 강화학습이 핵심 기술로 활용되고 있습니다.

강화학습의 수학적 기초

MDP (Markov Decision Process)

강화학습의 수학적 기반은 마르코프 결정 과정(MDP)입니다. MDP는 다음과 같은 튜플로 정의됩니다:

MDP=(S,A,P,R,γ)\text{MDP} = (S, A, P, R, \gamma)

여기서:

  • SS: 상태 공간(State Space) - 모든 가능한 상태들의 집합
  • AA: 행동 공간(Action Space) - 모든 가능한 행동들의 집합
  • PP: 상태 전이 확률(Transition Probability) - P(ss,a)P(s'|s, a)는 상태 ss에서 행동 aa를 취했을 때 상태 ss'로 전이할 확률
  • RR: 보상 함수(Reward Function) - R(s,a,s)R(s, a, s')는 상태 ss에서 행동 aa를 취하고 ss'로 전이했을 때 받는 보상
  • γ\gamma: 할인율(Discount Factor) - 0γ<10 \leq \gamma < 1, 미래 보상의 현재 가치

마르코프 속성 (Markov Property)

MDP의 핵심 가정은 마르코프 속성입니다:

P(st+1st,at,st1,at1,...,s0,a0)=P(st+1st,at)P(s_{t+1} | s_t, a_t, s_{t-1}, a_{t-1}, ..., s_0, a_0) = P(s_{t+1} | s_t, a_t)

이는 미래 상태가 현재 상태와 행동에만 의존하며, 과거 이력에는 독립적임을 의미합니다.

정책 (Policy)

정책 π\pi는 각 상태에서 어떤 행동을 선택할지 결정하는 전략입니다:

  • 결정론적 정책(Deterministic Policy): a=π(s)a = \pi(s)
  • 확률적 정책(Stochastic Policy): π(as)=P(as)\pi(a|s) = P(a|s)

가치 함수 (Value Function)

가치 함수는 특정 상태나 상태-행동 쌍의 가치를 평가합니다.

상태 가치 함수 (State Value Function)

상태 ss에서 정책 π\pi를 따를 때의 기대 누적 보상:

Vπ(s)=Eπ[t=0γtRt+1S0=s]V^\pi(s) = \mathbb{E}_\pi \left[ \sum_{t=0}^{\infty} \gamma^t R_{t+1} | S_0 = s \right]

행동 가치 함수 (Action Value Function)

상태 ss에서 행동 aa를 취하고 이후 정책 π\pi를 따를 때의 기대 누적 보상:

Qπ(s,a)=Eπ[t=0γtRt+1S0=s,A0=a]Q^\pi(s, a) = \mathbb{E}_\pi \left[ \sum_{t=0}^{\infty} \gamma^t R_{t+1} | S_0 = s, A_0 = a \right]

벨만 방정식 (Bellman Equation)

벨만 방정식은 가치 함수의 재귀적 관계를 정의합니다.

벨만 기대 방정식 (Bellman Expectation Equation)

Vπ(s)=aAπ(as)sSP(ss,a)[R(s,a,s)+γVπ(s)]V^\pi(s) = \sum_{a \in A} \pi(a|s) \sum_{s' \in S} P(s'|s,a) \left[ R(s,a,s') + \gamma V^\pi(s') \right] Qπ(s,a)=sSP(ss,a)[R(s,a,s)+γaAπ(as)Qπ(s,a)]Q^\pi(s,a) = \sum_{s' \in S} P(s'|s,a) \left[ R(s,a,s') + \gamma \sum_{a' \in A} \pi(a'|s') Q^\pi(s',a') \right]

벨만 최적 방정식 (Bellman Optimality Equation)

V(s)=maxaAsSP(ss,a)[R(s,a,s)+γV(s)]V^*(s) = \max_{a \in A} \sum_{s' \in S} P(s'|s,a) \left[ R(s,a,s') + \gamma V^*(s') \right] Q(s,a)=sSP(ss,a)[R(s,a,s)+γmaxaAQ(s,a)]Q^*(s,a) = \sum_{s' \in S} P(s'|s,a) \left[ R(s,a,s') + \gamma \max_{a' \in A} Q^*(s',a') \right]

최적 정책은 다음과 같이 구할 수 있습니다:

π(s)=argmaxaAQ(s,a)\pi^*(s) = \arg\max_{a \in A} Q^*(s,a)

Q-Learning: 가치 기반 강화학습의 핵심

Q-Learning 알고리즘의 이론

Q-Learning은 model-free 오프-정책(off-policy) 알고리즘으로, 환경의 dynamics를 모르는 상태에서도 최적 행동 가치 함수 QQ^*를 학습할 수 있습니다.

Q-Learning 업데이트 규칙

Q(st,at)Q(st,at)+α[rt+1+γmaxaQ(st+1,a)Q(st,at)]Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[ r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t) \right]

여기서:

  • α\alpha: 학습률(Learning Rate, 0<α10 < \alpha \leq 1)
  • γ\gamma: 할인율(Discount Factor, 0γ<10 \leq \gamma < 1)
  • rt+1r_{t+1}: 즉각적인 보상
  • maxaQ(st+1,a)\max_{a'} Q(s_{t+1}, a'): 다음 상태에서의 최대 Q-값

Temporal Difference (TD) Error

TD 오차는 예측과 실제의 차이를 나타냅니다:

δt=rt+1+γmaxaQ(st+1,a)Q(st,at)\delta_t = r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t)

Q-Learning 구현

import gym
import numpy as np
import matplotlib.pyplot as plt

class QLearningAgent:
    def __init__(self, state_size, action_size, learning_rate=0.1,
                 discount_factor=0.99, epsilon=1.0, epsilon_decay=0.995,
                 epsilon_min=0.01):
        """
        Q-Learning 에이전트 초기화

        Parameters:
        -----------
        state_size : int
            상태 공간의 크기
        action_size : int
            행동 공간의 크기
        learning_rate : float
            학습률 (alpha)
        discount_factor : float
            할인율 (gamma)
        epsilon : float
            초기 탐험 확률
        epsilon_decay : float
            에피소드마다 epsilon 감소율
        epsilon_min : float
            최소 epsilon 값
        """
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        # Q-테이블 초기화 (모든 값을 0으로)
        self.q_table = np.zeros([state_size, action_size])

    def get_action(self, state):
        """
        epsilon-greedy 정책을 사용하여 행동 선택
        """
        # 탐험(Exploration): 무작위 행동 선택
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_size)
        # 활용(Exploitation): Q-값이 가장 큰 행동 선택
        else:
            return np.argmax(self.q_table[state])

    def learn(self, state, action, reward, next_state, done):
        """
        Q-Learning 업데이트 규칙 적용
        """
        # 현재 Q-값
        current_q = self.q_table[state, action]

        # 다음 상태의 최대 Q-값 (에피소드가 끝났다면 0)
        if done:
            max_next_q = 0
        else:
            max_next_q = np.max(self.q_table[next_state])

        # TD 타겟 계산
        td_target = reward + self.discount_factor * max_next_q

        # TD 오차 계산
        td_error = td_target - current_q

        # Q-값 업데이트
        self.q_table[state, action] += self.learning_rate * td_error

    def decay_epsilon(self):
        """
        epsilon 값 감소
        """
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

# 환경 설정 및 학습
def train_q_learning(env_name='FrozenLake-v1', num_episodes=5000, render_interval=1000):
    """
    Q-Learning 알고리즘으로 에이전트 학습
    """
    env = gym.make(env_name, is_slippery=False)
    agent = QLearningAgent(
        state_size=env.observation_space.n,
        action_size=env.action_space.n
    )

    # 학습 통계
    episode_rewards = []
    success_rate_history = []

    for episode in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            # 행동 선택
            action = agent.get_action(state)

            # 환경과 상호작용
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            # Q-값 업데이트
            agent.learn(state, action, reward, next_state, done)

            state = next_state
            total_reward += reward

        # epsilon 감소
        agent.decay_epsilon()

        # 통계 기록
        episode_rewards.append(total_reward)

        # 최근 100 에피소드의 성공률 계산
        if episode >= 100:
            recent_success_rate = np.mean(episode_rewards[-100:])
            success_rate_history.append(recent_success_rate)

        # 진행상황 출력
        if (episode + 1) % render_interval == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            print(f"Episode {episode + 1}/{num_episodes}, "
                  f"Avg Reward: {avg_reward:.2f}, "
                  f"Epsilon: {agent.epsilon:.3f}")

    env.close()
    return agent, episode_rewards, success_rate_history

# 학습 실행
agent, rewards, success_rates = train_q_learning()

print("\n학습된 Q-테이블:")
print(agent.q_table)

탐험 전략 (Exploration Strategies)

1. Epsilon-Greedy

가장 일반적인 전략으로, 확률 ϵ\epsilon로 무작위 행동을 선택합니다:

def epsilon_greedy(q_values, epsilon):
    if np.random.rand() < epsilon:
        return np.random.randint(len(q_values))
    return np.argmax(q_values)

2. Boltzmann Exploration (Softmax)

온도 매개변수를 사용한 확률적 선택:

def boltzmann_exploration(q_values, temperature=1.0):
    exp_q = np.exp(q_values / temperature)
    probs = exp_q / np.sum(exp_q)
    return np.random.choice(len(q_values), p=probs)

3. Upper Confidence Bound (UCB)

불확실성을 고려한 탐험:

def ucb_action_selection(q_values, action_counts, total_count, c=2.0):
    ucb_values = q_values + c * np.sqrt(np.log(total_count + 1) / (action_counts + 1e-5))
    return np.argmax(ucb_values)

DQN (Deep Q-Network): 딥러닝과의 만남

DQN의 혁신

DQN은 2013년 DeepMind가 제안한 알고리즘으로, 신경망을 사용하여 Q-함수를 근사합니다. 이를 통해 고차원 상태 공간(예: 이미지)에서도 강화학습이 가능해졌습니다.

DQN의 핵심 기법

  1. Experience Replay: 과거 경험을 메모리에 저장하고 무작위로 샘플링하여 학습
  2. Target Network: 안정적인 학습을 위해 별도의 타겟 네트워크 사용
  3. Reward Clipping: 보상을 일정 범위로 제한하여 학습 안정화

DQN의 손실 함수

L(θ)=E(s,a,r,s)D[(r+γmaxaQ(s,a;θ)Q(s,a;θ))2]L(\theta) = \mathbb{E}_{(s,a,r,s') \sim \mathcal{D}} \left[ \left( r + \gamma \max_{a'} Q(s', a'; \theta^-) - Q(s, a; \theta) \right)^2 \right]

여기서:

  • θ\theta: 현재 네트워크의 파라미터
  • θ\theta^-: 타겟 네트워크의 파라미터
  • D\mathcal{D}: 리플레이 메모리

DQN 구현

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import random
import numpy as np

class DQN(nn.Module):
    """
    Deep Q-Network 아키텍처
    """
    def __init__(self, state_size, action_size, hidden_sizes=[128, 128]):
        super(DQN, self).__init__()

        # 입력층
        self.fc1 = nn.Linear(state_size, hidden_sizes[0])

        # 은닉층들
        self.hidden_layers = nn.ModuleList()
        for i in range(len(hidden_sizes) - 1):
            self.hidden_layers.append(
                nn.Linear(hidden_sizes[i], hidden_sizes[i+1])
            )

        # 출력층 (각 행동의 Q-값)
        self.fc_out = nn.Linear(hidden_sizes[-1], action_size)

    def forward(self, x):
        # 입력층
        x = F.relu(self.fc1(x))

        # 은닉층들
        for layer in self.hidden_layers:
            x = F.relu(layer(x))

        # 출력층 (활성화 함수 없음)
        return self.fc_out(x)

class ReplayMemory:
    """
    Experience Replay를 위한 메모리 버퍼
    """
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """경험 저장"""
        self.memory.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """무작위 샘플링"""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

class DQNAgent:
    """
    DQN 에이전트
    """
    def __init__(self, state_size, action_size, learning_rate=0.001,
                 discount_factor=0.99, epsilon=1.0, epsilon_decay=0.995,
                 epsilon_min=0.01, memory_size=10000, batch_size=64,
                 target_update_freq=10):

        self.state_size = state_size
        self.action_size = action_size
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.batch_size = batch_size
        self.target_update_freq = target_update_freq

        # 메모리
        self.memory = ReplayMemory(memory_size)

        # 네트워크
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.policy_net = DQN(state_size, action_size).to(self.device)
        self.target_net = DQN(state_size, action_size).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        # 옵티마이저
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)

        # 학습 카운터
        self.update_count = 0

    def get_action(self, state):
        """
        Epsilon-greedy 정책으로 행동 선택
        """
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_size)

        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            q_values = self.policy_net(state_tensor)
            return q_values.argmax().item()

    def remember(self, state, action, reward, next_state, done):
        """
        경험을 메모리에 저장
        """
        self.memory.push(state, action, reward, next_state, done)

    def replay(self):
        """
        Experience Replay로 학습
        """
        if len(self.memory) < self.batch_size:
            return

        # 미니배치 샘플링
        batch = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # 텐서로 변환
        states = torch.FloatTensor(np.array(states)).to(self.device)
        actions = torch.LongTensor(actions).unsqueeze(1).to(self.device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
        next_states = torch.FloatTensor(np.array(next_states)).to(self.device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)

        # 현재 Q-값 계산
        current_q_values = self.policy_net(states).gather(1, actions)

        # 타겟 Q-값 계산
        with torch.no_grad():
            next_q_values = self.target_net(next_states).max(1)[0].unsqueeze(1)
            target_q_values = rewards + (1 - dones) * self.discount_factor * next_q_values

        # 손실 계산 및 역전파
        loss = F.mse_loss(current_q_values, target_q_values)
        self.optimizer.zero_grad()
        loss.backward()

        # 그래디언트 클리핑 (학습 안정화)
        torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), 1.0)

        self.optimizer.step()

        # 타겟 네트워크 업데이트
        self.update_count += 1
        if self.update_count % self.target_update_freq == 0:
            self.target_net.load_state_dict(self.policy_net.state_dict())

        return loss.item()

    def decay_epsilon(self):
        """
        Epsilon 값 감소
        """
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

# CartPole 환경에서 DQN 학습
def train_dqn(env_name='CartPole-v1', num_episodes=500):
    """
    DQN으로 에이전트 학습
    """
    env = gym.make(env_name)
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n

    agent = DQNAgent(state_size, action_size)

    episode_rewards = []

    for episode in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            # 행동 선택
            action = agent.get_action(state)

            # 환경과 상호작용
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # 경험 저장
            agent.remember(state, action, reward, next_state, done)

            # 학습
            agent.replay()

            state = next_state
            total_reward += reward

        # Epsilon 감소
        agent.decay_epsilon()

        episode_rewards.append(total_reward)

        # 진행상황 출력
        if (episode + 1) % 50 == 0:
            avg_reward = np.mean(episode_rewards[-50:])
            print(f"Episode {episode + 1}/{num_episodes}, "
                  f"Avg Reward: {avg_reward:.2f}, "
                  f"Epsilon: {agent.epsilon:.3f}")

    env.close()
    return agent, episode_rewards

# 학습 실행
# dqn_agent, dqn_rewards = train_dqn()

DQN의 발전: Double DQN, Dueling DQN

Double DQN

기존 DQN의 과대평가(overestimation) 문제를 해결:

Q(s,a;θ)r+γQ(s,argmaxaQ(s,a;θ);θ)Q(s, a; \theta) \leftarrow r + \gamma Q(s', \arg\max_{a'} Q(s', a'; \theta); \theta^-)

Dueling DQN

가치 함수를 상태 가치와 이점 함수로 분리:

Q(s,a;θ)=V(s;θv)+A(s,a;θa)1AaA(s,a;θa)Q(s, a; \theta) = V(s; \theta_v) + A(s, a; \theta_a) - \frac{1}{|A|} \sum_{a'} A(s, a'; \theta_a)

Policy Gradient: 정책을 직접 학습하기

정책 그래디언트의 이론

정책 그래디언트 방법은 정책을 직접 파라미터화하고 최적화합니다. 목표는 기대 누적 보상을 최대화하는 것입니다:

J(θ)=Eτπθ[t=0Tγtrt]J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta} \left[ \sum_{t=0}^{T} \gamma^t r_t \right]

REINFORCE 알고리즘

정책 그래디언트의 기본 형태:

θJ(θ)=Eτπθ[t=0Tθlogπθ(atst)Gt]\nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta} \left[ \sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t) G_t \right]

여기서 Gt=k=tTγktrkG_t = \sum_{k=t}^{T} \gamma^{k-t} r_k는 시간 tt부터의 누적 보상입니다.

베이스라인을 사용한 분산 감소

분산을 줄이기 위해 베이스라인 b(s)b(s)를 사용:

θJ(θ)=Eτπθ[t=0Tθlogπθ(atst)(Gtb(st))]\nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta} \left[ \sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t) (G_t - b(s_t)) \right]

REINFORCE 구현

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical
import gym
import numpy as np

class PolicyNetwork(nn.Module):
    """
    정책 네트워크: 상태를 입력받아 행동 확률 분포 출력
    """
    def __init__(self, state_size, action_size, hidden_size=128):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        # Softmax를 통해 확률 분포 생성
        action_probs = F.softmax(self.fc3(x), dim=-1)
        return action_probs

class REINFORCEAgent:
    """
    REINFORCE 알고리즘 에이전트
    """
    def __init__(self, state_size, action_size, learning_rate=0.001,
                 discount_factor=0.99):
        self.state_size = state_size
        self.action_size = action_size
        self.discount_factor = discount_factor

        # 정책 네트워크
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.policy = PolicyNetwork(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=learning_rate)

        # 에피소드 메모리
        self.states = []
        self.actions = []
        self.rewards = []
        self.log_probs = []

    def get_action(self, state):
        """
        정책 네트워크를 사용하여 행동 샘플링
        """
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        action_probs = self.policy(state_tensor)

        # 확률 분포에서 행동 샘플링
        dist = Categorical(action_probs)
        action = dist.sample()

        # 로그 확률 저장 (그래디언트 계산에 필요)
        self.log_probs.append(dist.log_prob(action))

        return action.item()

    def remember(self, state, action, reward):
        """
        경험 저장
        """
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)

    def calculate_returns(self):
        """
        각 시간 단계에서의 누적 보상 계산 (G_t)
        """
        returns = []
        G = 0

        # 역순으로 계산
        for reward in reversed(self.rewards):
            G = reward + self.discount_factor * G
            returns.insert(0, G)

        # 정규화 (학습 안정화)
        returns = torch.FloatTensor(returns).to(self.device)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        return returns

    def update(self):
        """
        정책 네트워크 업데이트
        """
        # 누적 보상 계산
        returns = self.calculate_returns()

        # 정책 그래디언트 계산
        policy_loss = []
        for log_prob, G in zip(self.log_probs, returns):
            # -log_prob * G (음수는 경사 상승을 위함)
            policy_loss.append(-log_prob * G)

        # 손실 계산 및 역전파
        policy_loss = torch.stack(policy_loss).sum()

        self.optimizer.zero_grad()
        policy_loss.backward()
        self.optimizer.step()

        # 메모리 초기화
        self.states = []
        self.actions = []
        self.rewards = []
        self.log_probs = []

        return policy_loss.item()

def train_reinforce(env_name='CartPole-v1', num_episodes=1000):
    """
    REINFORCE 알고리즘으로 학습
    """
    env = gym.make(env_name)
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n

    agent = REINFORCEAgent(state_size, action_size)

    episode_rewards = []

    for episode in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0
        done = False

        # 에피소드 실행
        while not done:
            action = agent.get_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            agent.remember(state, action, reward)

            state = next_state
            total_reward += reward

        # 에피소드 종료 후 정책 업데이트
        loss = agent.update()

        episode_rewards.append(total_reward)

        # 진행상황 출력
        if (episode + 1) % 50 == 0:
            avg_reward = np.mean(episode_rewards[-50:])
            print(f"Episode {episode + 1}/{num_episodes}, "
                  f"Avg Reward: {avg_reward:.2f}, "
                  f"Loss: {loss:.4f}")

    env.close()
    return agent, episode_rewards

# 학습 실행
# reinforce_agent, reinforce_rewards = train_reinforce()

Actor-Critic: 두 세계의 장점을 결합

Actor-Critic의 개념

Actor-Critic은 정책 기반(Policy-based)과 가치 기반(Value-based) 방법의 장점을 결합한 알고리즘입니다:

  • Actor: 정책 πθ(as)\pi_\theta(a|s)를 학습
  • Critic: 가치 함수 Vϕ(s)V_\phi(s) 또는 Qϕ(s,a)Q_\phi(s,a)를 학습

Advantage Function

이점 함수(Advantage Function)는 특정 행동이 평균보다 얼마나 좋은지를 나타냅니다:

A(s,a)=Q(s,a)V(s)A(s, a) = Q(s, a) - V(s)

이를 사용한 정책 그래디언트:

θJ(θ)=E[θlogπθ(as)A(s,a)]\nabla_\theta J(\theta) = \mathbb{E} \left[ \nabla_\theta \log \pi_\theta(a|s) A(s, a) \right]

A2C (Advantage Actor-Critic) 구현

class ActorCriticNetwork(nn.Module):
    """
    Actor-Critic 네트워크: Actor와 Critic이 일부 층을 공유
    """
    def __init__(self, state_size, action_size, hidden_size=128):
        super(ActorCriticNetwork, self).__init__()

        # 공유 층
        self.shared_fc1 = nn.Linear(state_size, hidden_size)
        self.shared_fc2 = nn.Linear(hidden_size, hidden_size)

        # Actor 헤드 (정책)
        self.actor_fc = nn.Linear(hidden_size, action_size)

        # Critic 헤드 (가치 함수)
        self.critic_fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        # 공유 특징 추출
        x = F.relu(self.shared_fc1(x))
        x = F.relu(self.shared_fc2(x))

        # Actor: 행동 확률 분포
        action_probs = F.softmax(self.actor_fc(x), dim=-1)

        # Critic: 상태 가치
        state_value = self.critic_fc(x)

        return action_probs, state_value

class A2CAgent:
    """
    Advantage Actor-Critic (A2C) 에이전트
    """
    def __init__(self, state_size, action_size, learning_rate=0.001,
                 discount_factor=0.99, value_coef=0.5, entropy_coef=0.01):
        self.state_size = state_size
        self.action_size = action_size
        self.discount_factor = discount_factor
        self.value_coef = value_coef  # 가치 손실의 가중치
        self.entropy_coef = entropy_coef  # 엔트로피 보너스의 가중치

        # 네트워크
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = ActorCriticNetwork(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)

        # 메모리
        self.states = []
        self.actions = []
        self.rewards = []
        self.values = []
        self.log_probs = []
        self.entropies = []

    def get_action(self, state):
        """
        정책에 따라 행동 선택
        """
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        action_probs, state_value = self.model(state_tensor)

        # 행동 샘플링
        dist = Categorical(action_probs)
        action = dist.sample()

        # 저장 (학습에 필요)
        self.values.append(state_value)
        self.log_probs.append(dist.log_prob(action))
        self.entropies.append(dist.entropy())

        return action.item()

    def remember(self, state, action, reward):
        """
        경험 저장
        """
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)

    def calculate_returns_and_advantages(self):
        """
        Returns와 Advantages 계산
        """
        returns = []
        advantages = []
        G = 0

        # Returns 계산
        for reward in reversed(self.rewards):
            G = reward + self.discount_factor * G
            returns.insert(0, G)

        returns = torch.FloatTensor(returns).to(self.device)
        values = torch.cat(self.values).squeeze()

        # Advantages 계산: A(s,a) = G - V(s)
        advantages = returns - values.detach()

        return returns, advantages

    def update(self):
        """
        Actor-Critic 업데이트
        """
        returns, advantages = self.calculate_returns_and_advantages()

        # 손실 계산
        policy_loss = []
        value_loss = []

        for log_prob, advantage, entropy, value, ret in zip(
            self.log_probs, advantages, self.entropies, self.values, returns
        ):
            # Actor 손실: -log_prob * advantage
            policy_loss.append(-log_prob * advantage)

            # Critic 손실: MSE between predicted value and return
            value_loss.append(F.mse_loss(value.squeeze(), ret.unsqueeze(0)))

        # 전체 손실
        policy_loss = torch.stack(policy_loss).sum()
        value_loss = torch.stack(value_loss).sum()
        entropy_loss = -torch.stack(self.entropies).sum()  # 음수는 엔트로피 최대화를 위함

        total_loss = policy_loss + self.value_coef * value_loss + self.entropy_coef * entropy_loss

        # 역전파
        self.optimizer.zero_grad()
        total_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), 0.5)
        self.optimizer.step()

        # 메모리 초기화
        self.states = []
        self.actions = []
        self.rewards = []
        self.values = []
        self.log_probs = []
        self.entropies = []

        return total_loss.item()

def train_a2c(env_name='CartPole-v1', num_episodes=500):
    """
    A2C로 학습
    """
    env = gym.make(env_name)
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n

    agent = A2CAgent(state_size, action_size)

    episode_rewards = []

    for episode in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            action = agent.get_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            agent.remember(state, action, reward)

            state = next_state
            total_reward += reward

        # 에피소드 종료 후 업데이트
        loss = agent.update()

        episode_rewards.append(total_reward)

        if (episode + 1) % 50 == 0:
            avg_reward = np.mean(episode_rewards[-50:])
            print(f"Episode {episode + 1}/{num_episodes}, "
                  f"Avg Reward: {avg_reward:.2f}, "
                  f"Loss: {loss:.4f}")

    env.close()
    return agent, episode_rewards

# 학습 실행
# a2c_agent, a2c_rewards = train_a2c()

고급 주제 및 최신 기법

PPO (Proximal Policy Optimization)

PPO는 정책 업데이트를 안정화하기 위해 클리핑을 사용합니다:

LCLIP(θ)=Et[min(rt(θ)A^t,clip(rt(θ),1ϵ,1+ϵ)A^t)]L^{CLIP}(\theta) = \mathbb{E}_t \left[ \min\left(r_t(\theta)\hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\hat{A}_t\right) \right]

여기서 rt(θ)=πθ(atst)πθold(atst)r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)}는 중요도 비율입니다.

SAC (Soft Actor-Critic)

엔트로피를 보상에 포함하여 탐험을 장려:

J(π)=t=0TE(st,at)ρπ[r(st,at)+αH(π(st))]J(\pi) = \sum_{t=0}^{T} \mathbb{E}_{(s_t, a_t) \sim \rho_\pi} \left[ r(s_t, a_t) + \alpha H(\pi(\cdot|s_t)) \right]

Multi-Agent Reinforcement Learning

여러 에이전트가 동시에 학습하는 환경:

class MultiAgentEnvironment:
    """
    다중 에이전트 환경 예제
    """
    def __init__(self, num_agents):
        self.num_agents = num_agents
        self.agents = [A2CAgent(state_size, action_size)
                       for _ in range(num_agents)]

    def step(self, actions):
        """
        모든 에이전트의 행동을 동시에 실행
        """
        # 환경 업데이트 로직
        pass

    def train(self, num_episodes):
        """
        다중 에이전트 학습
        """
        for episode in range(num_episodes):
            # 각 에이전트가 독립적으로 또는 협력하여 학습
            pass

실전 응용 및 프로젝트

OpenAI Gym을 활용한 완전한 학습 파이프라인

import gym
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Tuple

class RLTrainer:
    """
    범용 강화학습 트레이너
    """
    def __init__(self, env_name: str, agent_class, agent_kwargs: dict):
        self.env = gym.make(env_name)
        self.agent = agent_class(
            state_size=self.env.observation_space.shape[0],
            action_size=self.env.action_space.n,
            **agent_kwargs
        )

        self.episode_rewards = []
        self.episode_lengths = []
        self.losses = []

    def train(self, num_episodes: int, render_interval: int = None,
              save_interval: int = None, target_reward: float = None):
        """
        학습 실행
        """
        best_avg_reward = -float('inf')

        for episode in range(num_episodes):
            state, _ = self.env.reset()
            total_reward = 0
            steps = 0
            done = False

            while not done:
                # 렌더링
                if render_interval and episode % render_interval == 0:
                    self.env.render()

                # 행동 선택 및 실행
                action = self.agent.get_action(state)
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                done = terminated or truncated

                # 학습 (알고리즘에 따라 다름)
                if hasattr(self.agent, 'remember'):
                    self.agent.remember(state, action, reward)
                if hasattr(self.agent, 'replay'):
                    loss = self.agent.replay()
                    if loss:
                        self.losses.append(loss)

                state = next_state
                total_reward += reward
                steps += 1

            # 에피소드 종료 후 처리
            if hasattr(self.agent, 'update'):
                loss = self.agent.update()
                if loss:
                    self.losses.append(loss)

            if hasattr(self.agent, 'decay_epsilon'):
                self.agent.decay_epsilon()

            # 통계 기록
            self.episode_rewards.append(total_reward)
            self.episode_lengths.append(steps)

            # 진행상황 출력
            if (episode + 1) % 10 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                avg_length = np.mean(self.episode_lengths[-100:])

                print(f"Episode {episode + 1}/{num_episodes}")
                print(f"  Avg Reward (100 ep): {avg_reward:.2f}")
                print(f"  Avg Length (100 ep): {avg_length:.2f}")
                print(f"  Current Reward: {total_reward:.2f}")

                # 모델 저장
                if save_interval and (episode + 1) % save_interval == 0:
                    if avg_reward > best_avg_reward:
                        best_avg_reward = avg_reward
                        self.save_model(f"best_model_ep{episode+1}.pth")

                # 목표 달성 확인
                if target_reward and avg_reward >= target_reward:
                    print(f"\nTarget reward {target_reward} reached!")
                    break

        self.env.close()
        return self.episode_rewards, self.episode_lengths

    def save_model(self, filename: str):
        """
        모델 저장
        """
        if hasattr(self.agent, 'policy_net'):
            torch.save(self.agent.policy_net.state_dict(), filename)
        elif hasattr(self.agent, 'policy'):
            torch.save(self.agent.policy.state_dict(), filename)
        elif hasattr(self.agent, 'model'):
            torch.save(self.agent.model.state_dict(), filename)

    def plot_results(self):
        """
        학습 결과 시각화
        """
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # 에피소드별 보상
        axes[0, 0].plot(self.episode_rewards, alpha=0.3, label='Episode Reward')
        axes[0, 0].plot(self._moving_average(self.episode_rewards, 100),
                       label='100-ep Moving Avg')
        axes[0, 0].set_xlabel('Episode')
        axes[0, 0].set_ylabel('Reward')
        axes[0, 0].set_title('Training Rewards')
        axes[0, 0].legend()
        axes[0, 0].grid(True)

        # 에피소드 길이
        axes[0, 1].plot(self.episode_lengths, alpha=0.3, label='Episode Length')
        axes[0, 1].plot(self._moving_average(self.episode_lengths, 100),
                       label='100-ep Moving Avg')
        axes[0, 1].set_xlabel('Episode')
        axes[0, 1].set_ylabel('Steps')
        axes[0, 1].set_title('Episode Lengths')
        axes[0, 1].legend()
        axes[0, 1].grid(True)

        # 손실
        if self.losses:
            axes[1, 0].plot(self.losses, alpha=0.5)
            axes[1, 0].set_xlabel('Update Step')
            axes[1, 0].set_ylabel('Loss')
            axes[1, 0].set_title('Training Loss')
            axes[1, 0].grid(True)

        # 보상 분포
        axes[1, 1].hist(self.episode_rewards, bins=50, edgecolor='black')
        axes[1, 1].set_xlabel('Reward')
        axes[1, 1].set_ylabel('Frequency')
        axes[1, 1].set_title('Reward Distribution')
        axes[1, 1].grid(True)

        plt.tight_layout()
        plt.savefig('training_results.png', dpi=300, bbox_inches='tight')
        plt.show()

    @staticmethod
    def _moving_average(data: List[float], window: int) -> List[float]:
        """
        이동 평균 계산
        """
        if len(data) < window:
            return data

        cumsum = np.cumsum(data)
        cumsum[window:] = cumsum[window:] - cumsum[:-window]
        return cumsum[window - 1:] / window

# 사용 예제
"""
# DQN으로 CartPole 학습
trainer = RLTrainer(
    env_name='CartPole-v1',
    agent_class=DQNAgent,
    agent_kwargs={
        'learning_rate': 0.001,
        'discount_factor': 0.99,
        'epsilon': 1.0,
        'epsilon_decay': 0.995,
        'epsilon_min': 0.01
    }
)

rewards, lengths = trainer.train(
    num_episodes=500,
    save_interval=100,
    target_reward=195.0
)

trainer.plot_results()
"""

커스텀 환경 만들기

import gym
from gym import spaces

class CustomGridWorld(gym.Env):
    """
    커스텀 그리드 월드 환경
    """
    def __init__(self, grid_size=5):
        super(CustomGridWorld, self).__init__()

        self.grid_size = grid_size
        self.state = None
        self.goal = (grid_size - 1, grid_size - 1)

        # 행동 공간: 상, 하, 좌, 우
        self.action_space = spaces.Discrete(4)

        # 상태 공간: (x, y) 좌표
        self.observation_space = spaces.Box(
            low=0, high=grid_size-1,
            shape=(2,), dtype=np.int32
        )

    def reset(self):
        """
        환경 초기화
        """
        self.state = np.array([0, 0])
        return self.state, {}

    def step(self, action):
        """
        행동 실행
        """
        # 행동에 따라 상태 변경
        if action == 0:  # 상
            self.state[1] = max(0, self.state[1] - 1)
        elif action == 1:  # 하
            self.state[1] = min(self.grid_size - 1, self.state[1] + 1)
        elif action == 2:  # 좌
            self.state[0] = max(0, self.state[0] - 1)
        elif action == 3:  # 우
            self.state[0] = min(self.grid_size - 1, self.state[0] + 1)

        # 보상 계산
        if tuple(self.state) == self.goal:
            reward = 10.0
            done = True
        else:
            reward = -0.1  # 각 단계마다 작은 패널티
            done = False

        return self.state, reward, done, False, {}

    def render(self, mode='human'):
        """
        환경 시각화
        """
        grid = np.zeros((self.grid_size, self.grid_size))
        grid[tuple(self.state)] = 1
        grid[self.goal] = 2
        print(grid)

# 커스텀 환경 사용
"""
env = CustomGridWorld(grid_size=5)
state, _ = env.reset()
print(f"Initial state: {state}")

for _ in range(10):
    action = env.action_space.sample()
    state, reward, done, _, _ = env.step(action)
    print(f"Action: {action}, State: {state}, Reward: {reward}")
    env.render()
    if done:
        break
"""

결론 및 향후 방향

강화학습의 현재와 미래

강화학습은 인공지능의 중요한 한 분야로, 에이전트가 환경과 상호작용하면서 스스로 학습하는 과정을 거칩니다. 본 가이드에서는 다음 내용을 다루었습니다:

  1. 역사적 발전: 1950년대 행동주의 심리학부터 현대의 대규모 언어 모델까지
  2. 수학적 기초: MDP, 벨만 방정식, 가치 함수의 이론적 배경
  3. 주요 알고리즘들:
    • Q-Learning: 테이블 기반 가치 학습
    • DQN: 딥러닝을 활용한 고차원 상태 공간 처리
    • Policy Gradient (REINFORCE): 정책 직접 최적화
    • Actor-Critic (A2C): 두 접근법의 결합
  4. 실전 구현: OpenAI Gym을 활용한 완전한 학습 파이프라인

주요 도전 과제

  1. 샘플 효율성: 강화학습은 많은 샘플이 필요하며, 이는 실제 환경에서 비용이 클 수 있습니다.
  2. 안정성: 학습 과정이 불안정할 수 있으며, 하이퍼파라미터에 민감합니다.
  3. 일반화: 특정 환경에서 학습한 정책이 다른 환경으로 전이되기 어렵습니다.
  4. 안전성: 실제 시스템에서 탐험 과정이 위험할 수 있습니다.
  5. 보상 설계: 복잡한 작업에 대한 적절한 보상 함수를 설계하기 어렵습니다.

최신 연구 방향

  1. 오프라인 강화학습(Offline RL): 기존 데이터셋만으로 학습
  2. 메타 강화학습(Meta-RL): 빠른 적응과 전이 학습
  3. 계층적 강화학습(Hierarchical RL): 복잡한 작업을 하위 작업으로 분해
  4. 모델 기반 강화학습(Model-based RL): 환경 모델을 학습하여 샘플 효율성 향상
  5. 인간 피드백 강화학습(RLHF): 인간의 선호도를 활용한 정렬

추천 학습 자료

서적

  • Sutton, R. S., & Barto, A. G. (2018). "Reinforcement Learning: An Introduction" (2nd Edition)
  • Graesser, L., & Keng, W. L. (2019). "Foundations of Deep Reinforcement Learning"
  • Lapan, M. (2020). "Deep Reinforcement Learning Hands-On" (2nd Edition)

온라인 강좌

  • David Silver's RL Course (UCL/DeepMind)
  • Stanford CS234: Reinforcement Learning
  • Berkeley CS285: Deep Reinforcement Learning

리소스 및 라이브러리

연구 논문 및 블로그

실전 프로젝트 아이디어

  1. 게임 AI: Atari 게임, 체스, 바둑 등
  2. 로봇 제어: 시뮬레이션 환경에서 로봇 팔 제어
  3. 자율 주행: CARLA 시뮬레이터 활용
  4. 추천 시스템: 사용자 선호도 학습
  5. 금융 거래: 주식 거래 전략 학습
  6. 자원 관리: 데이터센터 냉각 최적화

강화학습은 계속 발전하는 분야이며, 새로운 알고리즘과 응용이 끊임없이 등장하고 있습니다. 이 가이드가 강화학습의 세계로 들어가는 여러분에게 든든한 길잡이가 되기를 바랍니다. 지속적인 학습과 실험을 통해 더 깊은 이해를 얻으시기를 응원합니다!