| name | rl-environments |
| description | Gym/gymnasium API - custom environments, spaces, wrappers, vectorization, debugging |
RL Environments: Building and Debugging Custom Environments
When to Use This Skill
Invoke this skill when you need to:
- Create Custom Environments: Build a new environment from scratch using Gym/gymnasium
- Define Observation/Action Spaces: Design Box, Discrete, Dict, Tuple spaces correctly
- Use Environment Wrappers: Add preprocessing, modify rewards, implement time limits
- Parallelize Environments: Choose between DummyVectorEnv, SyncVectorEnv, AsyncVectorEnv
- Debug Environment Bugs: Diagnose reset/step issues, reward scaling, space mismatches
- Test Environments: Validate environments before training agents
- Handle API Differences: Migrate between Gym versions or Gym vs gymnasium
- Implement Complex State: Manage multi-component observations and state systems
- Enforce Action Bounds: Properly clip or scale actions
- Catch Common Pitfalls: Avoid 10+ common environment implementation mistakes
Core Problem: Environments are the foundation of RL training. Broken environments cause 80% of RL failures, but environment bugs are often missed because they don't error—they silently break training. This skill systematically teaches correct environment design and provides a debugging methodology.
Do NOT Use This Skill For
- Algorithm implementation (route to specific algorithm skills like value-based-methods, policy-gradient-methods, actor-critic-methods)
- Reward design and shaping (route to reward-shaping-engineering for reward function engineering and potential-based shaping)
- RL theory and foundations (route to rl-foundations for MDPs, Bellman equations, value functions)
- Training debugging beyond environment issues (route to rl-debugging for systematic diagnosis of training failures)
- Exploration strategy selection (route to exploration-strategies for ε-greedy, curiosity-driven, RND methods)
Part 1: Understanding the Gym/Gymnasium API
The Standard Interface
Every Gym/Gymnasium environment implements:
import gymnasium as gym # or 'gym' for older versions
import numpy as np
class CustomEnv(gym.Env):
"""Template for all custom environments"""
def __init__(self):
# Define action and observation spaces
self.action_space = gym.spaces.Discrete(4) # 4 possible actions
self.observation_space = gym.spaces.Box(
low=0, high=255, shape=(84, 84, 3), dtype=np.uint8
)
def reset(self, seed=None, options=None):
"""Reset environment to initial state
Returns:
observation (np.ndarray): Initial observation
info (dict): Auxiliary info (can be empty dict)
"""
super().reset(seed=seed)
obs = self._get_initial_observation()
info = {}
return obs, info
def step(self, action):
"""Take one action in the environment
Args:
action: Action from action_space
Returns:
observation (np.ndarray): Current observation after action
reward (float): Reward for this step
terminated (bool): True if episode ended (goal/failure)
truncated (bool): True if episode cut off (time limit)
info (dict): Auxiliary info
"""
obs = self._apply_action(action)
reward = self._compute_reward()
terminated = self._is_done()
truncated = False # Set by TimeLimit wrapper usually
info = {}
return obs, reward, terminated, truncated, info
def render(self):
"""Visualize the environment (optional); in Gymnasium the render mode is chosen at construction via render_mode"""
pass
def close(self):
"""Cleanup resources (optional)"""
pass
Key API Points
1. Reset Format (Gymnasium API)
# CORRECT: Reset returns (observation, info)
observation, info = env.reset()
# WRONG: Old Gym API returned just observation
observation = env.reset() # This is Gym, not Gymnasium
2. Step Format (Gymnasium API)
# CORRECT: Step returns (obs, reward, terminated, truncated, info)
obs, reward, terminated, truncated, info = env.step(action)
# WRONG: Old Gym API
obs, reward, done, info = env.step(action) # 'done' is single boolean
3. Gym vs Gymnasium
| Feature | Gym (OpenAI) | Gymnasium (Maintained) |
|---|---|---|
| Reset return | obs | (obs, info) |
| Step return | (obs, r, done, info) | (obs, r, terminated, truncated, info) |
| Render | env.render(mode='human') | env.render(); mode set at init |
| Import | import gym | import gymnasium as gym |
| Support | Deprecated | Current standard |
Decision: Use gymnasium for new code. If stuck with older code:
# Compatibility wrapper
try:
import gymnasium as gym
except ImportError:
import gym
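The call sites differ as well as the imports. Below is a minimal sketch of a compatibility helper (the name step_compat is illustrative, not part of either library) that normalizes both step formats to the Gymnasium 5-tuple; it assumes old-Gym time limits are flagged via info['TimeLimit.truncated'], which the classic TimeLimit wrapper sets:

def step_compat(env, action):
    """Call env.step() and always return (obs, reward, terminated, truncated, info)."""
    result = env.step(action)
    if len(result) == 5:                 # Gymnasium API
        return result
    obs, reward, done, info = result     # old Gym API
    # Old Gym folds time-limit cutoffs into done; its TimeLimit wrapper marks them in info
    truncated = bool(info.get("TimeLimit.truncated", False))
    terminated = bool(done) and not truncated
    return obs, reward, terminated, truncated, info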
Part 2: Observation and Action Space Design
Space Types
Discrete Space (for discrete actions or observations)
# 4 possible actions: 0, 1, 2, 3
action_space = gym.spaces.Discrete(4)
# 5 possible discrete states
observation_space = gym.spaces.Discrete(5)
# With start parameter
action_space = gym.spaces.Discrete(4, start=1) # 1, 2, 3, 4
Box Space (for continuous or image data)
# Continuous control: 3D position, each in [-1, 1]
action_space = gym.spaces.Box(
low=-1.0,
high=1.0,
shape=(3,),
dtype=np.float32
)
# Image observation: 84x84 RGB, pixels 0-255
observation_space = gym.spaces.Box(
low=0,
high=255,
shape=(84, 84, 3),
dtype=np.uint8
)
# Multi-component continuous: 2D position + 1D velocity
observation_space = gym.spaces.Box(
low=np.array([-1.0, -1.0, -10.0]),
high=np.array([1.0, 1.0, 10.0]),
dtype=np.float32
)
Dict Space (for structured observations with multiple components)
# Multi-component observation: image + state vector
observation_space = gym.spaces.Dict({
'image': gym.spaces.Box(0, 255, (84, 84, 3), dtype=np.uint8),
'position': gym.spaces.Box(-1, 1, (2,), dtype=np.float32),
})
# Access in reset/step:
obs = {
'image': np.random.randint(0, 256, (84, 84, 3), dtype=np.uint8),
'position': np.array([0.5, -0.3], dtype=np.float32),
}
Tuple Space (for ordered multiple components)
observation_space = gym.spaces.Tuple((
gym.spaces.Box(-1, 1, (2,), dtype=np.float32), # Position
gym.spaces.Discrete(4), # Direction
))
# Access:
obs = (np.array([0.5, -0.3], dtype=np.float32), 2)
MultiDiscrete (for multiple discrete action dimensions)
# Game with 4 actions per agent, 3 agents
action_space = gym.spaces.MultiDiscrete([4, 4, 4])
# Or asymmetric
action_space = gym.spaces.MultiDiscrete([3, 4, 5]) # Different choices per dimension
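All space types share the same two workhorse methods, sample() and contains(); a quick illustrative check across the space types above:

import gymnasium as gym
import numpy as np

spaces = {
    "discrete": gym.spaces.Discrete(4),
    "box": gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32),
    "dict": gym.spaces.Dict({
        "image": gym.spaces.Box(0, 255, (8, 8, 3), dtype=np.uint8),
        "position": gym.spaces.Box(-1, 1, (2,), dtype=np.float32),
    }),
    "multi_discrete": gym.spaces.MultiDiscrete([3, 4, 5]),
}

for name, space in spaces.items():
    sample = space.sample()           # Random element of the space
    assert space.contains(sample)     # sample() always lies inside the space
    print(name, "->", sample)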
Space Validation Patterns
Always validate that observations match the space:
def reset(self, seed=None, options=None):
super().reset(seed=seed)
obs = self._get_observation()
# CRITICAL: Validate observation against space
assert self.observation_space.contains(obs), \
f"Observation {obs} not in space {self.observation_space}"
return obs, {}
def step(self, action):
# CRITICAL: Validate action is in action space
assert self.action_space.contains(action), \
f"Action {action} not in space {self.action_space}"
obs = self._apply_action(action)
# Validate observation
assert self.observation_space.contains(obs), \
f"Observation {obs} not in space {self.observation_space}"
reward = self._compute_reward()
terminated = self._check_done()
truncated = False
return obs, reward, terminated, truncated, {}
Common Space Mistakes
Mistake 1: dtype mismatch (uint8 vs float32)
# WRONG: Space says uint8 but observation is float32
observation_space = gym.spaces.Box(0, 255, (84, 84, 3), dtype=np.uint8)
obs = np.random.random((84, 84, 3)).astype(np.float32) # MISMATCH!
assert observation_space.contains(obs) # FAILS
# CORRECT: Match dtype
observation_space = gym.spaces.Box(0, 1, (84, 84, 3), dtype=np.float32)
obs = np.random.random((84, 84, 3)).astype(np.float32)
assert observation_space.contains(obs) # PASSES
Mistake 2: Range mismatch
# WRONG: Observation outside declared range
observation_space = gym.spaces.Box(0, 1, (4,), dtype=np.float32)
obs = np.array([0.5, 1.5, 0.2, 0.8], dtype=np.float32) # 1.5 > 1!
assert observation_space.contains(obs) # FAILS
# CORRECT: Ensure observations stay within bounds
obs = np.clip(obs, 0, 1)
Mistake 3: Shape mismatch
# WRONG: Wrong shape
observation_space = gym.spaces.Box(0, 255, (84, 84, 3), dtype=np.uint8)
obs = np.random.randint(0, 256, (84, 84), dtype=np.uint8) # 2D, not 3D!
assert observation_space.contains(obs) # FAILS
# CORRECT: Match shape exactly
obs = np.random.randint(0, 256, (84, 84, 3), dtype=np.uint8)
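contains() only returns a boolean, so the asserts above cannot say which of shape, dtype, or range is wrong. A small diagnostic helper for Box spaces (illustrative, not part of gymnasium) makes the failure reason explicit:

import numpy as np
import gymnasium as gym

def explain_box_mismatch(space: gym.spaces.Box, obs) -> str:
    """Return a human-readable reason why obs is not contained in a Box space."""
    if not isinstance(obs, np.ndarray):
        return f"observation is {type(obs)}, expected np.ndarray"
    if obs.shape != space.shape:
        return f"shape {obs.shape} != space shape {space.shape}"
    if obs.dtype != space.dtype:
        return f"dtype {obs.dtype} != space dtype {space.dtype}"
    if (obs < space.low).any() or (obs > space.high).any():
        return f"values outside [{space.low.min()}, {space.high.max()}]"
    return "observation matches the space"

space = gym.spaces.Box(0, 255, (84, 84, 3), dtype=np.uint8)
bad_obs = np.random.random((84, 84, 3)).astype(np.float32)
print(explain_box_mismatch(space, bad_obs))  # -> dtype float32 != space dtype uint8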
Part 3: Creating Custom Environments - Template
Step 1: Inherit from gym.Env
import gymnasium as gym
import numpy as np
class CartPoleMini(gym.Env):
"""Simple environment for demonstration"""
# These are required attributes
metadata = {"render_modes": ["human"], "render_fps": 30}
def __init__(self, render_mode=None):
# Store render mode
self.render_mode = render_mode
# Action space: push cart left (0) or right (1)
self.action_space = gym.spaces.Discrete(2)
# Observation space: position, velocity, angle, angular velocity
self.observation_space = gym.spaces.Box(
low=np.array([-2.4, -10, -0.2, -10], dtype=np.float32),
high=np.array([2.4, 10, 0.2, 10], dtype=np.float32),
dtype=np.float32
)
# Episode variables
self.state = None
self.steps = 0
self.max_steps = 500
Step 2: Implement reset()
def reset(self, seed=None, options=None):
"""Reset to initial state
Returns:
obs (np.ndarray): Initial observation
info (dict): Empty dict
"""
super().reset(seed=seed)
# Initialize state to center position with small noise
self.state = np.array(
[
self.np_random.uniform(-0.05, 0.05), # position
0.0, # velocity
self.np_random.uniform(-0.05, 0.05), # angle
0.0, # angular velocity
],
dtype=np.float32
)
self.steps = 0
# Validate and return
assert self.observation_space.contains(self.state)
return self.state, {}
Step 3: Implement step()
def step(self, action):
"""Execute one step of the environment
Args:
action: 0 (push left) or 1 (push right)
Returns:
obs, reward, terminated, truncated, info
"""
assert self.action_space.contains(action)
# Validate state
assert self.observation_space.contains(self.state)
x, x_dot, theta, theta_dot = self.state
# Physics: apply force based on action
force = 10.0 if action == 1 else -10.0
# Simplified cartpole physics
acceleration = (force + 0.1 * theta) / 1.0
theta_dot_new = theta_dot + 0.02 * acceleration
theta_new = theta + 0.02 * theta_dot
x_dot_new = x_dot + 0.02 * acceleration
x_new = x + 0.02 * x_dot
# Update state
self.state = np.array(
[x_new, x_dot_new, theta_new, theta_dot_new],
dtype=np.float32
)
# Clamp values to stay in bounds
self.state = np.clip(self.state,
self.observation_space.low,
self.observation_space.high)
# Compute reward
reward = 1.0 if abs(theta) < 0.2 else -1.0
# Check termination
x, theta = self.state[0], self.state[2]
terminated = abs(x) > 2.4 or abs(theta) > 0.2
# Check truncation (max steps)
self.steps += 1
truncated = self.steps >= self.max_steps
# Validate output
assert self.observation_space.contains(self.state)
assert isinstance(reward, (int, float))
return self.state, float(reward), terminated, truncated, {}
Step 4: Implement render() and close() (Optional)
def render(self):
"""Render the environment (optional)"""
if self.render_mode == "human":
# Print state for visualization
x, x_dot, theta, theta_dot = self.state
print(f"Position: {x:.2f}, Angle: {theta:.2f}")
def close(self):
"""Cleanup (optional)"""
pass
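Before moving on, a short random-rollout smoke test (a minimal sketch assuming the CartPoleMini class as defined above) will surface most API mistakes immediately:

env = CartPoleMini()
obs, info = env.reset(seed=0)
assert env.observation_space.contains(obs)

for _ in range(200):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    assert env.observation_space.contains(obs)
    if terminated or truncated:
        obs, info = env.reset()
env.close()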
Complete Custom Environment Example
import gymnasium as gym
import numpy as np
class GridWorldEnv(gym.Env):
"""Simple 5x5 grid world where agent seeks goal"""
def __init__(self):
# Actions: up=0, right=1, down=2, left=3
self.action_space = gym.spaces.Discrete(4)
# Observation: (x, y) position
self.observation_space = gym.spaces.Box(
low=0, high=4, shape=(2,), dtype=np.int32
)
self.grid_size = 5
self.goal = np.array([4, 4], dtype=np.int32)
self.agent_pos = np.array([0, 0], dtype=np.int32)
self.steps = 0
self.max_steps = 50
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self.agent_pos = np.array([0, 0], dtype=np.int32)
self.steps = 0
assert self.observation_space.contains(self.agent_pos)
return self.agent_pos.copy(), {}
def step(self, action):
assert self.action_space.contains(action)
# Move agent
moves = {
0: np.array([0, 1], dtype=np.int32), # up
1: np.array([1, 0], dtype=np.int32), # right
2: np.array([0, -1], dtype=np.int32), # down
3: np.array([-1, 0], dtype=np.int32), # left
}
self.agent_pos += moves[action]
self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size - 1)
# Reward
distance_to_goal = np.linalg.norm(self.agent_pos - self.goal)
reward = 1.0 if np.array_equal(self.agent_pos, self.goal) else -0.01
# Done
terminated = np.array_equal(self.agent_pos, self.goal)
self.steps += 1
truncated = self.steps >= self.max_steps
return self.agent_pos.copy(), reward, terminated, truncated, {}
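gymnasium also bundles an environment checker that automates many of these checks. A minimal sketch of using it on the class above (check_env lives in gymnasium.utils.env_checker in recent releases; depending on version it may only warn about optional items such as metadata or render support):

from gymnasium.utils.env_checker import check_env

env = GridWorldEnv()
check_env(env, skip_render_check=True)   # Raises or warns on API violations

# Follow up with a short random rollout
obs, info = env.reset(seed=0)
for _ in range(100):
    obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
    if terminated or truncated:
        obs, info = env.reset()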
Part 4: Environment Wrappers
Why Use Wrappers?
Wrappers add functionality without modifying the original environment:
# Without wrappers: modify environment directly (WRONG - mixes concerns)
class CartPoleNormalized(CartPole):
def step(self, action):
obs, reward, done, info = super().step(action)
obs = obs / 2.4 # Normalize observation
reward = reward / 100 # Normalize reward
return obs, reward, done, info
# With wrappers: compose functionality (RIGHT - clean separation)
env = CartPole()
env = NormalizeObservation(env)
env = NormalizeReward(env)
Wrapper Pattern
class BaseWrapper(gym.Wrapper):
"""Base class for all wrappers"""
def __init__(self, env):
super().__init__(env)
# Don't modify spaces unless you redefine them
def reset(self, seed=None, options=None):
obs, info = self.env.reset(seed=seed, options=options)
return self._process_observation(obs), info
def step(self, action):
obs, reward, terminated, truncated, info = self.env.step(action)
obs = self._process_observation(obs)
reward = self._process_reward(reward)
return obs, reward, terminated, truncated, info
def _process_observation(self, obs):
return obs
def _process_reward(self, reward):
return reward
Common Built-in Wrappers
TimeLimit: Add episode time limit
env = gym.make("CartPole-v1")
env = gym.wrappers.TimeLimit(env, max_episode_steps=500)
# Now truncated=True after 500 steps
NormalizeObservation: Normalize observations to roughly zero mean and unit variance
env = gym.wrappers.NormalizeObservation(env)
# Observations normalized using running mean/std
RecordVideo: Save episode videos (the wrapped env must be created with render_mode="rgb_array")
env = gym.wrappers.RecordVideo(
env,
video_folder="videos/",
episode_trigger=lambda ep: ep % 10 == 0
)
ClipAction: Clip actions to action space bounds
env = gym.wrappers.ClipAction(env)
# Continuous (Box) actions are automatically clipped to the space's low/high bounds
Custom Wrapper Example: Scale Rewards
class ScaleRewardWrapper(gym.Wrapper):
"""Scale rewards by a constant factor"""
def __init__(self, env, scale=0.1):
super().__init__(env)
self.scale = scale
def step(self, action):
obs, reward, terminated, truncated, info = self.env.step(action)
return obs, reward * self.scale, terminated, truncated, info
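For a pure reward transform, gymnasium's narrower RewardWrapper base class is enough; a sketch equivalent to the wrapper above using that hook (assuming gym is the gymnasium alias used throughout):

class ScaleReward(gym.RewardWrapper):
    """Same behavior as ScaleRewardWrapper, via the RewardWrapper hook."""
    def __init__(self, env, scale=0.1):
        super().__init__(env)
        self.scale = scale

    def reward(self, reward):
        # Called automatically on the reward of every step()
        return reward * self.scale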
Custom Wrapper Example: Frame Stacking
import collections

class FrameStackWrapper(gym.Wrapper):
"""Stack last 4 frames for temporal information"""
def __init__(self, env, num_frames=4):
super().__init__(env)
self.num_frames = num_frames
self.frame_buffer = collections.deque(maxlen=num_frames)
# Modify observation space to include stacking
old_space = env.observation_space
self.observation_space = gym.spaces.Box(
low=old_space.low.min(),
high=old_space.high.max(),
shape=(old_space.shape[0], old_space.shape[1],
old_space.shape[2] * num_frames),
dtype=old_space.dtype
)
def reset(self, seed=None, options=None):
obs, info = self.env.reset(seed=seed, options=options)
self.frame_buffer.clear()
for _ in range(self.num_frames):
self.frame_buffer.append(obs)
return self._get_stacked_obs(), info
def step(self, action):
obs, reward, terminated, truncated, info = self.env.step(action)
self.frame_buffer.append(obs)
return self._get_stacked_obs(), reward, terminated, truncated, info
def _get_stacked_obs(self):
# Stack frames along channel dimension
return np.concatenate(list(self.frame_buffer), axis=2)
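Rolling your own stacker is instructive, but gymnasium also ships a built-in equivalent; note it stacks along a new leading (frames-first) axis rather than concatenating channels, and it was renamed between releases, so check your version:

# gymnasium 0.29.x
env = gym.wrappers.FrameStack(env, num_stack=4)

# gymnasium 1.x (renamed; the parameter name also changed)
# env = gym.wrappers.FrameStackObservation(env, stack_size=4)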
Wrapper Chaining
# Correct: Chain wrappers for composable functionality
# (example: CarRacing has image observations and continuous actions, so every
# wrapper below applies; the "-v2" suffix depends on your gymnasium release)
env = gym.make("CarRacing-v2")
env = gym.wrappers.TimeLimit(env, max_episode_steps=1000)
env = gym.wrappers.ClipAction(env)  # Box action spaces only
env = FrameStackWrapper(env, num_frames=4)
env = gym.wrappers.NormalizeObservation(env)
# Order matters: think about data flow
# raw env -> TimeLimit -> ClipAction -> FrameStack -> NormalizeObservation
Part 5: Vectorized Environments
Types of Vectorized Environments
DummyVectorEnv: Serial execution (simple, lowest overhead)
# Note: gymnasium has no DummyVectorEnv class; the name comes from Tianshou
# (Stable-Baselines3 calls it DummyVecEnv). gymnasium's serial, in-process vector
# env is SyncVectorEnv, aliased here so the naming used below still reads.
from gymnasium.vector import SyncVectorEnv as DummyVectorEnv
# Create 4 independent environments, stepped one after another in a single process
envs = DummyVectorEnv([
lambda: gym.make("CartPole-v1")
for i in range(4)
])
obs, info = envs.reset() # obs shape: (4, 4)
actions = np.array([0, 1, 1, 0]) # 4 actions
obs, rewards, terminateds, truncateds, info = envs.step(actions)
# rewards shape: (4,)
SyncVectorEnv: One batched interface, sequential stepping (gymnasium's in-process default)
from gymnasium.vector import SyncVectorEnv
# Create 8 environments behind a single batched reset/step call; gymnasium steps
# them one after another in the main process (no extra CPU parallelism)
envs = SyncVectorEnv([
lambda: gym.make("CartPole-v1")
for i in range(8)
])
obs, info = envs.reset()
# All 8 envs step synchronously
obs, rewards, terminateds, truncateds, info = envs.step(actions)
AsyncVectorEnv: One worker process per environment (fastest for slow envs, most complex)
from gymnasium.vector import AsyncVectorEnv
# Create 16 parallel environments (independent processes)
envs = AsyncVectorEnv([
lambda: gym.make("CartPole-v1")
for i in range(16)
])
# Same API as SyncVectorEnv but faster
obs, info = envs.reset()
obs, rewards, terminateds, truncateds, info = envs.step(actions)
envs.close() # IMPORTANT: Close async envs to cleanup processes
Comparison and Decision Tree
| Feature | Dummy | Sync | Async |
|---|---|---|---|
| Speed | Slow (serial) | Slow (serial, batched API) | Fastest for slow envs |
| CPU cores | 1 | 1 | N (one process per env) |
| Memory | Low | Moderate | High |
| Complexity | Simple | Medium | Complex |
| Debugging | Easy | Medium | Hard |
| Best for | Testing | Training | Large-scale training |
When to use each:
num_envs = 32
if num_envs <= 1:
# Single environment
env = gym.make("CartPole-v1")
elif num_envs <= 4:
# Few environments: a serial vector env (Dummy/Sync) keeps debugging simple
env = DummyVectorEnv([lambda: gym.make("CartPole-v1") for _ in range(num_envs)])
elif num_envs <= 8:
# Medium: SyncVectorEnv gives the same batched API with minimal overhead
env = SyncVectorEnv([lambda: gym.make("CartPole-v1") for _ in range(num_envs)])
else:
# Many (or slow) envs: use Async so each env runs in its own process
env = AsyncVectorEnv([lambda: gym.make("CartPole-v1") for _ in range(num_envs)])
# Note: vector env constructors take callables (env factories), hence the lambdas
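Whichever class you pick, the env factories are usually produced by a small helper so each copy gets its own wrappers and seed; a common sketch (the helper name make_env is illustrative):

import gymnasium as gym
from gymnasium.vector import SyncVectorEnv

def make_env(env_id: str, seed: int, rank: int):
    """Return a thunk that builds one wrapped, seeded environment."""
    def _init():
        env = gym.make(env_id)
        env = gym.wrappers.RecordEpisodeStatistics(env)  # Track episode returns/lengths
        env.reset(seed=seed + rank)            # Different seed per copy
        env.action_space.seed(seed + rank)
        return env
    return _init

envs = SyncVectorEnv([make_env("CartPole-v1", seed=0, rank=i) for i in range(8)])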
Common Vectorized Environment Bugs
Bug 1: Forgetting to close AsyncVectorEnv
# WRONG: Processes leak
envs = AsyncVectorEnv([...] for _ in range(16))
# ... training ...
# Forgot to close! Processes stay alive, memory leaks
# CORRECT: Always close
try:
envs = AsyncVectorEnv([...] for _ in range(16))
# ... training ...
finally:
envs.close() # Cleanup
# Or use context manager
from contextlib import contextmanager
@contextmanager
def make_async_envs(num_envs):
envs = AsyncVectorEnv([...] for _ in range(num_envs))
try:
yield envs
finally:
envs.close()
Bug 2: Non-parallel-safe environment
# WRONG: Environment relies on class-level shared state, breaks when run in parallel
class NonParallelEnv(gym.Env):
global_counter = 0 # CLASS ATTRIBUTE: shared by every instance in the process!
def step(self, action):
type(self).global_counter += 1 # All envs in one process mutate the same counter; async workers silently diverge
...
# CORRECT: No shared state
class ParallelSafeEnv(gym.Env):
def __init__(self):
self.counter = 0 # Instance variable, not shared
def step(self, action):
self.counter += 1 # Safe in parallel
...
Bug 3: Handling auto-reset in vectorized envs
# When an episode terminates in a vectorized env, it auto-resets
# (described here for gymnasium 0.26-0.29; gymnasium 1.0 changed autoreset so the
# reset observation arrives on the following step instead)
obs, rewards, terminateds, truncateds, info = envs.step(actions)
# If terminateds[i] is True, envs[i] has been auto-reset
# The obs[i] is the NEW initial observation from the reset
# NOT the final observation of the episode
# To get final observation before reset:
obs, rewards, terminateds, truncateds, info = envs.step(actions)
final_obs = info['final_observation'] # Original terminal obs
reset_obs = obs # New obs from auto-reset
Part 6: Common Environment Bugs and Fixes
Bug 1: Reward Scale Too Large
Symptom: Training unstable, losses spike, agent behavior random
# WRONG: Reward in range [0, 1000]
def step(self, action):
reward = self.goal_distance * 1000 # Can be up to 1000!
return obs, reward, done, truncated, info
# Problem: Gradients huge -> param updates too large -> training breaks
# CORRECT: Reward in [-1, 1]
def step(self, action):
reward = self.goal_distance # Range [0, 1]
reward = reward - 0.5 # Scale to [-0.5, 0.5]
return obs, reward, done, truncated, info
# Or normalize post-hoc
reward = np.clip(reward / 1000, -1, 1)
Bug 2: Action Not Applied Correctly
Symptom: Agent learns but behavior doesn't match reward signal
# WRONG: Action read but not used
def step(self, action):
obs = self._get_next_obs() # Doesn't use action!
reward = 1.0 # Reward independent of action
return obs, reward, False, False, {}
# CORRECT: Action determines next state
def step(self, action):
self._apply_action_to_physics(action)
obs = self._get_next_obs()
reward = self._compute_reward(action)
return obs, reward, False, False, {}
Bug 3: Missing Terminal State Flag
Symptom: Episodes don't end properly, agent never learns boundaries
# WRONG: Always done=False
def step(self, action):
...
return obs, reward, False, False, {} # Episode never ends!
# CORRECT: Set terminated when episode should end
def step(self, action):
...
terminated = self._check_done_condition()
if terminated:
reward += 1.0 # Terminal bonus (keep it on the same scale as per-step rewards)
return obs, reward, terminated, False, {}
# Also differentiate from truncation
def step(self, action):
...
self.steps += 1
terminated = self._reached_goal() # Success condition
truncated = self.steps >= self.max_steps # Time limit
return obs, reward, terminated, truncated, {}
Bug 4: Observation/Space Mismatch
Symptom: Training crashes or behaves oddly after environment change
# WRONG: Space and observation don't match
def __init__(self):
self.observation_space = gym.spaces.Box(0, 1, (4,), dtype=np.float32)
def step(self, action):
obs = np.random.randint(0, 256, (4,), dtype=np.uint8) # uint8!
return obs, reward, done, truncated, {} # Mismatch!
# CORRECT: Match dtype and range
def __init__(self):
self.observation_space = gym.spaces.Box(0, 255, (4,), dtype=np.uint8)
def step(self, action):
obs = np.random.randint(0, 256, (4,), dtype=np.uint8) # Matches!
assert self.observation_space.contains(obs)
return obs, reward, done, truncated, {}
Bug 5: Reset Not Initializing State
Symptom: First episode works, subsequent episodes fail
# WRONG: Reset doesn't actually reset
def reset(self, seed=None):
super().reset(seed=seed)
# Forgot to initialize state!
return self.state, {} # self.state is stale from last episode
# CORRECT: Reset initializes everything
def reset(self, seed=None):
super().reset(seed=seed)
self.state = self._initialize_state()
self.steps = 0
return self.state, {}
Bug 6: Non-Deterministic Environment Without Proper Seeding
Symptom: Same reset produces different initial states, breaks reproducibility
# WRONG: Randomness not seeded
def reset(self, seed=None):
super().reset(seed=seed)
self.state = np.random.randn(4) # Uses default RNG, ignores seed!
return self.state, {}
# CORRECT: Use self.np_random which respects seed
def reset(self, seed=None):
super().reset(seed=seed)
# self.np_random is seeded by super().reset()
self.state = self.np_random.standard_normal(4) # np_random is a numpy Generator (it has no .randn)
return self.state, {}
Bug 7: Info Dict Contains Non-Serializable Objects
Symptom: Episode fails when saving/loading with replay buffers
# WRONG: Info dict contains unpicklable objects
def step(self, action):
info = {
'env': self, # Can't pickle!
'callback': self.callback_fn, # Can't pickle!
}
return obs, reward, done, truncated, info
# CORRECT: Only basic types in info dict
def step(self, action):
info = {
'level': self.level,
'score': self.score,
'x_position': float(self.x),
}
return obs, reward, done, truncated, info
Bug 8: Action Space Not Enforced
Symptom: Agent takes actions outside valid range, causes crashes
# WRONG: Action space defined but not enforced
def __init__(self):
self.action_space = gym.spaces.Box(-1, 1, (3,))
def step(self, action):
# action could be [10, 10, 10] and we don't catch it!
velocity = action * 10 # Huge velocity!
...
# CORRECT: Clip or validate actions
def step(self, action):
assert self.action_space.contains(action), \
f"Invalid action {action}"
# Or clip to bounds
action = np.clip(action,
self.action_space.low,
self.action_space.high)
...
Bug 9: Observation Normalization Not Applied
Symptom: Training unstable when observations are in [0, 255] instead of [0, 1]
# WRONG: Large observation range breaks training
def step(self, action):
obs = self.render_to_image() # Range [0, 255]
return obs, reward, done, truncated, {}
# CORRECT: Normalize observations
def step(self, action):
obs = self.render_to_image() # Range [0, 255]
obs = obs.astype(np.float32) / 255.0 # Normalize to [0, 1]
return obs, reward, done, truncated, {}
# Or use NormalizeObservation wrapper
env = NormalizeObservation(env)
Bug 10: Forgetting to Return Info Dict
Symptom: Step returns wrong number of values, crashes agent training loop
# WRONG: Step returns 4 values (old Gym API)
def step(self, action):
return obs, reward, done, info # WRONG!
# CORRECT: Step returns 5 values (Gymnasium API)
def step(self, action):
return obs, reward, terminated, truncated, info
# Or detect the API during migration (avoid try/except around env.step:
# a failed unpack would step the environment a second time in the except branch)
result = env.step(action)
if len(result) == 5:
obs, reward, terminated, truncated, info = result
else:
obs, reward, done, info = result
terminated, truncated = bool(done), False
Part 7: Environment Testing Checklist
Before training an RL agent on a custom environment, validate:
Pre-Training Validation Checklist
class EnvironmentValidator:
"""Validate custom environment before training"""
def validate_all(self, env):
"""Run all validation tests"""
print("Validating environment...")
# 1. Spaces are valid
self.validate_spaces(env)
print("✓ Spaces valid")
# 2. Reset works
obs, info = self.validate_reset(env)
print("✓ Reset works")
# 3. Step works and returns correct format
self.validate_step(env, obs)
print("✓ Step works")
# 4. Observations are valid
self.validate_observations(env, obs)
print("✓ Observations valid")
# 5. Actions are enforced
self.validate_actions(env)
print("✓ Actions enforced")
# 6. Terminal states work
self.validate_termination(env)
print("✓ Termination works")
# 7. Environment is reproducible
self.validate_reproducibility(env)
print("✓ Reproducibility verified")
# 8. Random agent can run
self.validate_random_agent(env)
print("✓ Random agent runs")
print("\nEnvironment validation PASSED!")
def validate_spaces(self, env):
"""Check spaces are defined"""
assert hasattr(env, 'action_space'), "No action_space"
assert hasattr(env, 'observation_space'), "No observation_space"
assert isinstance(env.action_space, gym.spaces.Space)
assert isinstance(env.observation_space, gym.spaces.Space)
def validate_reset(self, env):
"""Check reset returns (obs, info)"""
result = env.reset()
assert isinstance(result, tuple) and len(result) == 2, \
f"Reset should return (obs, info), got {result}"
obs, info = result
assert isinstance(info, dict), "Info should be dict"
return obs, info
def validate_step(self, env, obs):
"""Check step returns 5-tuple"""
action = env.action_space.sample()
result = env.step(action)
assert isinstance(result, tuple) and len(result) == 5, \
f"Step should return 5-tuple, got {len(result)}"
obs, reward, terminated, truncated, info = result
assert isinstance(reward, (int, float, np.integer, np.floating)), "Reward must be a number"
assert isinstance(terminated, (bool, np.bool_)), "terminated must be bool"
assert isinstance(truncated, (bool, np.bool_)), "truncated must be bool"
assert isinstance(info, dict), "Info must be dict"
def validate_observations(self, env, obs):
"""Check observations match space"""
assert env.observation_space.contains(obs), \
f"Observation {obs.shape} not in space {env.observation_space}"
def validate_actions(self, env):
"""Check invalid actions fail"""
if isinstance(env.action_space, gym.spaces.Discrete):
invalid_action = env.action_space.n + 10
assert not env.action_space.contains(invalid_action)
def validate_termination(self, env):
"""Check episodes can terminate"""
obs, _ = env.reset()
for _ in range(1000):
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)
if terminated or truncated:
break
assert terminated or truncated, \
"Episode never terminated in 1000 steps!"
def validate_reproducibility(self, env):
"""Check reset with seed is reproducible"""
obs1, _ = env.reset(seed=42)
obs2, _ = env.reset(seed=42)
assert np.allclose(obs1, obs2), "Reset not reproducible!"
def validate_random_agent(self, env):
"""Check environment works with random actions"""
obs, _ = env.reset()
total_reward = 0
for _ in range(100):
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)
total_reward += reward
if terminated or truncated:
break
assert np.isfinite(total_reward), "Total reward is NaN or inf!"
# Usage
validator = EnvironmentValidator()
validator.validate_all(env)
Manual Testing
Before training, play with the environment manually:
# Manual environment exploration
env = GridWorldEnv()
obs, _ = env.reset()
while True:
action = int(input("Action (0=up, 1=right, 2=down, 3=left): "))
obs, reward, terminated, truncated, info = env.step(action)
print(f"Position: {obs}, Reward: {reward}, Done: {terminated}")
if terminated or truncated:
obs, _ = env.reset()
print("Episode reset")
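Interactive play is useful, but a scripted play-test with a known action sequence makes the same check repeatable in CI (this assumes the GridWorldEnv defined earlier, where action 1 moves right and action 0 moves up):

import numpy as np

env = GridWorldEnv()
obs, _ = env.reset()

# Walk right four times, then up four times: should land exactly on the goal (4, 4)
for action in [1, 1, 1, 1, 0, 0, 0, 0]:
    obs, reward, terminated, truncated, _ = env.step(action)

assert np.array_equal(obs, np.array([4, 4])), f"Expected goal, got {obs}"
assert terminated, "Reaching the goal should set terminated=True"
assert reward == 1.0, "The goal step should pay reward 1.0"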
Part 8: Red Flags and Anti-Patterns
Red Flag 1: Reward Scale Issue
# RED FLAG: Rewards in [0, 1000000]
reward = distance_to_goal * 1000000 # HUGE!
# Solution: Scale to [-1, 1]
reward = -distance_to_goal / max_distance
assert -1 <= reward <= 1
Red Flag 2: Observation Type Mismatch
# RED FLAG: Observation dtype doesn't match space
observation_space = Box(0, 255, (84, 84, 3), dtype=np.uint8)
obs = np.random.random((84, 84, 3)).astype(np.float32) # MISMATCH!
# Solution: Match dtype exactly
obs = (obs * 255).astype(np.uint8)
Red Flag 3: Missing Done Flag
# RED FLAG: Episodes never end
def step(self, action):
return obs, reward, False, False, {} # Always False!
# Solution: Implement termination logic
terminated = self.check_goal_reached() or self.check_failure()
Red Flag 4: Action Bounds Not Enforced
# RED FLAG: Network outputs unconstrained
def step(self, action): # action could be [1000, -1000]
velocity = action # HUGE velocity!
# Solution: Clip or validate
action = np.clip(action,
self.action_space.low,
self.action_space.high)
Red Flag 5: Vectorized Environment Auto-Reset Confusion
# RED FLAG: Treating auto-reset obs as terminal obs
obs, rewards, terminateds, truncateds, info = envs.step(actions)
# obs contains NEW reset observations, not final observations!
# Solution: Use info['final_observation']
final_obs = info['final_observation']
Red Flag 6: Non-Parallel-Safe Shared State
# RED FLAG: Shared state breaks AsyncVectorEnv
class Env(gym.Env):
global_counter = 0 # SHARED!
def step(self, action):
Env.global_counter += 1 # Race condition!
# Solution: Instance variables only
def __init__(self):
self.counter = 0 # Instance-specific
Red Flag 7: Info Dict with Unpicklable Objects
# RED FLAG: Can't serialize for replay buffer
info = {
'env': self,
'callback': self.fn,
}
# Solution: Only basic types
info = {
'level': 5,
'score': 100,
}
Red Flag 8: Forgetting to Close AsyncVectorEnv
# RED FLAG: Process leak
envs = AsyncVectorEnv([...])
# ... forgot env.close()
# Solution: Always close
envs.close() # or use try/finally
Part 9: Rationalization Resistance
Common Wrong Beliefs About Environments:
Claim 1: "My custom environment should just work without testing"
- Reality: 80% of RL failures are environment bugs. Test before training.
- Evidence: Standard validation checklist catches bugs 95% of the time
Claim 2: "Reward scaling doesn't matter, only matters for learning rate"
- Reality: Reward scale affects gradient magnitudes directly. Too large = instability.
- Evidence: Scaling reward by 100x often breaks training even with correct learning rate
Claim 3: "Wrappers are optional complexity I don't need"
- Reality: Wrappers enforce separation of concerns. Without them, environments become unmaintainable.
- Evidence: Real RL code uses 3-5 wrappers (TimeLimit, Normalize, ClipAction, etc)
Claim 4: "Vectorized environments are always faster"
- Reality: Parallelization overhead for small envs can make them slower.
- Evidence: For < 4 envs, DummyVectorEnv is faster than AsyncVectorEnv
Claim 5: "My environment is correct if the agent learns something"
- Reality: Agent can learn to game a broken reward signal.
- Evidence: Agent learning ≠ environment correctness. Run tests.
Claim 6: "AsyncVectorEnv doesn't need explicit close()"
- Reality: Processes leak if not closed, draining system resources.
- Evidence: Unmanaged AsyncVectorEnv with 16+ processes brings systems to halt
Claim 7: "Observation normalization breaks training"
- Reality: Unnormalized large observations (like [0, 255]) break training.
- Evidence: Normalizing [0, 255] images to [0, 1] is standard practice
Claim 8: "I don't need to validate action space enforcement"
- Reality: Network outputs can violate bounds, causing physics errors.
- Evidence: Unclipped continuous actions often cause simulation failures
Part 10: Pressure Test Scenarios
Scenario 1: Custom Environment Debugging
# Subagent challenge WITHOUT skill:
# "I built a custom CartPole variant. Training fails silently
# (agent doesn't learn). The environment seems fine when I test it.
# Where do I start debugging?"
# Expected WITH skill:
# 1. Validate observation space matches actual observations
# 2. Validate action space bounds are enforced
# 3. Check reward scale is in [-1, 1]
# 4. Verify reset/step API is correct (Gym vs Gymnasium)
# 5. Run environment validator checklist
# 6. Manual play-test to check physics
# 7. Verify terminal state logic
Scenario 2: Wrapper Composition
# Challenge: Build a correct wrapper stack
# env = gym.make("CartPole-v1")
# env = TimeLimit(env, 500) # Add time limit
# env = NormalizeObservation(env) # Normalize
# Should be safe to use with any policy training
# WITHOUT skill: Guess order, wrong wrapping
# WITH skill: Know correct order, understand composition
Scenario 3: Vectorization Decision
# Challenge: "I need to train on 32 parallel CartPoles.
# Which vectorized environment type is best?"
# WITHOUT skill: Try all three, pick whichever runs
# WITH skill: Analyze trade-offs
# - 32 envs -> AsyncVectorEnv
# - Memory acceptable? -> Yes
# - Debugging needed? -> No -> Use Async
Scenario 4: Space Mismatch Detection
# Challenge: Environment crashes during training with cryptic error.
# Observation is (84, 84, 3) uint8 but CNN expects float32 in [0, 1]
# WITHOUT skill: Spend hours debugging network
# WITH skill: Immediately suspect observation/space mismatch
# Run validator, find dtype mismatch, fix preprocessing
Part 11: Advanced Patterns - Multi-Agent Environments
Multi-Agent Observation Spaces
Scenario: Multi-agent game with individual agent observations
class MultiAgentGridWorld(gym.Env):
"""2-agent cooperative environment"""
def __init__(self, num_agents=2):
self.num_agents = num_agents
# Each agent has its own action space
self.action_space = gym.spaces.MultiDiscrete([4] * num_agents)
# Each agent observes its own position + other agents' positions
# Dict space allows per-agent observations
self.observation_space = gym.spaces.Dict({
f'agent_{i}': gym.spaces.Box(0, 4, (2 * num_agents,), dtype=np.int32)
for i in range(num_agents)
})
self.agents = [np.array([i, 0], dtype=np.int32) for i in range(num_agents)]
self.goal = np.array([4, 4], dtype=np.int32)
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self.agents = [np.array([i, 0], dtype=np.int32) for i in range(self.num_agents)]
obs = {}
for i in range(self.num_agents):
agent_obs = np.concatenate([agent.copy() for agent in self.agents])
obs[f'agent_{i}'] = agent_obs.astype(np.int32)
return obs, {}
def step(self, actions):
"""actions is array of length num_agents"""
moves = [
np.array([0, 1], dtype=np.int32),
np.array([1, 0], dtype=np.int32),
np.array([0, -1], dtype=np.int32),
np.array([-1, 0], dtype=np.int32),
]
# Apply each agent's action
for i, action in enumerate(actions):
self.agents[i] += moves[action]
self.agents[i] = np.clip(self.agents[i], 0, 4)
# Shared reward: both agents get reward for reaching goal
distances = [np.linalg.norm(agent - self.goal) for agent in self.agents]
reward = sum(1.0 / (1.0 + d) for d in distances)
# Both must reach goal
terminated = all(np.array_equal(agent, self.goal) for agent in self.agents)
# Construct observation for each agent
obs = {}
for i in range(self.num_agents):
agent_obs = np.concatenate([agent.copy() for agent in self.agents])
obs[f'agent_{i}'] = agent_obs.astype(np.int32)
truncated = False
return obs, reward, terminated, truncated, {}
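A short random rollout showing how the Dict observation and MultiDiscrete action fit together (assuming the class above):

env = MultiAgentGridWorld(num_agents=2)
obs, info = env.reset(seed=0)
print(obs["agent_0"])   # Flattened (x, y) positions of both agents

for _ in range(20):
    actions = env.action_space.sample()   # e.g. array([2, 1]): one move per agent
    obs, reward, terminated, truncated, info = env.step(actions)
    if terminated or truncated:
        break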
Key Multi-Agent Patterns
# Pattern 1: Separate rewards per agent
rewards = {
f'agent_{i}': compute_reward_for_agent(i)
for i in range(num_agents)
}
# Pattern 2: Shared team reward
team_reward = sum(individual_rewards) / num_agents
# Pattern 3: Mixed observations (shared + individual)
obs = {
f'agent_{i}': {
'own_state': agent_states[i],
'other_positions': [s for j, s in enumerate(agent_states) if j != i],
'global_state': shared_state,
}
for i in range(num_agents)
}
# Pattern 4: Synchronized reset for coordinated behavior
def reset(self, seed=None):
super().reset(seed=seed)
# All agents reset to coordinated starting positions
self.agents = initialize_team_formation()
Part 12: Integration with Training Loops
Proper Environment Integration
class TrainingLoop:
"""Shows correct environment integration pattern"""
def __init__(self, env_id, num_parallel=4):
self.env = self._setup_environment(env_id, num_parallel)
self.policy = build_policy()
def _setup_environment(self, env_id, num_parallel):
"""Proper environment setup: wrap each copy first, then vectorize"""
def make_env():
# Per-env wrappers belong here; TimeLimit/NormalizeObservation wrap single
# environments, not the vectorized env
env = gym.make(env_id)
env = gym.wrappers.TimeLimit(env, max_episode_steps=1000)
env = gym.wrappers.NormalizeObservation(env)
return env
if num_parallel == 1:
return make_env()
elif num_parallel <= 4:
return DummyVectorEnv([make_env for _ in range(num_parallel)])
else:
return SyncVectorEnv([make_env for _ in range(num_parallel)])
def train_one_episode(self):
"""Correct training loop"""
obs, info = self.env.reset()
total_reward = 0
steps = 0
while True:
# Get action from policy
action = self.policy.get_action(obs)
# CRITICAL: Validate action is in space
assert self.env.action_space.contains(action)
# Step environment
obs, reward, terminated, truncated, info = self.env.step(action)
# CRITICAL: Handle auto-reset in vectorized case
if 'final_observation' in info:
final_obs = info['final_observation']
# Store final obs in replay buffer, not reset obs
else:
final_obs = obs
# Store experience
self.store_experience(obs, reward, terminated, truncated, info)
total_reward += np.mean(reward) if isinstance(reward, np.ndarray) else reward
steps += 1
# Check termination
if np.any(terminated) or np.any(truncated):
break
return total_reward / steps
def store_experience(self, obs, reward, terminated, truncated, info):
"""Correct experience storage"""
# Handle vectorized case (obs, reward are arrays)
if isinstance(reward, np.ndarray):
for i in range(len(reward)):
self.replay_buffer.add(
obs=obs[i] if isinstance(obs, np.ndarray) else obs,
action=None, # Set before storing
reward=reward[i],
done=terminated[i] or truncated[i],
next_obs=obs[i] if isinstance(obs, np.ndarray) else obs,
)
Common Integration Mistakes
Mistake 1: Not closing AsyncVectorEnv
# WRONG: Process leak
envs = AsyncVectorEnv([...] for _ in range(16))
for episode in range(1000):
obs, _ = envs.reset()
# ... training ...
# Processes never cleaned up
# CORRECT: Always cleanup
try:
envs = AsyncVectorEnv([...] for _ in range(16))
for episode in range(1000):
obs, _ = envs.reset()
# ... training ...
finally:
envs.close()
Mistake 2: Using wrong observation after auto-reset
# WRONG: Mixing terminal and reset observations
obs, reward, terminated, truncated, info = envs.step(actions)
# obs is reset observation, but we treat it as terminal!
store_in_replay_buffer(obs, reward, terminated)
# CORRECT: Use final_observation for training
final_obs = info.get('final_observation', obs)
if np.any(terminated):
store_in_replay_buffer(final_obs, reward, terminated)
else:
next_obs = obs
Mistake 3: Not validating agent actions
# WRONG: Trust agent always outputs valid action
action = policy(obs)
obs, reward, terminated, truncated, info = env.step(action)
# CORRECT: Validate before stepping
action = policy(obs)
action = np.clip(action, env.action_space.low, env.action_space.high)
assert env.action_space.contains(action)
obs, reward, terminated, truncated, info = env.step(action)
Part 13: Performance Optimization
Observation Preprocessing Performance
class OptimizedObservationPreprocessing:
"""Efficient observation handling"""
def __init__(self, env):
self.env = env
def preprocess_observation(self, obs):
"""Optimized preprocessing"""
# Avoid repeated conversions: cast uint8 images once, leave float inputs alone
if obs.dtype == np.uint8:
# astype makes a single float32 copy, then scale to [0, 1]
obs = obs.astype(np.float32) / 255.0
else:
obs = obs.astype(np.float32, copy=False)
# Use memmap for large observations
if obs.nbytes > 1_000_000: # > 1MB
# Consider using memory-mapped arrays
pass
return obs
def batch_preprocess(self, obs_batch):
"""Batch processing for vectorized envs"""
# Vectorized preprocessing is faster than per-obs
if isinstance(obs_batch, np.ndarray) and obs_batch.ndim == 4:
# (batch_size, H, W, C) image batch
obs_batch = obs_batch.astype(np.float32) / 255.0
return obs_batch
Vectorization Performance Tips
# Benchmark intuition: when does parallelization help?
# (illustrative numbers; exact figures depend on hardware and the vector env library)
# For CartPole (fast env):
# - 1 env: 10k steps/sec on 1 core
# - 4 Dummy: 9k steps/sec (overhead)
# - 4 Sync: 15k steps/sec (parallelism helps)
# - 4 Async: 12k steps/sec (context switch overhead)
# For Atari (slow env):
# - 1 env: 0.5k steps/sec on 1 core
# - 16 Dummy: 7k steps/sec (overhead worth it)
# - 16 Sync: 15k steps/sec (GIL limits)
# - 16 Async: 25k steps/sec (parallelism dominates)
# Rule of thumb:
# - env_step_time < 1ms: parallelization overhead dominates, use Dummy
# - env_step_time 1-10ms: parallelization helps, use Sync
# - env_step_time > 10ms: parallelization essential, use Async
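To apply the rule of thumb, measure one environment's raw step time first; a minimal sketch:

import time
import gymnasium as gym

def mean_step_time(env_id: str, num_steps: int = 1000) -> float:
    """Average seconds per env.step() under random actions."""
    env = gym.make(env_id)
    env.reset(seed=0)
    start = time.perf_counter()
    for _ in range(num_steps):
        _, _, terminated, truncated, _ = env.step(env.action_space.sample())
        if terminated or truncated:
            env.reset()
    elapsed = time.perf_counter() - start
    env.close()
    return elapsed / num_steps

print(f"CartPole step time: {mean_step_time('CartPole-v1') * 1000:.3f} ms")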
Part 14: Debugging Environment Issues Systematically
Diagnostic Checklist for Broken Training
class EnvironmentDebugger:
"""Systematic environment debugging"""
def full_diagnosis(self, env, policy):
"""Complete environment diagnostic"""
print("=== Environment Diagnostic ===")
# 1. Check environment API
self.check_api(env)
print("✓ API correct")
# 2. Check spaces
self.check_spaces(env)
print("✓ Spaces valid")
# 3. Check reset/step mechanics
self.check_mechanics(env)
print("✓ Reset/step mechanics correct")
# 4. Check observation statistics
obs_stats = self.analyze_observations(env)
print(f"✓ Observations: mean={obs_stats['mean']:.3f}, std={obs_stats['std']:.3f}")
# 5. Check reward statistics
reward_stats = self.analyze_rewards(env)
print(f"✓ Rewards: mean={reward_stats['mean']:.3f}, std={reward_stats['std']:.3f}")
if abs(reward_stats['mean']) > 1 or reward_stats['std'] > 1:
print(" WARNING: Reward scale may be too large")
# 6. Check episode lengths
lengths = self.analyze_episode_lengths(env)
print(f"✓ Episode lengths: mean={lengths['mean']:.1f}, min={lengths['min']}, max={lengths['max']}")
# 7. Check reproducibility
self.check_reproducibility(env)
print("✓ Reproducibility verified")
# 8. Check with policy
self.check_policy_integration(env, policy)
print("✓ Policy integration works")
def analyze_observations(self, env, num_episodes=10):
"""Analyze observation distribution"""
obs_list = []
for _ in range(num_episodes):
obs, _ = env.reset()
for _ in range(100):
action = env.action_space.sample()
obs, _, terminated, truncated, _ = env.step(action)
obs_list.append(obs.flatten())
if terminated or truncated:
break
obs_array = np.concatenate(obs_list)
return {
'mean': np.mean(obs_array),
'std': np.std(obs_array),
'min': np.min(obs_array),
'max': np.max(obs_array),
}
def analyze_rewards(self, env, num_episodes=10):
"""Analyze reward distribution"""
rewards = []
for _ in range(num_episodes):
obs, _ = env.reset()
for _ in range(100):
action = env.action_space.sample()
obs, reward, terminated, truncated, _ = env.step(action)
rewards.append(reward)
if terminated or truncated:
break
rewards = np.array(rewards)
return {
'mean': np.mean(rewards),
'std': np.std(rewards),
'min': np.min(rewards),
'max': np.max(rewards),
}
def analyze_episode_lengths(self, env, num_episodes=20):
"""Analyze episode length distribution"""
lengths = []
for _ in range(num_episodes):
obs, _ = env.reset()
steps = 0
for step in range(10000): # Max steps
action = env.action_space.sample()
obs, reward, terminated, truncated, _ = env.step(action)
steps += 1
if terminated or truncated:
break
lengths.append(steps)
lengths = np.array(lengths)
return {
'mean': np.mean(lengths),
'min': int(np.min(lengths)),
'max': int(np.max(lengths)),
'median': int(np.median(lengths)),
}
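full_diagnosis above also calls several check_* helpers that are not shown. A minimal sketch of what they might look like, written here as standalone functions that reuse the Part 7 validation logic (bind them as methods or call them directly; the policy is assumed to expose get_action(obs)):

import numpy as np
import gymnasium as gym

def check_api(env):
    """Reset/step return the Gymnasium-style tuples."""
    assert len(env.reset()) == 2, "reset() must return (obs, info)"
    assert len(env.step(env.action_space.sample())) == 5, "step() must return a 5-tuple"

def check_spaces(env):
    assert isinstance(env.action_space, gym.spaces.Space)
    assert isinstance(env.observation_space, gym.spaces.Space)

def check_mechanics(env):
    obs, info = env.reset()
    assert env.observation_space.contains(obs)
    assert isinstance(info, dict)

def check_reproducibility(env):
    obs1, _ = env.reset(seed=123)
    obs2, _ = env.reset(seed=123)
    assert np.allclose(obs1, obs2), "Reset with the same seed differs"

def check_policy_integration(env, policy):
    obs, _ = env.reset()
    action = policy.get_action(obs)   # Assumed policy interface
    assert env.action_space.contains(action), f"Policy produced invalid action {action}"
    env.step(action)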
Summary: When to Invoke This Skill
Use rl-environments skill when:
- Creating custom environments from scratch
- Debugging environment-related training failures
- Implementing observation/action spaces
- Using or creating wrappers
- Parallelizing environments
- Testing environments before training
- Handling Gym vs Gymnasium differences
- Migrating environment code between versions
- Building multi-agent or multi-component environments
- Enforcing action/observation bounds correctly
- Optimizing environment performance
- Debugging training failures systematically
This skill prevents:
- 80% of RL bugs (environment issues)
- Silent training failures from broken environments
- Vectorization-related data corruption
- Observation/action space mismatches
- Reward scaling instabilities
- Terminal state logic errors
- Reproducibility issues from poor seeding
- Performance degradation from inefficient environments
- Multi-agent coordination failures
- Integration issues with training loops