강화학습 완벽 가이드: 이론부터 실전까지
도입부
강화학습(Reinforcement Learning)은 인공지능(AI) 분야에서 기계가 스스로 학습하고 결정할 수 있는 능력을 부여하는 중요한 기술입니다. 이 방법론은 로봇 제어, 게임 플레이, 자율 주행 자동차, 추천 시스템, 금융 거래 등 다양한 분야에서 혁신적인 결과를 보여주고 있습니다.
강화학습은 에이전트(Agent)가 환경(Environment)과 상호작용하면서 최적의 정책(Policy)을 학습하는 과정입니다. 여기서 에이전트는 주어진 환경에서 행동을 선택하여 장기적인 보상을 최대화하려고 노력합니다. 이 과정은 주로 탐험(Exploration)과 활용(Exploitation)의 균형을 맞추는 데 중점을 둡니다.
본 가이드에서는 강화학습의 역사와 발전 과정, 수학적 기초, 주요 알고리즘들, 그리고 실제 구현까지 심층적으로 다룹니다.
강화학습의 역사와 주요 마일스톤
초기 발전 (1950s-1980s)
강화학습의 뿌리는 심리학의 행동주의(Behaviorism)와 동적 프로그래밍(Dynamic Programming)에서 찾을 수 있습니다.
- 1950년대: 심리학자 B.F. Skinner의 조작적 조건화(Operant Conditioning) 이론이 강화학습의 기초를 제공했습니다.
- 1957년: Richard Bellman이 동적 프로그래밍을 제안하며 벨만 방정식(Bellman Equation)을 소개했습니다.
- 1972년: Klopf가 강화학습의 초기 형태를 제안했습니다.
- 1989년: Chris Watkins가 Q-Learning 알고리즘을 개발하여 강화학습의 중요한 전환점을 마련했습니다.
현대 강화학습의 부상 (1990s-2010s)
- 1992년: Gerald Tesauro가 TD-Gammon을 개발하여 백개먼 게임에서 인간 전문가 수준의 성능을 달성했습니다.
- 1998년: Richard Sutton과 Andrew Barto의 "Reinforcement Learning: An Introduction" 출판으로 이론적 기반이 확립되었습니다.
- 2013년: DeepMind가 DQN(Deep Q-Network)을 개발하여 Atari 게임에서 인간 수준의 성능을 달성했습니다.
- 2015년: AlphaGo의 등장으로 바둑에서 인간 챔피언을 이겼습니다.
- 2016년: OpenAI가 PPO(Proximal Policy Optimization)를 발표하며 정책 기반 방법의 안정성을 크게 향상시켰습니다.
최근의 혁신 (2020s-)
- 2020년: AlphaFold가 단백질 구조 예측 문제를 해결하며 생물학 분야에 혁명을 일으켰습니다.
- 2021-2023년: ChatGPT와 같은 대규모 언어 모델에서 RLHF(Reinforcement Learning from Human Feedback)가 핵심 기술로 자리잡았습니다.
- 2024-현재: 멀티모달 AI, 로봇 공학, 자율 주행 등 다양한 분야에서 강화학습이 핵심 기술로 활용되고 있습니다.
강화학습의 수학적 기초
MDP (Markov Decision Process)
강화학습의 수학적 기반은 마르코프 결정 과정(MDP)입니다. MDP는 다음과 같은 튜플로 정의됩니다:
여기서:
- : 상태 공간(State Space) - 모든 가능한 상태들의 집합
- : 행동 공간(Action Space) - 모든 가능한 행동들의 집합
- : 상태 전이 확률(Transition Probability) - 는 상태 에서 행동 를 취했을 때 상태 로 전이할 확률
- : 보상 함수(Reward Function) - 는 상태 에서 행동 를 취하고 로 전이했을 때 받는 보상
- : 할인율(Discount Factor) - , 미래 보상의 현재 가치
마르코프 속성 (Markov Property)
MDP의 핵심 가정은 마르코프 속성입니다:
이는 미래 상태가 현재 상태와 행동에만 의존하며, 과거 이력에는 독립적임을 의미합니다.
정책 (Policy)
정책 는 각 상태에서 어떤 행동을 선택할지 결정하는 전략입니다:
- 결정론적 정책(Deterministic Policy):
- 확률적 정책(Stochastic Policy):
가치 함수 (Value Function)
가치 함수는 특정 상태나 상태-행동 쌍의 가치를 평가합니다.
상태 가치 함수 (State Value Function)
상태 에서 정책 를 따를 때의 기대 누적 보상:
행동 가치 함수 (Action Value Function)
상태 에서 행동 를 취하고 이후 정책 를 따를 때의 기대 누적 보상:
벨만 방정식 (Bellman Equation)
벨만 방정식은 가치 함수의 재귀적 관계를 정의합니다.
벨만 기대 방정식 (Bellman Expectation Equation)
벨만 최적 방정식 (Bellman Optimality Equation)
최적 정책은 다음과 같이 구할 수 있습니다:
Q-Learning: 가치 기반 강화학습의 핵심
Q-Learning 알고리즘의 이론
Q-Learning은 model-free 오프-정책(off-policy) 알고리즘으로, 환경의 dynamics를 모르는 상태에서도 최적 행동 가치 함수 를 학습할 수 있습니다.
Q-Learning 업데이트 규칙
여기서:
- : 학습률(Learning Rate, )
- : 할인율(Discount Factor, )
- : 즉각적인 보상
- : 다음 상태에서의 최대 Q-값
Temporal Difference (TD) Error
TD 오차는 예측과 실제의 차이를 나타냅니다:
Q-Learning 구현
import gym
import numpy as np
import matplotlib.pyplot as plt
class QLearningAgent:
def __init__(self, state_size, action_size, learning_rate=0.1,
discount_factor=0.99, epsilon=1.0, epsilon_decay=0.995,
epsilon_min=0.01):
"""
Q-Learning 에이전트 초기화
Parameters:
-----------
state_size : int
상태 공간의 크기
action_size : int
행동 공간의 크기
learning_rate : float
학습률 (alpha)
discount_factor : float
할인율 (gamma)
epsilon : float
초기 탐험 확률
epsilon_decay : float
에피소드마다 epsilon 감소율
epsilon_min : float
최소 epsilon 값
"""
self.state_size = state_size
self.action_size = action_size
self.learning_rate = learning_rate
self.discount_factor = discount_factor
self.epsilon = epsilon
self.epsilon_decay = epsilon_decay
self.epsilon_min = epsilon_min
# Q-테이블 초기화 (모든 값을 0으로)
self.q_table = np.zeros([state_size, action_size])
def get_action(self, state):
"""
epsilon-greedy 정책을 사용하여 행동 선택
"""
# 탐험(Exploration): 무작위 행동 선택
if np.random.rand() < self.epsilon:
return np.random.randint(self.action_size)
# 활용(Exploitation): Q-값이 가장 큰 행동 선택
else:
return np.argmax(self.q_table[state])
def learn(self, state, action, reward, next_state, done):
"""
Q-Learning 업데이트 규칙 적용
"""
# 현재 Q-값
current_q = self.q_table[state, action]
# 다음 상태의 최대 Q-값 (에피소드가 끝났다면 0)
if done:
max_next_q = 0
else:
max_next_q = np.max(self.q_table[next_state])
# TD 타겟 계산
td_target = reward + self.discount_factor * max_next_q
# TD 오차 계산
td_error = td_target - current_q
# Q-값 업데이트
self.q_table[state, action] += self.learning_rate * td_error
def decay_epsilon(self):
"""
epsilon 값 감소
"""
self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
# 환경 설정 및 학습
def train_q_learning(env_name='FrozenLake-v1', num_episodes=5000, render_interval=1000):
"""
Q-Learning 알고리즘으로 에이전트 학습
"""
env = gym.make(env_name, is_slippery=False)
agent = QLearningAgent(
state_size=env.observation_space.n,
action_size=env.action_space.n
)
# 학습 통계
episode_rewards = []
success_rate_history = []
for episode in range(num_episodes):
state, _ = env.reset()
total_reward = 0
done = False
while not done:
# 행동 선택
action = agent.get_action(state)
# 환경과 상호작용
next_state, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
# Q-값 업데이트
agent.learn(state, action, reward, next_state, done)
state = next_state
total_reward += reward
# epsilon 감소
agent.decay_epsilon()
# 통계 기록
episode_rewards.append(total_reward)
# 최근 100 에피소드의 성공률 계산
if episode >= 100:
recent_success_rate = np.mean(episode_rewards[-100:])
success_rate_history.append(recent_success_rate)
# 진행상황 출력
if (episode + 1) % render_interval == 0:
avg_reward = np.mean(episode_rewards[-100:])
print(f"Episode {episode + 1}/{num_episodes}, "
f"Avg Reward: {avg_reward:.2f}, "
f"Epsilon: {agent.epsilon:.3f}")
env.close()
return agent, episode_rewards, success_rate_history
# 학습 실행
agent, rewards, success_rates = train_q_learning()
print("\n학습된 Q-테이블:")
print(agent.q_table)
탐험 전략 (Exploration Strategies)
1. Epsilon-Greedy
가장 일반적인 전략으로, 확률 로 무작위 행동을 선택합니다:
def epsilon_greedy(q_values, epsilon):
if np.random.rand() < epsilon:
return np.random.randint(len(q_values))
return np.argmax(q_values)
2. Boltzmann Exploration (Softmax)
온도 매개변수를 사용한 확률적 선택:
def boltzmann_exploration(q_values, temperature=1.0):
exp_q = np.exp(q_values / temperature)
probs = exp_q / np.sum(exp_q)
return np.random.choice(len(q_values), p=probs)
3. Upper Confidence Bound (UCB)
불확실성을 고려한 탐험:
def ucb_action_selection(q_values, action_counts, total_count, c=2.0):
ucb_values = q_values + c * np.sqrt(np.log(total_count + 1) / (action_counts + 1e-5))
return np.argmax(ucb_values)
DQN (Deep Q-Network): 딥러닝과의 만남
DQN의 혁신
DQN은 2013년 DeepMind가 제안한 알고리즘으로, 신경망을 사용하여 Q-함수를 근사합니다. 이를 통해 고차원 상태 공간(예: 이미지)에서도 강화학습이 가능해졌습니다.
DQN의 핵심 기법
- Experience Replay: 과거 경험을 메모리에 저장하고 무작위로 샘플링하여 학습
- Target Network: 안정적인 학습을 위해 별도의 타겟 네트워크 사용
- Reward Clipping: 보상을 일정 범위로 제한하여 학습 안정화
DQN의 손실 함수
여기서:
- : 현재 네트워크의 파라미터
- : 타겟 네트워크의 파라미터
- : 리플레이 메모리
DQN 구현
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import random
import numpy as np
class DQN(nn.Module):
"""
Deep Q-Network 아키텍처
"""
def __init__(self, state_size, action_size, hidden_sizes=[128, 128]):
super(DQN, self).__init__()
# 입력층
self.fc1 = nn.Linear(state_size, hidden_sizes[0])
# 은닉층들
self.hidden_layers = nn.ModuleList()
for i in range(len(hidden_sizes) - 1):
self.hidden_layers.append(
nn.Linear(hidden_sizes[i], hidden_sizes[i+1])
)
# 출력층 (각 행동의 Q-값)
self.fc_out = nn.Linear(hidden_sizes[-1], action_size)
def forward(self, x):
# 입력층
x = F.relu(self.fc1(x))
# 은닉층들
for layer in self.hidden_layers:
x = F.relu(layer(x))
# 출력층 (활성화 함수 없음)
return self.fc_out(x)
class ReplayMemory:
"""
Experience Replay를 위한 메모리 버퍼
"""
def __init__(self, capacity):
self.memory = deque(maxlen=capacity)
def push(self, state, action, reward, next_state, done):
"""경험 저장"""
self.memory.append((state, action, reward, next_state, done))
def sample(self, batch_size):
"""무작위 샘플링"""
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
class DQNAgent:
"""
DQN 에이전트
"""
def __init__(self, state_size, action_size, learning_rate=0.001,
discount_factor=0.99, epsilon=1.0, epsilon_decay=0.995,
epsilon_min=0.01, memory_size=10000, batch_size=64,
target_update_freq=10):
self.state_size = state_size
self.action_size = action_size
self.discount_factor = discount_factor
self.epsilon = epsilon
self.epsilon_decay = epsilon_decay
self.epsilon_min = epsilon_min
self.batch_size = batch_size
self.target_update_freq = target_update_freq
# 메모리
self.memory = ReplayMemory(memory_size)
# 네트워크
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.policy_net = DQN(state_size, action_size).to(self.device)
self.target_net = DQN(state_size, action_size).to(self.device)
self.target_net.load_state_dict(self.policy_net.state_dict())
self.target_net.eval()
# 옵티마이저
self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)
# 학습 카운터
self.update_count = 0
def get_action(self, state):
"""
Epsilon-greedy 정책으로 행동 선택
"""
if np.random.rand() < self.epsilon:
return np.random.randint(self.action_size)
with torch.no_grad():
state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
q_values = self.policy_net(state_tensor)
return q_values.argmax().item()
def remember(self, state, action, reward, next_state, done):
"""
경험을 메모리에 저장
"""
self.memory.push(state, action, reward, next_state, done)
def replay(self):
"""
Experience Replay로 학습
"""
if len(self.memory) < self.batch_size:
return
# 미니배치 샘플링
batch = self.memory.sample(self.batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
# 텐서로 변환
states = torch.FloatTensor(np.array(states)).to(self.device)
actions = torch.LongTensor(actions).unsqueeze(1).to(self.device)
rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
next_states = torch.FloatTensor(np.array(next_states)).to(self.device)
dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)
# 현재 Q-값 계산
current_q_values = self.policy_net(states).gather(1, actions)
# 타겟 Q-값 계산
with torch.no_grad():
next_q_values = self.target_net(next_states).max(1)[0].unsqueeze(1)
target_q_values = rewards + (1 - dones) * self.discount_factor * next_q_values
# 손실 계산 및 역전파
loss = F.mse_loss(current_q_values, target_q_values)
self.optimizer.zero_grad()
loss.backward()
# 그래디언트 클리핑 (학습 안정화)
torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), 1.0)
self.optimizer.step()
# 타겟 네트워크 업데이트
self.update_count += 1
if self.update_count % self.target_update_freq == 0:
self.target_net.load_state_dict(self.policy_net.state_dict())
return loss.item()
def decay_epsilon(self):
"""
Epsilon 값 감소
"""
self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
# CartPole 환경에서 DQN 학습
def train_dqn(env_name='CartPole-v1', num_episodes=500):
"""
DQN으로 에이전트 학습
"""
env = gym.make(env_name)
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)
episode_rewards = []
for episode in range(num_episodes):
state, _ = env.reset()
total_reward = 0
done = False
while not done:
# 행동 선택
action = agent.get_action(state)
# 환경과 상호작용
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
# 경험 저장
agent.remember(state, action, reward, next_state, done)
# 학습
agent.replay()
state = next_state
total_reward += reward
# Epsilon 감소
agent.decay_epsilon()
episode_rewards.append(total_reward)
# 진행상황 출력
if (episode + 1) % 50 == 0:
avg_reward = np.mean(episode_rewards[-50:])
print(f"Episode {episode + 1}/{num_episodes}, "
f"Avg Reward: {avg_reward:.2f}, "
f"Epsilon: {agent.epsilon:.3f}")
env.close()
return agent, episode_rewards
# 학습 실행
# dqn_agent, dqn_rewards = train_dqn()
DQN의 발전: Double DQN, Dueling DQN
Double DQN
기존 DQN의 과대평가(overestimation) 문제를 해결:
Dueling DQN
가치 함수를 상태 가치와 이점 함수로 분리:
Policy Gradient: 정책을 직접 학습하기
정책 그래디언트의 이론
정책 그래디언트 방법은 정책을 직접 파라미터화하고 최적화합니다. 목표는 기대 누적 보상을 최대화하는 것입니다:
REINFORCE 알고리즘
정책 그래디언트의 기본 형태:
여기서 는 시간 부터의 누적 보상입니다.
베이스라인을 사용한 분산 감소
분산을 줄이기 위해 베이스라인 를 사용:
REINFORCE 구현
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical
import gym
import numpy as np
class PolicyNetwork(nn.Module):
"""
정책 네트워크: 상태를 입력받아 행동 확률 분포 출력
"""
def __init__(self, state_size, action_size, hidden_size=128):
super(PolicyNetwork, self).__init__()
self.fc1 = nn.Linear(state_size, hidden_size)
self.fc2 = nn.Linear(hidden_size, hidden_size)
self.fc3 = nn.Linear(hidden_size, action_size)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
# Softmax를 통해 확률 분포 생성
action_probs = F.softmax(self.fc3(x), dim=-1)
return action_probs
class REINFORCEAgent:
"""
REINFORCE 알고리즘 에이전트
"""
def __init__(self, state_size, action_size, learning_rate=0.001,
discount_factor=0.99):
self.state_size = state_size
self.action_size = action_size
self.discount_factor = discount_factor
# 정책 네트워크
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.policy = PolicyNetwork(state_size, action_size).to(self.device)
self.optimizer = optim.Adam(self.policy.parameters(), lr=learning_rate)
# 에피소드 메모리
self.states = []
self.actions = []
self.rewards = []
self.log_probs = []
def get_action(self, state):
"""
정책 네트워크를 사용하여 행동 샘플링
"""
state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
action_probs = self.policy(state_tensor)
# 확률 분포에서 행동 샘플링
dist = Categorical(action_probs)
action = dist.sample()
# 로그 확률 저장 (그래디언트 계산에 필요)
self.log_probs.append(dist.log_prob(action))
return action.item()
def remember(self, state, action, reward):
"""
경험 저장
"""
self.states.append(state)
self.actions.append(action)
self.rewards.append(reward)
def calculate_returns(self):
"""
각 시간 단계에서의 누적 보상 계산 (G_t)
"""
returns = []
G = 0
# 역순으로 계산
for reward in reversed(self.rewards):
G = reward + self.discount_factor * G
returns.insert(0, G)
# 정규화 (학습 안정화)
returns = torch.FloatTensor(returns).to(self.device)
returns = (returns - returns.mean()) / (returns.std() + 1e-8)
return returns
def update(self):
"""
정책 네트워크 업데이트
"""
# 누적 보상 계산
returns = self.calculate_returns()
# 정책 그래디언트 계산
policy_loss = []
for log_prob, G in zip(self.log_probs, returns):
# -log_prob * G (음수는 경사 상승을 위함)
policy_loss.append(-log_prob * G)
# 손실 계산 및 역전파
policy_loss = torch.stack(policy_loss).sum()
self.optimizer.zero_grad()
policy_loss.backward()
self.optimizer.step()
# 메모리 초기화
self.states = []
self.actions = []
self.rewards = []
self.log_probs = []
return policy_loss.item()
def train_reinforce(env_name='CartPole-v1', num_episodes=1000):
"""
REINFORCE 알고리즘으로 학습
"""
env = gym.make(env_name)
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = REINFORCEAgent(state_size, action_size)
episode_rewards = []
for episode in range(num_episodes):
state, _ = env.reset()
total_reward = 0
done = False
# 에피소드 실행
while not done:
action = agent.get_action(state)
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
agent.remember(state, action, reward)
state = next_state
total_reward += reward
# 에피소드 종료 후 정책 업데이트
loss = agent.update()
episode_rewards.append(total_reward)
# 진행상황 출력
if (episode + 1) % 50 == 0:
avg_reward = np.mean(episode_rewards[-50:])
print(f"Episode {episode + 1}/{num_episodes}, "
f"Avg Reward: {avg_reward:.2f}, "
f"Loss: {loss:.4f}")
env.close()
return agent, episode_rewards
# 학습 실행
# reinforce_agent, reinforce_rewards = train_reinforce()
Actor-Critic: 두 세계의 장점을 결합
Actor-Critic의 개념
Actor-Critic은 정책 기반(Policy-based)과 가치 기반(Value-based) 방법의 장점을 결합한 알고리즘입니다:
- Actor: 정책 를 학습
- Critic: 가치 함수 또는 를 학습
Advantage Function
이점 함수(Advantage Function)는 특정 행동이 평균보다 얼마나 좋은지를 나타냅니다:
이를 사용한 정책 그래디언트:
A2C (Advantage Actor-Critic) 구현
class ActorCriticNetwork(nn.Module):
"""
Actor-Critic 네트워크: Actor와 Critic이 일부 층을 공유
"""
def __init__(self, state_size, action_size, hidden_size=128):
super(ActorCriticNetwork, self).__init__()
# 공유 층
self.shared_fc1 = nn.Linear(state_size, hidden_size)
self.shared_fc2 = nn.Linear(hidden_size, hidden_size)
# Actor 헤드 (정책)
self.actor_fc = nn.Linear(hidden_size, action_size)
# Critic 헤드 (가치 함수)
self.critic_fc = nn.Linear(hidden_size, 1)
def forward(self, x):
# 공유 특징 추출
x = F.relu(self.shared_fc1(x))
x = F.relu(self.shared_fc2(x))
# Actor: 행동 확률 분포
action_probs = F.softmax(self.actor_fc(x), dim=-1)
# Critic: 상태 가치
state_value = self.critic_fc(x)
return action_probs, state_value
class A2CAgent:
"""
Advantage Actor-Critic (A2C) 에이전트
"""
def __init__(self, state_size, action_size, learning_rate=0.001,
discount_factor=0.99, value_coef=0.5, entropy_coef=0.01):
self.state_size = state_size
self.action_size = action_size
self.discount_factor = discount_factor
self.value_coef = value_coef # 가치 손실의 가중치
self.entropy_coef = entropy_coef # 엔트로피 보너스의 가중치
# 네트워크
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model = ActorCriticNetwork(state_size, action_size).to(self.device)
self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
# 메모리
self.states = []
self.actions = []
self.rewards = []
self.values = []
self.log_probs = []
self.entropies = []
def get_action(self, state):
"""
정책에 따라 행동 선택
"""
state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
action_probs, state_value = self.model(state_tensor)
# 행동 샘플링
dist = Categorical(action_probs)
action = dist.sample()
# 저장 (학습에 필요)
self.values.append(state_value)
self.log_probs.append(dist.log_prob(action))
self.entropies.append(dist.entropy())
return action.item()
def remember(self, state, action, reward):
"""
경험 저장
"""
self.states.append(state)
self.actions.append(action)
self.rewards.append(reward)
def calculate_returns_and_advantages(self):
"""
Returns와 Advantages 계산
"""
returns = []
advantages = []
G = 0
# Returns 계산
for reward in reversed(self.rewards):
G = reward + self.discount_factor * G
returns.insert(0, G)
returns = torch.FloatTensor(returns).to(self.device)
values = torch.cat(self.values).squeeze()
# Advantages 계산: A(s,a) = G - V(s)
advantages = returns - values.detach()
return returns, advantages
def update(self):
"""
Actor-Critic 업데이트
"""
returns, advantages = self.calculate_returns_and_advantages()
# 손실 계산
policy_loss = []
value_loss = []
for log_prob, advantage, entropy, value, ret in zip(
self.log_probs, advantages, self.entropies, self.values, returns
):
# Actor 손실: -log_prob * advantage
policy_loss.append(-log_prob * advantage)
# Critic 손실: MSE between predicted value and return
value_loss.append(F.mse_loss(value.squeeze(), ret.unsqueeze(0)))
# 전체 손실
policy_loss = torch.stack(policy_loss).sum()
value_loss = torch.stack(value_loss).sum()
entropy_loss = -torch.stack(self.entropies).sum() # 음수는 엔트로피 최대화를 위함
total_loss = policy_loss + self.value_coef * value_loss + self.entropy_coef * entropy_loss
# 역전파
self.optimizer.zero_grad()
total_loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), 0.5)
self.optimizer.step()
# 메모리 초기화
self.states = []
self.actions = []
self.rewards = []
self.values = []
self.log_probs = []
self.entropies = []
return total_loss.item()
def train_a2c(env_name='CartPole-v1', num_episodes=500):
"""
A2C로 학습
"""
env = gym.make(env_name)
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = A2CAgent(state_size, action_size)
episode_rewards = []
for episode in range(num_episodes):
state, _ = env.reset()
total_reward = 0
done = False
while not done:
action = agent.get_action(state)
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
agent.remember(state, action, reward)
state = next_state
total_reward += reward
# 에피소드 종료 후 업데이트
loss = agent.update()
episode_rewards.append(total_reward)
if (episode + 1) % 50 == 0:
avg_reward = np.mean(episode_rewards[-50:])
print(f"Episode {episode + 1}/{num_episodes}, "
f"Avg Reward: {avg_reward:.2f}, "
f"Loss: {loss:.4f}")
env.close()
return agent, episode_rewards
# 학습 실행
# a2c_agent, a2c_rewards = train_a2c()
고급 주제 및 최신 기법
PPO (Proximal Policy Optimization)
PPO는 정책 업데이트를 안정화하기 위해 클리핑을 사용합니다:
여기서 는 중요도 비율입니다.
SAC (Soft Actor-Critic)
엔트로피를 보상에 포함하여 탐험을 장려:
Multi-Agent Reinforcement Learning
여러 에이전트가 동시에 학습하는 환경:
class MultiAgentEnvironment:
"""
다중 에이전트 환경 예제
"""
def __init__(self, num_agents):
self.num_agents = num_agents
self.agents = [A2CAgent(state_size, action_size)
for _ in range(num_agents)]
def step(self, actions):
"""
모든 에이전트의 행동을 동시에 실행
"""
# 환경 업데이트 로직
pass
def train(self, num_episodes):
"""
다중 에이전트 학습
"""
for episode in range(num_episodes):
# 각 에이전트가 독립적으로 또는 협력하여 학습
pass
실전 응용 및 프로젝트
OpenAI Gym을 활용한 완전한 학습 파이프라인
import gym
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Tuple
class RLTrainer:
"""
범용 강화학습 트레이너
"""
def __init__(self, env_name: str, agent_class, agent_kwargs: dict):
self.env = gym.make(env_name)
self.agent = agent_class(
state_size=self.env.observation_space.shape[0],
action_size=self.env.action_space.n,
**agent_kwargs
)
self.episode_rewards = []
self.episode_lengths = []
self.losses = []
def train(self, num_episodes: int, render_interval: int = None,
save_interval: int = None, target_reward: float = None):
"""
학습 실행
"""
best_avg_reward = -float('inf')
for episode in range(num_episodes):
state, _ = self.env.reset()
total_reward = 0
steps = 0
done = False
while not done:
# 렌더링
if render_interval and episode % render_interval == 0:
self.env.render()
# 행동 선택 및 실행
action = self.agent.get_action(state)
next_state, reward, terminated, truncated, _ = self.env.step(action)
done = terminated or truncated
# 학습 (알고리즘에 따라 다름)
if hasattr(self.agent, 'remember'):
self.agent.remember(state, action, reward)
if hasattr(self.agent, 'replay'):
loss = self.agent.replay()
if loss:
self.losses.append(loss)
state = next_state
total_reward += reward
steps += 1
# 에피소드 종료 후 처리
if hasattr(self.agent, 'update'):
loss = self.agent.update()
if loss:
self.losses.append(loss)
if hasattr(self.agent, 'decay_epsilon'):
self.agent.decay_epsilon()
# 통계 기록
self.episode_rewards.append(total_reward)
self.episode_lengths.append(steps)
# 진행상황 출력
if (episode + 1) % 10 == 0:
avg_reward = np.mean(self.episode_rewards[-100:])
avg_length = np.mean(self.episode_lengths[-100:])
print(f"Episode {episode + 1}/{num_episodes}")
print(f" Avg Reward (100 ep): {avg_reward:.2f}")
print(f" Avg Length (100 ep): {avg_length:.2f}")
print(f" Current Reward: {total_reward:.2f}")
# 모델 저장
if save_interval and (episode + 1) % save_interval == 0:
if avg_reward > best_avg_reward:
best_avg_reward = avg_reward
self.save_model(f"best_model_ep{episode+1}.pth")
# 목표 달성 확인
if target_reward and avg_reward >= target_reward:
print(f"\nTarget reward {target_reward} reached!")
break
self.env.close()
return self.episode_rewards, self.episode_lengths
def save_model(self, filename: str):
"""
모델 저장
"""
if hasattr(self.agent, 'policy_net'):
torch.save(self.agent.policy_net.state_dict(), filename)
elif hasattr(self.agent, 'policy'):
torch.save(self.agent.policy.state_dict(), filename)
elif hasattr(self.agent, 'model'):
torch.save(self.agent.model.state_dict(), filename)
def plot_results(self):
"""
학습 결과 시각화
"""
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 에피소드별 보상
axes[0, 0].plot(self.episode_rewards, alpha=0.3, label='Episode Reward')
axes[0, 0].plot(self._moving_average(self.episode_rewards, 100),
label='100-ep Moving Avg')
axes[0, 0].set_xlabel('Episode')
axes[0, 0].set_ylabel('Reward')
axes[0, 0].set_title('Training Rewards')
axes[0, 0].legend()
axes[0, 0].grid(True)
# 에피소드 길이
axes[0, 1].plot(self.episode_lengths, alpha=0.3, label='Episode Length')
axes[0, 1].plot(self._moving_average(self.episode_lengths, 100),
label='100-ep Moving Avg')
axes[0, 1].set_xlabel('Episode')
axes[0, 1].set_ylabel('Steps')
axes[0, 1].set_title('Episode Lengths')
axes[0, 1].legend()
axes[0, 1].grid(True)
# 손실
if self.losses:
axes[1, 0].plot(self.losses, alpha=0.5)
axes[1, 0].set_xlabel('Update Step')
axes[1, 0].set_ylabel('Loss')
axes[1, 0].set_title('Training Loss')
axes[1, 0].grid(True)
# 보상 분포
axes[1, 1].hist(self.episode_rewards, bins=50, edgecolor='black')
axes[1, 1].set_xlabel('Reward')
axes[1, 1].set_ylabel('Frequency')
axes[1, 1].set_title('Reward Distribution')
axes[1, 1].grid(True)
plt.tight_layout()
plt.savefig('training_results.png', dpi=300, bbox_inches='tight')
plt.show()
@staticmethod
def _moving_average(data: List[float], window: int) -> List[float]:
"""
이동 평균 계산
"""
if len(data) < window:
return data
cumsum = np.cumsum(data)
cumsum[window:] = cumsum[window:] - cumsum[:-window]
return cumsum[window - 1:] / window
# 사용 예제
"""
# DQN으로 CartPole 학습
trainer = RLTrainer(
env_name='CartPole-v1',
agent_class=DQNAgent,
agent_kwargs={
'learning_rate': 0.001,
'discount_factor': 0.99,
'epsilon': 1.0,
'epsilon_decay': 0.995,
'epsilon_min': 0.01
}
)
rewards, lengths = trainer.train(
num_episodes=500,
save_interval=100,
target_reward=195.0
)
trainer.plot_results()
"""
커스텀 환경 만들기
import gym
from gym import spaces
class CustomGridWorld(gym.Env):
"""
커스텀 그리드 월드 환경
"""
def __init__(self, grid_size=5):
super(CustomGridWorld, self).__init__()
self.grid_size = grid_size
self.state = None
self.goal = (grid_size - 1, grid_size - 1)
# 행동 공간: 상, 하, 좌, 우
self.action_space = spaces.Discrete(4)
# 상태 공간: (x, y) 좌표
self.observation_space = spaces.Box(
low=0, high=grid_size-1,
shape=(2,), dtype=np.int32
)
def reset(self):
"""
환경 초기화
"""
self.state = np.array([0, 0])
return self.state, {}
def step(self, action):
"""
행동 실행
"""
# 행동에 따라 상태 변경
if action == 0: # 상
self.state[1] = max(0, self.state[1] - 1)
elif action == 1: # 하
self.state[1] = min(self.grid_size - 1, self.state[1] + 1)
elif action == 2: # 좌
self.state[0] = max(0, self.state[0] - 1)
elif action == 3: # 우
self.state[0] = min(self.grid_size - 1, self.state[0] + 1)
# 보상 계산
if tuple(self.state) == self.goal:
reward = 10.0
done = True
else:
reward = -0.1 # 각 단계마다 작은 패널티
done = False
return self.state, reward, done, False, {}
def render(self, mode='human'):
"""
환경 시각화
"""
grid = np.zeros((self.grid_size, self.grid_size))
grid[tuple(self.state)] = 1
grid[self.goal] = 2
print(grid)
# 커스텀 환경 사용
"""
env = CustomGridWorld(grid_size=5)
state, _ = env.reset()
print(f"Initial state: {state}")
for _ in range(10):
action = env.action_space.sample()
state, reward, done, _, _ = env.step(action)
print(f"Action: {action}, State: {state}, Reward: {reward}")
env.render()
if done:
break
"""
결론 및 향후 방향
강화학습의 현재와 미래
강화학습은 인공지능의 중요한 한 분야로, 에이전트가 환경과 상호작용하면서 스스로 학습하는 과정을 거칩니다. 본 가이드에서는 다음 내용을 다루었습니다:
- 역사적 발전: 1950년대 행동주의 심리학부터 현대의 대규모 언어 모델까지
- 수학적 기초: MDP, 벨만 방정식, 가치 함수의 이론적 배경
- 주요 알고리즘들:
- Q-Learning: 테이블 기반 가치 학습
- DQN: 딥러닝을 활용한 고차원 상태 공간 처리
- Policy Gradient (REINFORCE): 정책 직접 최적화
- Actor-Critic (A2C): 두 접근법의 결합
- 실전 구현: OpenAI Gym을 활용한 완전한 학습 파이프라인
주요 도전 과제
- 샘플 효율성: 강화학습은 많은 샘플이 필요하며, 이는 실제 환경에서 비용이 클 수 있습니다.
- 안정성: 학습 과정이 불안정할 수 있으며, 하이퍼파라미터에 민감합니다.
- 일반화: 특정 환경에서 학습한 정책이 다른 환경으로 전이되기 어렵습니다.
- 안전성: 실제 시스템에서 탐험 과정이 위험할 수 있습니다.
- 보상 설계: 복잡한 작업에 대한 적절한 보상 함수를 설계하기 어렵습니다.
최신 연구 방향
- 오프라인 강화학습(Offline RL): 기존 데이터셋만으로 학습
- 메타 강화학습(Meta-RL): 빠른 적응과 전이 학습
- 계층적 강화학습(Hierarchical RL): 복잡한 작업을 하위 작업으로 분해
- 모델 기반 강화학습(Model-based RL): 환경 모델을 학습하여 샘플 효율성 향상
- 인간 피드백 강화학습(RLHF): 인간의 선호도를 활용한 정렬
추천 학습 자료
서적
- Sutton, R. S., & Barto, A. G. (2018). "Reinforcement Learning: An Introduction" (2nd Edition)
- Graesser, L., & Keng, W. L. (2019). "Foundations of Deep Reinforcement Learning"
- Lapan, M. (2020). "Deep Reinforcement Learning Hands-On" (2nd Edition)
온라인 강좌
- David Silver's RL Course (UCL/DeepMind)
- Stanford CS234: Reinforcement Learning
- Berkeley CS285: Deep Reinforcement Learning
리소스 및 라이브러리
- OpenAI Gym: https://gym.openai.com/
- Stable-Baselines3: https://stable-baselines3.readthedocs.io/
- RLlib (Ray): https://docs.ray.io/en/latest/rllib/
- Spinning Up in Deep RL (OpenAI): https://spinningup.openai.com/
연구 논문 및 블로그
- DeepMind Research: https://deepmind.com/research/
- OpenAI Blog: https://openai.com/blog/
- Distill.pub (시각적 설명): https://distill.pub/
- Lil'Log (Lilian Weng's Blog): https://lilianweng.github.io/
실전 프로젝트 아이디어
- 게임 AI: Atari 게임, 체스, 바둑 등
- 로봇 제어: 시뮬레이션 환경에서 로봇 팔 제어
- 자율 주행: CARLA 시뮬레이터 활용
- 추천 시스템: 사용자 선호도 학습
- 금융 거래: 주식 거래 전략 학습
- 자원 관리: 데이터센터 냉각 최적화
강화학습은 계속 발전하는 분야이며, 새로운 알고리즘과 응용이 끊임없이 등장하고 있습니다. 이 가이드가 강화학습의 세계로 들어가는 여러분에게 든든한 길잡이가 되기를 바랍니다. 지속적인 학습과 실험을 통해 더 깊은 이해를 얻으시기를 응원합니다!

