| name | reinforcement-learning |
| description | Q-learning, DQN, PPO, A3C, policy gradient methods, multi-agent systems, and Gym environments. Use for training agents, game AI, robotics, or decision-making systems. |
| sasmp_version | 1.3.0 |
| bonded_agent | 04-machine-learning-ai |
| bond_type | PRIMARY_BOND |
Reinforcement Learning
Train intelligent agents that learn optimal behavior through interaction with environments.
Quick Start
OpenAI Gymnasium Setup
import gymnasium as gym
import numpy as np
# Create environment
env = gym.make('CartPole-v1')
# Environment info
print(f"Observation space: {env.observation_space}")
print(f"Action space: {env.action_space}")
# Basic interaction loop
observation, info = env.reset()
for _ in range(1000):
action = env.action_space.sample() # Random action
observation, reward, terminated, truncated, info = env.step(action)
if terminated or truncated:
observation, info = env.reset()
env.close()
Q-Learning (Tabular)
import numpy as np
class QLearning:
"""Tabular Q-Learning for discrete state/action spaces"""
def __init__(self, n_states, n_actions, lr=0.1, gamma=0.99, epsilon=1.0):
self.q_table = np.zeros((n_states, n_actions))
self.lr = lr
self.gamma = gamma
self.epsilon = epsilon
self.epsilon_min = 0.01
self.epsilon_decay = 0.995
def get_action(self, state):
"""Epsilon-greedy action selection"""
if np.random.random() < self.epsilon:
return np.random.randint(self.q_table.shape[1])
return np.argmax(self.q_table[state])
def update(self, state, action, reward, next_state, done):
"""Update Q-value using Bellman equation"""
if done:
target = reward
else:
target = reward + self.gamma * np.max(self.q_table[next_state])
self.q_table[state, action] += self.lr * (target - self.q_table[state, action])
# Decay epsilon
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
# Training loop
env = gym.make('FrozenLake-v1')
agent = QLearning(n_states=16, n_actions=4)
for episode in range(10000):
state, _ = env.reset()
total_reward = 0
while True:
action = agent.get_action(state)
next_state, reward, terminated, truncated, _ = env.step(action)
agent.update(state, action, reward, next_state, terminated)
total_reward += reward
state = next_state
if terminated or truncated:
break
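After training, the learned Q-table can be checked with a purely greedy rollout. A quick evaluation sketch, reusing the `env` and `agent` from the loop above:
# Evaluate the greedy policy (no exploration)
n_eval_episodes = 100
successes = 0
for _ in range(n_eval_episodes):
    state, _ = env.reset()
    while True:
        action = int(np.argmax(agent.q_table[state]))  # always exploit
        state, reward, terminated, truncated, _ = env.step(action)
        if terminated or truncated:
            successes += reward  # FrozenLake returns 1.0 only when the goal is reached
            break
print(f"Greedy success rate: {successes / n_eval_episodes:.2f}")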
Deep Q-Network (DQN)
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
class DQN(nn.Module):
"""Deep Q-Network"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
super(DQN, self).__init__()
self.network = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, action_dim)
)
def forward(self, x):
return self.network(x)
class ReplayBuffer:
"""Experience replay buffer"""
def __init__(self, capacity=100000):
self.buffer = deque(maxlen=capacity)
def push(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
batch = random.sample(self.buffer, batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
        # Stack into contiguous arrays first; building tensors from a list of arrays is slow
        return (
            torch.FloatTensor(np.array(states)),
            torch.LongTensor(actions),
            torch.FloatTensor(rewards),
            torch.FloatTensor(np.array(next_states)),
            torch.FloatTensor(dones)
        )
def __len__(self):
return len(self.buffer)
class DQNAgent:
"""DQN Agent with target network and experience replay"""
def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99,
epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995):
self.action_dim = action_dim
self.gamma = gamma
self.epsilon = epsilon
self.epsilon_min = epsilon_min
self.epsilon_decay = epsilon_decay
# Networks
self.policy_net = DQN(state_dim, action_dim)
self.target_net = DQN(state_dim, action_dim)
self.target_net.load_state_dict(self.policy_net.state_dict())
self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
self.buffer = ReplayBuffer()
def get_action(self, state):
if np.random.random() < self.epsilon:
return np.random.randint(self.action_dim)
with torch.no_grad():
state = torch.FloatTensor(state).unsqueeze(0)
q_values = self.policy_net(state)
return q_values.argmax().item()
def train(self, batch_size=64):
if len(self.buffer) < batch_size:
return
states, actions, rewards, next_states, dones = self.buffer.sample(batch_size)
# Current Q values
current_q = self.policy_net(states).gather(1, actions.unsqueeze(1))
# Target Q values
with torch.no_grad():
next_q = self.target_net(next_states).max(1)[0]
target_q = rewards + self.gamma * next_q * (1 - dones)
# Loss
loss = nn.MSELoss()(current_q.squeeze(), target_q)
# Optimize
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Decay epsilon
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
def update_target(self):
"""Update target network"""
self.target_net.load_state_dict(self.policy_net.state_dict())
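A minimal end-to-end training loop for the agent above. This is a sketch, assuming CartPole-v1 and a target-network sync every 10 episodes; the sync frequency is an illustrative choice, not a required value.
env = gym.make('CartPole-v1')
agent = DQNAgent(state_dim=4, action_dim=2)
target_update_freq = 10  # illustrative; tune per task

for episode in range(500):
    state, _ = env.reset()
    episode_reward = 0
    while True:
        action = agent.get_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        agent.buffer.push(state, action, reward, next_state, float(terminated))
        agent.train()
        episode_reward += reward
        state = next_state
        if terminated or truncated:
            break
    if episode % target_update_freq == 0:
        agent.update_target()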
Policy Gradient Methods
REINFORCE
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
class PolicyNetwork(nn.Module):
"""Policy network for REINFORCE"""
def __init__(self, state_dim, action_dim, hidden_dim=128):
super().__init__()
self.network = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, action_dim),
nn.Softmax(dim=-1)
)
def forward(self, x):
return self.network(x)
def get_action(self, state):
probs = self.forward(torch.FloatTensor(state))
dist = Categorical(probs)
action = dist.sample()
return action.item(), dist.log_prob(action)
class REINFORCE:
"""REINFORCE with baseline"""
def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99):
self.policy = PolicyNetwork(state_dim, action_dim)
self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
self.gamma = gamma
def compute_returns(self, rewards):
"""Compute discounted returns"""
returns = []
G = 0
for r in reversed(rewards):
G = r + self.gamma * G
returns.insert(0, G)
returns = torch.tensor(returns)
# Normalize for stable training
returns = (returns - returns.mean()) / (returns.std() + 1e-8)
return returns
def update(self, log_probs, rewards):
returns = self.compute_returns(rewards)
log_probs = torch.stack(log_probs)
# Policy gradient loss
loss = -(log_probs * returns).mean()
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Training (CartPole-v1: 4-dimensional state, 2 actions)
env = gym.make('CartPole-v1')
agent = REINFORCE(state_dim=4, action_dim=2)
for episode in range(1000):
state, _ = env.reset()
log_probs = []
rewards = []
while True:
action, log_prob = agent.policy.get_action(state)
next_state, reward, terminated, truncated, _ = env.step(action)
log_probs.append(log_prob)
rewards.append(reward)
state = next_state
if terminated or truncated:
break
agent.update(log_probs, rewards)
Proximal Policy Optimization (PPO)
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
class ActorCritic(nn.Module):
"""Actor-Critic network for PPO"""
def __init__(self, state_dim, action_dim, hidden_dim=256):
super().__init__()
# Shared feature extractor
self.features = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU()
)
# Actor (policy)
self.actor = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, action_dim),
nn.Softmax(dim=-1)
)
# Critic (value function)
self.critic = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
)
def forward(self, x):
features = self.features(x)
return self.actor(features), self.critic(features)
class PPO:
"""Proximal Policy Optimization"""
def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
clip_ratio=0.2, epochs=10, batch_size=64):
self.model = ActorCritic(state_dim, action_dim)
self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
self.gamma = gamma
self.clip_ratio = clip_ratio
self.epochs = epochs
self.batch_size = batch_size
def compute_gae(self, rewards, values, dones, gamma=0.99, lam=0.95):
"""Generalized Advantage Estimation"""
advantages = []
gae = 0
for t in reversed(range(len(rewards))):
if t == len(rewards) - 1:
next_value = 0
else:
next_value = values[t + 1]
delta = rewards[t] + gamma * next_value * (1 - dones[t]) - values[t]
gae = delta + gamma * lam * (1 - dones[t]) * gae
advantages.insert(0, gae)
return torch.tensor(advantages)
def update(self, states, actions, old_log_probs, returns, advantages):
"""PPO update with clipping"""
for _ in range(self.epochs):
# Get current policy outputs
probs, values = self.model(states)
dist = Categorical(probs)
log_probs = dist.log_prob(actions)
entropy = dist.entropy().mean()
# Ratio for PPO clipping
ratio = torch.exp(log_probs - old_log_probs)
# Clipped surrogate loss
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - self.clip_ratio,
1 + self.clip_ratio) * advantages
actor_loss = -torch.min(surr1, surr2).mean()
# Critic loss
critic_loss = nn.MSELoss()(values.squeeze(), returns)
# Total loss with entropy bonus
loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy
self.optimizer.zero_grad()
loss.backward()
nn.utils.clip_grad_norm_(self.model.parameters(), 0.5)
self.optimizer.step()
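`update` expects whole-rollout tensors, so a collection step has to run first. A minimal rollout sketch for a discrete-action task; the assumptions here are CartPole-v1, a fixed 2048-step rollout, `returns = advantages + values`, and advantage normalization before the update:
env = gym.make('CartPole-v1')
ppo = PPO(state_dim=4, action_dim=2)

state, _ = env.reset()
states, actions, log_probs, rewards, values, dones = [], [], [], [], [], []

for _ in range(2048):  # collect one rollout
    state_t = torch.FloatTensor(state).unsqueeze(0)
    with torch.no_grad():
        probs, value = ppo.model(state_t)
    dist = Categorical(probs)
    action = dist.sample()

    next_state, reward, terminated, truncated, _ = env.step(action.item())
    states.append(state_t.squeeze(0))
    actions.append(action.item())
    log_probs.append(dist.log_prob(action).item())
    rewards.append(reward)
    values.append(value.item())
    dones.append(float(terminated or truncated))
    state = next_state
    if terminated or truncated:
        state, _ = env.reset()

advantages = ppo.compute_gae(rewards, values, dones)
returns = advantages + torch.tensor(values)
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

ppo.update(torch.stack(states), torch.tensor(actions),
           torch.tensor(log_probs), returns, advantages)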
Multi-Agent RL
class MultiAgentEnv:
"""Simple multi-agent environment wrapper"""
def __init__(self, n_agents, env_fn):
self.n_agents = n_agents
self.envs = [env_fn() for _ in range(n_agents)]
def reset(self):
return [env.reset()[0] for env in self.envs]
def step(self, actions):
results = [env.step(a) for env, a in zip(self.envs, actions)]
observations = [r[0] for r in results]
rewards = [r[1] for r in results]
dones = [r[2] or r[3] for r in results]
return observations, rewards, dones
class IndependentLearners:
"""Independent Q-learning agents"""
def __init__(self, n_agents, state_dim, action_dim):
self.agents = [
DQNAgent(state_dim, action_dim)
for _ in range(n_agents)
]
def get_actions(self, observations):
return [agent.get_action(obs)
for agent, obs in zip(self.agents, observations)]
def train(self):
for agent in self.agents:
agent.train()
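A usage sketch tying the wrapper and the independent learners together. It assumes CartPole-v1 and the `DQNAgent` replay buffer from above; since the simple wrapper has no per-env auto-reset, the loop stops as soon as any agent's episode ends.
n_agents = 2
multi_env = MultiAgentEnv(n_agents, lambda: gym.make('CartPole-v1'))
learners = IndependentLearners(n_agents, state_dim=4, action_dim=2)

for episode in range(200):
    observations = multi_env.reset()
    while True:
        actions = learners.get_actions(observations)
        next_observations, rewards, dones = multi_env.step(actions)
        # Each agent stores and learns from only its own transition
        for agent, obs, act, rew, next_obs, done in zip(
                learners.agents, observations, actions, rewards, next_observations, dones):
            agent.buffer.push(obs, act, rew, next_obs, float(done))
        learners.train()
        observations = next_observations
        if any(dones):
            break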
Reward Shaping
def shape_reward(reward, state, next_state, done, info):
"""Design better reward signals"""
shaped_reward = reward
# Progress reward (encourage forward movement)
if 'x_position' in info:
progress = info['x_position'] - info.get('prev_x', 0)
shaped_reward += 0.1 * progress
# Survival bonus
if not done:
shaped_reward += 0.01
# Penalty for dangerous states
if 'danger_zone' in info and info['danger_zone']:
shaped_reward -= 0.5
# Goal proximity reward
if 'goal_distance' in info:
shaped_reward += 0.1 * (1.0 / (info['goal_distance'] + 1))
return shaped_reward
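Shaping is applied inside the interaction loop: the agent trains on the shaped value while the raw reward is kept for reporting. A minimal sketch; on CartPole the `info` dict contains none of the keys above, so only the survival bonus applies, and the keys are placeholders for environment-specific signals.
env = gym.make('CartPole-v1')
agent = DQNAgent(state_dim=4, action_dim=2)

state, _ = env.reset()
raw_return = 0.0
for _ in range(1000):
    action = agent.get_action(state)
    next_state, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    shaped = shape_reward(reward, state, next_state, done, info)
    agent.buffer.push(state, action, shaped, next_state, float(terminated))
    agent.train()
    raw_return += reward  # report the unshaped reward to avoid fooling yourself
    state = next_state
    if done:
        state, _ = env.reset()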
# Curriculum learning
class CurriculumEnv:
"""Environment with difficulty progression"""
def __init__(self, base_env, difficulty_schedule):
self.env = base_env
self.schedule = difficulty_schedule
self.current_level = 0
self.episode_count = 0
def reset(self):
self.episode_count += 1
# Increase difficulty based on schedule
if self.episode_count in self.schedule:
self.current_level += 1
self._update_difficulty()
return self.env.reset()
def _update_difficulty(self):
# Modify environment parameters
pass
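A usage sketch under the assumption of FrozenLake-v1, where the curriculum toggles the `is_slippery` flag: the agent first learns on deterministic ice, then the schedule switches on stochastic transitions (the 5,000-episode threshold is illustrative).
class FrozenLakeCurriculum(CurriculumEnv):
    """Hypothetical curriculum: deterministic ice first, slippery ice later"""
    def _update_difficulty(self):
        self.env = gym.make('FrozenLake-v1', is_slippery=self.current_level >= 1)

curriculum = FrozenLakeCurriculum(
    base_env=gym.make('FrozenLake-v1', is_slippery=False),
    difficulty_schedule={5000},  # switch to slippery ice after 5,000 episodes
)
state, _ = curriculum.reset()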
Stable Baselines3 (Production Ready)
from stable_baselines3 import PPO, DQN, A2C
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.callbacks import EvalCallback
# Vectorized environments for parallel training
def make_env():
return gym.make('CartPole-v1')
env = DummyVecEnv([make_env for _ in range(4)])
# Train PPO agent
model = PPO(
'MlpPolicy',
env,
learning_rate=3e-4,
n_steps=2048,
batch_size=64,
n_epochs=10,
gamma=0.99,
gae_lambda=0.95,
clip_range=0.2,
verbose=1,
tensorboard_log="./ppo_logs/"
)
# Evaluation callback
eval_env = gym.make('CartPole-v1')
eval_callback = EvalCallback(
eval_env,
best_model_save_path='./best_model/',
log_path='./logs/',
eval_freq=1000,
n_eval_episodes=10
)
# Train
model.learn(total_timesteps=100000, callback=eval_callback)
# Save and load
model.save("ppo_cartpole")
model = PPO.load("ppo_cartpole")
# Inference
obs = env.reset()
for _ in range(1000):
action, _ = model.predict(obs, deterministic=True)
obs, reward, done, info = env.step(action)
Hyperparameter Tuning
# Common hyperparameter ranges
rl_hyperparameters = {
"learning_rate": [1e-4, 3e-4, 1e-3],
"gamma": [0.95, 0.99, 0.999],
"batch_size": [32, 64, 128, 256],
"n_steps": [128, 256, 512, 2048],
"clip_range": [0.1, 0.2, 0.3],
"entropy_coef": [0.0, 0.01, 0.05],
"hidden_sizes": [(64, 64), (128, 128), (256, 256)]
}
# Optuna tuning
import optuna
from stable_baselines3.common.evaluation import evaluate_policy
def objective(trial):
lr = trial.suggest_float('lr', 1e-5, 1e-2, log=True)
gamma = trial.suggest_float('gamma', 0.9, 0.9999)
n_steps = trial.suggest_int('n_steps', 128, 2048, step=128)
model = PPO('MlpPolicy', env, learning_rate=lr,
gamma=gamma, n_steps=n_steps)
model.learn(total_timesteps=50000)
# Evaluate
    mean_reward, _ = evaluate_policy(model, eval_env, n_eval_episodes=10)
return mean_reward
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50)
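After the search, retrain with the winning configuration. `study.best_params` and `study.best_value` are standard Optuna attributes; note the key names match the `suggest_*` calls, so `lr` must be mapped back to `learning_rate`.
print(f"Best mean reward: {study.best_value:.2f}")
print(f"Best hyperparameters: {study.best_params}")

best = study.best_params
final_model = PPO('MlpPolicy', env, learning_rate=best['lr'],
                  gamma=best['gamma'], n_steps=best['n_steps'])
final_model.learn(total_timesteps=200000)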
Common Issues & Solutions
Issue: Training instability
Solutions:
- Reduce learning rate
- Increase batch size
- Use gradient clipping
- Normalize observations and rewards (see the running-normalizer sketch after this list)
- Use proper random seeds
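A running mean/variance observation normalizer, as referenced in the list above. This is a minimal sketch using the parallel-variance update; Stable Baselines3 users can get the same effect from `VecNormalize` instead.
class RunningNormalizer:
    """Running mean/std normalization for observations"""
    def __init__(self, shape, eps=1e-8):
        self.mean = np.zeros(shape)
        self.var = np.ones(shape)
        self.count = eps
        self.eps = eps

    def update(self, batch):
        # Parallel mean/variance update for a batch of observations (shape: [N, obs_dim])
        batch_mean = np.mean(batch, axis=0)
        batch_var = np.var(batch, axis=0)
        batch_count = batch.shape[0]
        delta = batch_mean - self.mean
        total = self.count + batch_count
        new_var = (self.var * self.count + batch_var * batch_count
                   + delta ** 2 * self.count * batch_count / total) / total
        self.mean = self.mean + delta * batch_count / total
        self.var = new_var
        self.count = total

    def normalize(self, obs):
        return (obs - self.mean) / np.sqrt(self.var + self.eps)

normalizer = RunningNormalizer(shape=(4,))  # CartPole observation dimension
obs, _ = gym.make('CartPole-v1').reset()
normalizer.update(obs.reshape(1, -1))
obs = normalizer.normalize(obs)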
Issue: Poor exploration
Solutions:
- Increase epsilon/entropy
- Use curiosity-driven exploration
- Add noise to actions (Gaussian, OU; see the sketch after this list)
- Use count-based exploration bonus
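Action-noise helpers, as referenced in the list above. These apply to continuous-action algorithms such as DDPG/TD3, not to the discrete epsilon-greedy agents shown earlier; `deterministic_action` below is a placeholder for a policy output.
class GaussianNoise:
    """Independent Gaussian exploration noise"""
    def __init__(self, action_dim, sigma=0.1):
        self.action_dim = action_dim
        self.sigma = sigma

    def sample(self):
        return np.random.normal(0.0, self.sigma, self.action_dim)

class OUNoise:
    """Ornstein-Uhlenbeck process: temporally correlated exploration noise"""
    def __init__(self, action_dim, mu=0.0, theta=0.15, sigma=0.2):
        self.mu = mu * np.ones(action_dim)
        self.theta = theta
        self.sigma = sigma
        self.state = self.mu.copy()

    def reset(self):
        self.state = self.mu.copy()

    def sample(self):
        dx = self.theta * (self.mu - self.state) + self.sigma * np.random.randn(len(self.state))
        self.state = self.state + dx
        return self.state

# Usage: add noise to a deterministic action, then clip to the action bounds
noise = OUNoise(action_dim=1)
deterministic_action = np.array([0.0])  # placeholder for a policy's output
action = np.clip(deterministic_action + noise.sample(), -1.0, 1.0)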
Issue: Reward hacking
Solutions:
- Careful reward design
- Use sparse rewards when possible
- Test with adversarial evaluation
- Monitor for unexpected behaviors
Best Practices
- Environment: Verify env correctness before training
- Normalization: Normalize states and rewards
- Logging: Track episode rewards, lengths, losses
- Reproducibility: Set seeds for all random sources
- Evaluation: Separate eval environment, many episodes
- Hyperparameters: Start with known good defaults
- Baseline: Compare against a random policy (see the seeding/baseline sketch below)
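A seeding helper and a random-policy baseline check, covering the Reproducibility and Baseline items above. This is a sketch; `torch.manual_seed` is included on the assumption that a PyTorch agent is used.
import random
import numpy as np
import torch

def set_seed(seed, env=None):
    """Seed all random sources for reproducible runs"""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if env is not None:
        env.reset(seed=seed)
        env.action_space.seed(seed)

def random_policy_baseline(env, n_episodes=100):
    """Mean episode return of a uniformly random policy"""
    returns = []
    for _ in range(n_episodes):
        env.reset()
        total, done = 0.0, False
        while not done:
            _, reward, terminated, truncated, _ = env.step(env.action_space.sample())
            total += reward
            done = terminated or truncated
        returns.append(total)
    return float(np.mean(returns))

env = gym.make('CartPole-v1')
set_seed(42, env)
print(f"Random policy baseline: {random_policy_baseline(env):.1f}")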