Claude Code Plugins

Community-maintained marketplace

Gym/gymnasium API - custom environments, spaces, wrappers, vectorization, debugging

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please review the skill's instructions and verify its behavior before using it.

SKILL.md

name: rl-environments
description: Gym/gymnasium API - custom environments, spaces, wrappers, vectorization, debugging

RL Environments: Building and Debugging Custom Environments

When to Use This Skill

Invoke this skill when you need to:

  • Create Custom Environments: Build a new environment from scratch using Gym/gymnasium
  • Define Observation/Action Spaces: Design Box, Discrete, Dict, Tuple spaces correctly
  • Use Environment Wrappers: Add preprocessing, modify rewards, implement time limits
  • Parallelize Environments: Choose between DummyVectorEnv, SyncVectorEnv, AsyncVectorEnv
  • Debug Environment Bugs: Diagnose reset/step issues, reward scaling, space mismatches
  • Test Environments: Validate environments before training agents
  • Handle API Differences: Migrate between Gym versions or Gym vs gymnasium
  • Implement Complex State: Manage multi-component observations and state systems
  • Enforce Action Bounds: Properly clip or scale actions
  • Catch Common Pitfalls: Avoid 10+ common environment implementation mistakes

Core Problem: Environments are the foundation of RL training. Broken environments cause 80% of RL failures, but environment bugs are often missed because they don't error—they silently break training. This skill systematically teaches correct environment design and provides a debugging methodology.

Do NOT Use This Skill For

  • Algorithm implementation (route to specific algorithm skills like value-based-methods, policy-gradient-methods, actor-critic-methods)
  • Reward design and shaping (route to reward-shaping-engineering for reward function engineering and potential-based shaping)
  • RL theory and foundations (route to rl-foundations for MDPs, Bellman equations, value functions)
  • Training debugging beyond environment issues (route to rl-debugging for systematic diagnosis of training failures)
  • Exploration strategy selection (route to exploration-strategies for ε-greedy, curiosity-driven, RND methods)

Part 1: Understanding the Gym/Gymnasium API

The Standard Interface

Every Gym/Gymnasium environment implements:

import gymnasium as gym  # or 'gym' for older versions
import numpy as np

class CustomEnv(gym.Env):
    """Template for all custom environments"""

    def __init__(self):
        # Define action and observation spaces
        self.action_space = gym.spaces.Discrete(4)  # 4 possible actions
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(84, 84, 3), dtype=np.uint8
        )

    def reset(self, seed=None, options=None):
        """Reset environment to initial state

        Returns:
            observation (np.ndarray): Initial observation
            info (dict): Auxiliary info (can be empty dict)
        """
        super().reset(seed=seed)
        obs = self._get_initial_observation()
        info = {}
        return obs, info

    def step(self, action):
        """Take one action in the environment

        Args:
            action: Action from action_space

        Returns:
            observation (np.ndarray): Current observation after action
            reward (float): Reward for this step
            terminated (bool): True if episode ended (goal/failure)
            truncated (bool): True if episode cut off (time limit)
            info (dict): Auxiliary info
        """
        obs = self._apply_action(action)
        reward = self._compute_reward()
        terminated = self._is_done()
        truncated = False  # Set by TimeLimit wrapper usually
        info = {}
        return obs, reward, terminated, truncated, info

    def render(self):
        """Visualize the environment (optional); in Gymnasium, render_mode is chosen at construction"""
        pass

    def close(self):
        """Cleanup resources (optional)"""
        pass

Key API Points

1. Reset Format (Gymnasium API)

# CORRECT: Reset returns (observation, info)
observation, info = env.reset()

# WRONG: Old Gym API returned just observation
observation = env.reset()  # This is Gym, not Gymnasium

2. Step Format (Gymnasium API)

# CORRECT: Step returns (obs, reward, terminated, truncated, info)
obs, reward, terminated, truncated, info = env.step(action)

# WRONG: Old Gym API
obs, reward, done, info = env.step(action)  # 'done' is single boolean

3. Gym vs Gymnasium

| Feature      | Gym (OpenAI)             | Gymnasium (maintained)                |
|--------------|--------------------------|---------------------------------------|
| Reset return | obs                      | (obs, info)                           |
| Step return  | (obs, r, done, info)     | (obs, r, terminated, truncated, info) |
| Render       | env.render(mode='human') | env.render(); render_mode set at init |
| Import       | import gym               | import gymnasium as gym               |
| Support      | Deprecated               | Current standard                      |

Decision: Use gymnasium for new code. If stuck with older code:

# Compatibility wrapper
try:
    import gymnasium as gym
except ImportError:
    import gym
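
If you maintain an old Gym environment that cannot be upgraded, a hand-rolled shim can expose the Gymnasium return conventions. The sketch below is illustrative only (the class name and seeding logic are assumptions, not a library API); Gymnasium and the shimmy package also ship official compatibility wrappers.

class OldGymCompat:
    """Illustrative shim: present an old-Gym env through the Gymnasium API"""

    def __init__(self, env):
        self.env = env
        self.action_space = env.action_space
        self.observation_space = env.observation_space

    def reset(self, seed=None, options=None):
        if seed is not None and hasattr(self.env, "seed"):
            self.env.seed(seed)  # old-Gym style seeding
        obs = self.env.reset()   # old API returns a bare observation
        return obs, {}

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        # Without more information, treat every done as termination
        return obs, reward, done, False, info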

Part 2: Observation and Action Space Design

Space Types

Discrete Space (for discrete actions or observations)

# 4 possible actions: 0, 1, 2, 3
action_space = gym.spaces.Discrete(4)

# 5 possible discrete states
observation_space = gym.spaces.Discrete(5)

# With start parameter
action_space = gym.spaces.Discrete(4, start=1)  # 1, 2, 3, 4

Box Space (for continuous or image data)

# Continuous control: 3D position, each in [-1, 1]
action_space = gym.spaces.Box(
    low=-1.0,
    high=1.0,
    shape=(3,),
    dtype=np.float32
)

# Image observation: 84x84 RGB, pixels 0-255
observation_space = gym.spaces.Box(
    low=0,
    high=255,
    shape=(84, 84, 3),
    dtype=np.uint8
)

# Multi-component continuous: 2D position + 1D velocity
observation_space = gym.spaces.Box(
    low=np.array([-1.0, -1.0, -10.0]),
    high=np.array([1.0, 1.0, 10.0]),
    dtype=np.float32
)

Dict Space (for structured observations with multiple components)

# Multi-component observation: image + state vector
observation_space = gym.spaces.Dict({
    'image': gym.spaces.Box(0, 255, (84, 84, 3), dtype=np.uint8),
    'position': gym.spaces.Box(-1, 1, (2,), dtype=np.float32),
})

# Access in reset/step:
obs = {
    'image': np.random.randint(0, 256, (84, 84, 3), dtype=np.uint8),
    'position': np.array([0.5, -0.3], dtype=np.float32),
}

Tuple Space (for ordered multiple components)

observation_space = gym.spaces.Tuple((
    gym.spaces.Box(-1, 1, (2,), dtype=np.float32),  # Position
    gym.spaces.Discrete(4),  # Direction
))

# Access:
obs = (np.array([0.5, -0.3], dtype=np.float32), 2)

MultiDiscrete (for multiple discrete action dimensions)

# Game with 4 actions per agent, 3 agents
action_space = gym.spaces.MultiDiscrete([4, 4, 4])

# Or asymmetric
action_space = gym.spaces.MultiDiscrete([3, 4, 5])  # Different choices per dimension
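
Whatever space type you choose, a quick sanity check is that the space's own samples always pass its contains() check; this catches dtype and bound errors in composite spaces early. A minimal sketch:

import gymnasium as gym

space = gym.spaces.Dict({
    'position': gym.spaces.Box(-1, 1, (2,), dtype=np.float32),
    'direction': gym.spaces.Discrete(4),
})

for _ in range(100):
    sample = space.sample()
    assert space.contains(sample)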

Space Validation Patterns

Always validate that observations match the space:

def reset(self, seed=None):
    super().reset(seed=seed)
    obs = self._get_observation()

    # CRITICAL: Validate observation against space
    assert self.observation_space.contains(obs), \
        f"Observation {obs} not in space {self.observation_space}"

    return obs, {}

def step(self, action):
    # CRITICAL: Validate action is in action space
    assert self.action_space.contains(action), \
        f"Action {action} not in space {self.action_space}"

    obs = self._apply_action(action)

    # Validate observation
    assert self.observation_space.contains(obs), \
        f"Observation {obs} not in space {self.observation_space}"

    reward = self._compute_reward()
    terminated = self._check_done()
    truncated = False

    return obs, reward, terminated, truncated, {}

Common Space Mistakes

Mistake 1: dtype mismatch (uint8 vs float32)

# WRONG: Space says uint8 but observation is float32
observation_space = gym.spaces.Box(0, 255, (84, 84, 3), dtype=np.uint8)
obs = np.random.random((84, 84, 3)).astype(np.float32)  # MISMATCH!
assert self.observation_space.contains(obs)  # FAILS

# CORRECT: Match dtype
observation_space = gym.spaces.Box(0, 1, (84, 84, 3), dtype=np.float32)
obs = np.random.random((84, 84, 3)).astype(np.float32)
assert self.observation_space.contains(obs)  # PASSES

Mistake 2: Range mismatch

# WRONG: Observation outside declared range
observation_space = gym.spaces.Box(0, 1, (4,), dtype=np.float32)
obs = np.array([0.5, 1.5, 0.2, 0.8], dtype=np.float32)  # 1.5 > 1!
assert self.observation_space.contains(obs)  # FAILS

# CORRECT: Ensure observations stay within bounds
obs = np.clip(obs, 0, 1)

Mistake 3: Shape mismatch

# WRONG: Wrong shape
observation_space = gym.spaces.Box(0, 255, (84, 84, 3), dtype=np.uint8)
obs = np.random.randint(0, 256, (84, 84), dtype=np.uint8)  # 2D, not 3D!
assert self.observation_space.contains(obs)  # FAILS

# CORRECT: Match shape exactly
obs = np.random.randint(0, 256, (84, 84, 3), dtype=np.uint8)

Part 3: Creating Custom Environments - Template

Step 1: Inherit from gym.Env

import gymnasium as gym
import numpy as np

class CartPoleMini(gym.Env):
    """Simple environment for demonstration"""

    # These are required attributes
    metadata = {"render_modes": ["human"], "render_fps": 30}

    def __init__(self, render_mode=None):
        # Store render mode
        self.render_mode = render_mode

        # Action space: push cart left (0) or right (1)
        self.action_space = gym.spaces.Discrete(2)

        # Observation space: position, velocity, angle, angular velocity
        self.observation_space = gym.spaces.Box(
            low=np.array([-2.4, -10, -0.2, -10], dtype=np.float32),
            high=np.array([2.4, 10, 0.2, 10], dtype=np.float32),
            dtype=np.float32
        )

        # Episode variables
        self.state = None
        self.steps = 0
        self.max_steps = 500

Step 2: Implement reset()

    def reset(self, seed=None):
        """Reset to initial state

        Returns:
            obs (np.ndarray): Initial observation
            info (dict): Empty dict
        """
        super().reset(seed=seed)

        # Initialize state to center position with small noise
        self.state = np.array(
            [
                self.np_random.uniform(-0.05, 0.05),  # position
                0.0,  # velocity
                self.np_random.uniform(-0.05, 0.05),  # angle
                0.0,  # angular velocity
            ],
            dtype=np.float32
        )
        self.steps = 0

        # Validate and return
        assert self.observation_space.contains(self.state)
        return self.state, {}

Step 3: Implement step()

    def step(self, action):
        """Execute one step of the environment

        Args:
            action: 0 (push left) or 1 (push right)

        Returns:
            obs, reward, terminated, truncated, info
        """
        assert self.action_space.contains(action)

        # Validate state
        assert self.observation_space.contains(self.state)

        x, x_dot, theta, theta_dot = self.state

        # Physics: apply force based on action
        force = 10.0 if action == 1 else -10.0

        # Simplified cartpole physics
        acceleration = (force + 0.1 * theta) / 1.0
        theta_dot_new = theta_dot + 0.02 * acceleration
        theta_new = theta + 0.02 * theta_dot

        x_dot_new = x_dot + 0.02 * acceleration
        x_new = x + 0.02 * x_dot

        # Update state
        self.state = np.array(
            [x_new, x_dot_new, theta_new, theta_dot_new],
            dtype=np.float32
        )

        # Clamp values to stay in bounds
        self.state = np.clip(self.state,
                            self.observation_space.low,
                            self.observation_space.high)

        # Compute reward
        reward = 1.0 if abs(theta) < 0.2 else -1.0

        # Check termination (use >= because the state was clipped to these bounds above)
        x, theta = self.state[0], self.state[2]
        terminated = abs(x) >= 2.4 or abs(theta) >= 0.2

        # Check truncation (max steps)
        self.steps += 1
        truncated = self.steps >= self.max_steps

        # Validate output
        assert self.observation_space.contains(self.state)
        assert isinstance(reward, (int, float))

        return self.state, float(reward), terminated, truncated, {}

Step 4: Implement render() and close() (Optional)

    def render(self):
        """Render the environment (optional)"""
        if self.render_mode == "human":
            # Print state for visualization
            x, x_dot, theta, theta_dot = self.state
            print(f"Position: {x:.2f}, Angle: {theta:.2f}")

    def close(self):
        """Cleanup (optional)"""
        pass

Complete Custom Environment Example

import gymnasium as gym
import numpy as np

class GridWorldEnv(gym.Env):
    """Simple 5x5 grid world where agent seeks goal"""

    def __init__(self):
        # Actions: up=0, right=1, down=2, left=3
        self.action_space = gym.spaces.Discrete(4)

        # Observation: (x, y) position
        self.observation_space = gym.spaces.Box(
            low=0, high=4, shape=(2,), dtype=np.int32
        )

        self.grid_size = 5
        self.goal = np.array([4, 4], dtype=np.int32)
        self.agent_pos = np.array([0, 0], dtype=np.int32)
        self.steps = 0
        self.max_steps = 50

    def reset(self, seed=None):
        super().reset(seed=seed)
        self.agent_pos = np.array([0, 0], dtype=np.int32)
        self.steps = 0
        assert self.observation_space.contains(self.agent_pos)
        return self.agent_pos.copy(), {}

    def step(self, action):
        assert self.action_space.contains(action)

        # Move agent
        moves = {
            0: np.array([0, 1], dtype=np.int32),   # up
            1: np.array([1, 0], dtype=np.int32),   # right
            2: np.array([0, -1], dtype=np.int32),  # down
            3: np.array([-1, 0], dtype=np.int32),  # left
        }

        self.agent_pos += moves[action]
        self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size - 1)

        # Reward: +1 for reaching the goal, small step penalty otherwise
        reward = 1.0 if np.array_equal(self.agent_pos, self.goal) else -0.01

        # Done
        terminated = np.array_equal(self.agent_pos, self.goal)
        self.steps += 1
        truncated = self.steps >= self.max_steps

        return self.agent_pos.copy(), reward, terminated, truncated, {}
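
A short random-action smoke test of the class above confirms that reset and step run end to end before any agent touches it:

env = GridWorldEnv()
obs, info = env.reset(seed=0)
episode_return = 0.0
while True:
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    episode_return += reward
    if terminated or truncated:
        break
print(f"Finished at {obs} after {env.steps} steps, return {episode_return:.2f}")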

Part 4: Environment Wrappers

Why Use Wrappers?

Wrappers add functionality without modifying the original environment:

# Without wrappers: modify environment directly (WRONG - mixes concerns)
class CartPoleNormalized(CartPole):
    def step(self, action):
        obs, reward, terminated, truncated, info = super().step(action)
        obs = obs / 2.4  # Normalize observation
        reward = reward / 100  # Normalize reward
        return obs, reward, terminated, truncated, info

# With wrappers: compose functionality (RIGHT - clean separation)
env = CartPole()
env = NormalizeObservation(env)
env = NormalizeReward(env)

Wrapper Pattern

class BaseWrapper(gym.Wrapper):
    """Base class for all wrappers"""

    def __init__(self, env):
        super().__init__(env)
        # Don't modify spaces unless you redefine them

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        return self._process_observation(obs), info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        obs = self._process_observation(obs)
        reward = self._process_reward(reward)
        return obs, reward, terminated, truncated, info

    def _process_observation(self, obs):
        return obs

    def _process_reward(self, reward):
        return reward
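
For simple transformations, Gymnasium also provides specialized base classes (ObservationWrapper, RewardWrapper, ActionWrapper) so you override a single method and inherit the reset/step plumbing. A brief sketch (the class names NormalizePixels and ClipReward are just examples):

import gymnasium as gym
import numpy as np

class NormalizePixels(gym.ObservationWrapper):
    """Scale uint8 pixels to float32 in [0, 1]"""

    def __init__(self, env):
        super().__init__(env)
        self.observation_space = gym.spaces.Box(
            low=0.0, high=1.0, shape=env.observation_space.shape, dtype=np.float32
        )

    def observation(self, obs):
        return obs.astype(np.float32) / 255.0

class ClipReward(gym.RewardWrapper):
    """Clip rewards into [-1, 1]"""

    def reward(self, reward):
        return float(np.clip(reward, -1.0, 1.0))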

Common Built-in Wrappers

TimeLimit: Add episode time limit

env = gym.make("CartPole-v1")
env = gym.wrappers.TimeLimit(env, max_episode_steps=500)
# Now truncated=True after 500 steps

NormalizeObservation: Normalize observations with a running mean/std

env = gym.wrappers.NormalizeObservation(env)
# Observations normalized using running mean/std

RecordVideo: Save episode videos

env = gym.wrappers.RecordVideo(
    env,
    video_folder="videos/",
    episode_trigger=lambda ep: ep % 10 == 0
)

ClipAction: Clip actions to action space bounds

env = gym.wrappers.ClipAction(env)
# Continuous (Box) actions automatically clipped to the action space's low/high bounds
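
Related: when the policy outputs actions in a fixed range (for example tanh-squashed to [-1, 1]) but the environment expects different bounds, RescaleAction maps between them instead of clipping:

env = gym.make("Pendulum-v1")  # native torque bounds are [-2, 2]
env = gym.wrappers.RescaleAction(env, min_action=-1.0, max_action=1.0)
# The policy now acts in [-1, 1]; the wrapper rescales into the env's bounds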

Custom Wrapper Example: Scale Rewards

class ScaleRewardWrapper(gym.Wrapper):
    """Scale rewards by a constant factor"""

    def __init__(self, env, scale=0.1):
        super().__init__(env)
        self.scale = scale

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        return obs, reward * self.scale, terminated, truncated, info

Custom Wrapper Example: Frame Stacking

import collections

class FrameStackWrapper(gym.Wrapper):
    """Stack last 4 frames for temporal information"""

    def __init__(self, env, num_frames=4):
        super().__init__(env)
        self.num_frames = num_frames
        self.frame_buffer = collections.deque(maxlen=num_frames)

        # Modify observation space to include stacking
        old_space = env.observation_space
        self.observation_space = gym.spaces.Box(
            low=old_space.low.min(),
            high=old_space.high.max(),
            shape=(old_space.shape[0], old_space.shape[1],
                   old_space.shape[2] * num_frames),
            dtype=old_space.dtype
        )

    def reset(self, seed=None):
        obs, info = self.env.reset(seed=seed)
        self.frame_buffer.clear()
        for _ in range(self.num_frames):
            self.frame_buffer.append(obs)
        return self._get_stacked_obs(), info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.frame_buffer.append(obs)
        return self._get_stacked_obs(), reward, terminated, truncated, info

    def _get_stacked_obs(self):
        # Stack frames along channel dimension
        return np.concatenate(list(self.frame_buffer), axis=2)

Wrapper Chaining

# Correct: Chain wrappers for composable functionality
env = gym.make("Atari2600-v0")
env = gym.wrappers.TimeLimit(env, max_episode_steps=4500)
env = gym.wrappers.ClipAction(env)
env = FrameStackWrapper(env, num_frames=4)
env = gym.wrappers.NormalizeObservation(env)

# Order matters: think about data flow
# raw env -> ClipAction -> FrameStack -> NormalizeObservation

Part 5: Vectorized Environments

Types of Vectorized Environments

SyncVectorEnv: Serial execution in one process (simple, easy to debug)

from gymnasium.vector import SyncVectorEnv

# Create 4 sub-environments, stepped one after another in a single process
envs = SyncVectorEnv([
    lambda: gym.make("CartPole-v1")
    for i in range(4)
])

obs, info = envs.reset()  # obs shape: (4, 4)
actions = np.array([0, 1, 1, 0])  # 4 actions
obs, rewards, terminateds, truncateds, info = envs.step(actions)
# rewards shape: (4,)

# Note: Gymnasium itself ships only SyncVectorEnv and AsyncVectorEnv.
# The serial pattern above is what Stable-Baselines3 calls DummyVecEnv
# and tianshou calls DummyVectorEnv.

AsyncVectorEnv: Parallel execution in worker processes (fastest for slow envs, most complex)

from gymnasium.vector import AsyncVectorEnv

# Create 16 parallel environments (independent worker processes)
envs = AsyncVectorEnv([
    lambda: gym.make("CartPole-v1")
    for i in range(16)
])

# Same API as SyncVectorEnv, but each sub-environment steps in its own process
obs, info = envs.reset()
obs, rewards, terminateds, truncateds, info = envs.step(actions)
envs.close()  # IMPORTANT: Close async envs to clean up worker processes

Comparison and Decision Tree

| Feature    | SyncVectorEnv (serial)      | AsyncVectorEnv (multiprocess) |
|------------|-----------------------------|-------------------------------|
| Speed      | Limited by a single core    | Scales with CPU cores         |
| CPU cores  | 1                           | N worker processes            |
| Memory     | Low                         | High (one copy per process)   |
| Complexity | Simple                      | Complex                       |
| Debugging  | Easy                        | Hard                          |
| Best for   | Testing, cheap environments | Large-scale, expensive envs   |

When to use each:

num_envs = 32

if num_envs == 1:
    # Single environment
    env = gym.make("CartPole-v1")
elif num_envs <= 8:
    # Few or cheap environments: serial vectorization avoids process overhead
    env = SyncVectorEnv([lambda: gym.make("CartPole-v1") for _ in range(num_envs)])
else:
    # Many environments or expensive steps: the multiprocessing cost pays off
    env = AsyncVectorEnv([lambda: gym.make("CartPole-v1") for _ in range(num_envs)])

Common Vectorized Environment Bugs

Bug 1: Forgetting to close AsyncVectorEnv

# WRONG: Processes leak
envs = AsyncVectorEnv([... for _ in range(16)])
# ... training ...
# Forgot to close! Worker processes stay alive, memory leaks

# CORRECT: Always close
try:
    envs = AsyncVectorEnv([... for _ in range(16)])
    # ... training ...
finally:
    envs.close()  # Cleanup

# Or use context manager
from contextlib import contextmanager

@contextmanager
def make_async_envs(num_envs):
    envs = AsyncVectorEnv([... for _ in range(num_envs)])
    try:
        yield envs
    finally:
        envs.close()

Bug 2: Non-parallel-safe environment

# WRONG: Environment relies on class-level (shared) state
class NonParallelEnv(gym.Env):
    global_counter = 0  # SHARED: one value for every instance in the process!

    def step(self, action):
        NonParallelEnv.global_counter += 1  # Instances interfere in the same process;
        ...                                 # async workers each mutate their own diverging copy

# CORRECT: No shared state
class ParallelSafeEnv(gym.Env):
    def __init__(self):
        self.counter = 0  # Instance variable, not shared

    def step(self, action):
        self.counter += 1  # Safe in parallel
        ...

Bug 3: Handling auto-reset in vectorized envs

# When an episode terminates in vectorized env, it auto-resets
obs, rewards, terminateds, truncateds, info = envs.step(actions)

# If terminateds[i] is True, envs[i] has been auto-reset
# The obs[i] is the NEW initial observation from the reset
# NOT the final observation of the episode

# To get final observation before reset:
obs, rewards, terminateds, truncateds, info = envs.step(actions)
final_obs = info['final_observation']  # Original terminal obs
reset_obs = obs  # New obs from auto-reset
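
A sketch of that bookkeeping per sub-environment (the 'final_observation' key follows the pre-1.0 Gymnasium vector API; newer releases changed the autoreset convention, so check your installed version):

obs, rewards, terminateds, truncateds, info = envs.step(actions)
next_obs = obs.copy()
if 'final_observation' in info:
    for i in np.flatnonzero(np.logical_or(terminateds, truncateds)):
        # Replace the auto-reset observation with the episode's true last observation
        next_obs[i] = info['final_observation'][i]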

Part 6: Common Environment Bugs and Fixes

Bug 1: Reward Scale Too Large

Symptom: Training unstable, losses spike, agent behavior random

# WRONG: Reward in range [0, 1000]
def step(self, action):
    reward = self.goal_distance * 1000  # Can be up to 1000!
    return obs, reward, done, truncated, info

# Problem: Gradients huge -> param updates too large -> training breaks

# CORRECT: Reward in [-1, 1]
def step(self, action):
    reward = self.goal_distance  # Range [0, 1]
    reward = reward - 0.5  # Shift to [-0.5, 0.5]
    return obs, reward, done, truncated, info

# Or normalize post-hoc
reward = np.clip(reward / 1000, -1, 1)

Bug 2: Action Not Applied Correctly

Symptom: Agent learns but behavior doesn't match reward signal

# WRONG: Action read but not used
def step(self, action):
    obs = self._get_next_obs()  # Doesn't use action!
    reward = 1.0  # Reward independent of action
    return obs, reward, False, False, {}

# CORRECT: Action determines next state
def step(self, action):
    self._apply_action_to_physics(action)
    obs = self._get_next_obs()
    reward = self._compute_reward(action)
    return obs, reward, False, False, {}

Bug 3: Missing Terminal State Flag

Symptom: Episodes don't end properly, agent never learns boundaries

# WRONG: Always done=False
def step(self, action):
    ...
    return obs, reward, False, False, {}  # Episode never ends!

# CORRECT: Set terminated when episode should end
def step(self, action):
    ...
    terminated = self._check_done_condition()
    if terminated:
        reward += 100  # Bonus for reaching goal
    return obs, reward, terminated, False, {}

# Also differentiate from truncation
def step(self, action):
    ...
    self.steps += 1
    terminated = self._reached_goal()  # Success condition
    truncated = self.steps >= self.max_steps  # Time limit
    return obs, reward, terminated, truncated, {}

Bug 4: Observation/Space Mismatch

Symptom: Training crashes or behaves oddly after environment change

# WRONG: Space and observation don't match
def __init__(self):
    self.observation_space = gym.spaces.Box(0, 1, (4,), dtype=np.float32)

def step(self, action):
    obs = np.random.randint(0, 256, (4,), dtype=np.uint8)  # uint8!
    return obs, reward, done, truncated, {}  # Mismatch!

# CORRECT: Match dtype and range
def __init__(self):
    self.observation_space = gym.spaces.Box(0, 255, (4,), dtype=np.uint8)

def step(self, action):
    obs = np.random.randint(0, 256, (4,), dtype=np.uint8)  # Matches!
    assert self.observation_space.contains(obs)
    return obs, reward, done, truncated, {}

Bug 5: Reset Not Initializing State

Symptom: First episode works, subsequent episodes fail

# WRONG: Reset doesn't actually reset
def reset(self, seed=None):
    super().reset(seed=seed)
    # Forgot to initialize state!
    return self.state, {}  # self.state is stale from last episode

# CORRECT: Reset initializes everything
def reset(self, seed=None):
    super().reset(seed=seed)
    self.state = self._initialize_state()
    self.steps = 0
    return self.state, {}

Bug 6: Non-Deterministic Environment Without Proper Seeding

Symptom: Same reset produces different initial states, breaks reproducibility

# WRONG: Randomness not seeded
def reset(self, seed=None):
    super().reset(seed=seed)
    self.state = np.random.randn(4)  # Uses default RNG, ignores seed!
    return self.state, {}

# CORRECT: Use self.np_random, which is seeded by super().reset()
def reset(self, seed=None):
    super().reset(seed=seed)
    # self.np_random is a numpy Generator seeded via super().reset(seed=seed)
    self.state = self.np_random.standard_normal(4)
    return self.state, {}

Bug 7: Info Dict Contains Non-Serializable Objects

Symptom: Episode fails when saving/loading with replay buffers

# WRONG: Info dict contains unpicklable objects
def step(self, action):
    info = {
        'env': self,  # Can't pickle!
        'callback': self.callback_fn,  # Can't pickle!
    }
    return obs, reward, done, truncated, info

# CORRECT: Only basic types in info dict
def step(self, action):
    info = {
        'level': self.level,
        'score': self.score,
        'x_position': float(self.x),
    }
    return obs, reward, done, truncated, info

Bug 8: Action Space Not Enforced

Symptom: Agent takes actions outside valid range, causes crashes

# WRONG: Action space defined but not enforced
def __init__(self):
    self.action_space = gym.spaces.Box(-1, 1, (3,))

def step(self, action):
    # action could be [10, 10, 10] and we don't catch it!
    velocity = action * 10  # Huge velocity!
    ...

# CORRECT: Clip or validate actions
def step(self, action):
    assert self.action_space.contains(action), \
        f"Invalid action {action}"

    # Or clip to bounds
    action = np.clip(action,
                     self.action_space.low,
                     self.action_space.high)
    ...

Bug 9: Observation Normalization Not Applied

Symptom: Training unstable when observations are in [0, 255] instead of [0, 1]

# WRONG: Large observation range breaks training
def step(self, action):
    obs = self.render_to_image()  # Range [0, 255]
    return obs, reward, done, truncated, {}

# CORRECT: Normalize observations
def step(self, action):
    obs = self.render_to_image()  # Range [0, 255]
    obs = obs.astype(np.float32) / 255.0  # Normalize to [0, 1]
    return obs, reward, done, truncated, {}

# Or use the NormalizeObservation wrapper (running mean/std normalization)
env = gym.wrappers.NormalizeObservation(env)

Bug 10: Forgetting to Return Info Dict

Symptom: Step returns wrong number of values, crashes agent training loop

# WRONG: Step returns 4 values (old Gym API)
def step(self, action):
    return obs, reward, done, info  # WRONG!

# CORRECT: Step returns 5 values (Gymnasium API)
def step(self, action):
    return obs, reward, terminated, truncated, info

# Or detect the API during migration (don't call step twice)
result = env.step(action)
if len(result) == 5:
    obs, reward, terminated, truncated, info = result
else:  # old 4-tuple Gym API
    obs, reward, done, info = result
    terminated, truncated = done, False

Part 7: Environment Testing Checklist

Before training an RL agent on a custom environment, validate:

Pre-Training Validation Checklist

import gymnasium as gym
import numpy as np

class EnvironmentValidator:
    """Validate custom environment before training"""

    def validate_all(self, env):
        """Run all validation tests"""
        print("Validating environment...")

        # 1. Spaces are valid
        self.validate_spaces(env)
        print("✓ Spaces valid")

        # 2. Reset works
        obs, info = self.validate_reset(env)
        print("✓ Reset works")

        # 3. Step works and returns correct format
        self.validate_step(env, obs)
        print("✓ Step works")

        # 4. Observations are valid
        self.validate_observations(env, obs)
        print("✓ Observations valid")

        # 5. Actions are enforced
        self.validate_actions(env)
        print("✓ Actions enforced")

        # 6. Terminal states work
        self.validate_termination(env)
        print("✓ Termination works")

        # 7. Environment is reproducible
        self.validate_reproducibility(env)
        print("✓ Reproducibility verified")

        # 8. Random agent can run
        self.validate_random_agent(env)
        print("✓ Random agent runs")

        print("\nEnvironment validation PASSED!")

    def validate_spaces(self, env):
        """Check spaces are defined"""
        assert hasattr(env, 'action_space'), "No action_space"
        assert hasattr(env, 'observation_space'), "No observation_space"
        assert isinstance(env.action_space, gym.spaces.Space)
        assert isinstance(env.observation_space, gym.spaces.Space)

    def validate_reset(self, env):
        """Check reset returns (obs, info)"""
        result = env.reset()
        assert isinstance(result, tuple) and len(result) == 2, \
            f"Reset should return (obs, info), got {result}"
        obs, info = result
        assert isinstance(info, dict), "Info should be dict"
        return obs, info

    def validate_step(self, env, obs):
        """Check step returns 5-tuple"""
        action = env.action_space.sample()
        result = env.step(action)
        assert isinstance(result, tuple) and len(result) == 5, \
            f"Step should return 5-tuple, got {len(result)}"
        obs, reward, terminated, truncated, info = result
        assert isinstance(reward, (int, float)), "Reward must be number"
        assert isinstance(terminated, (bool, np.bool_)), "terminated must be bool"
        assert isinstance(truncated, (bool, np.bool_)), "truncated must be bool"
        assert isinstance(info, dict), "Info must be dict"

    def validate_observations(self, env, obs):
        """Check observations match space"""
        assert env.observation_space.contains(obs), \
            f"Observation {obs.shape} not in space {env.observation_space}"

    def validate_actions(self, env):
        """Check invalid actions fail"""
        if isinstance(env.action_space, gym.spaces.Discrete):
            invalid_action = env.action_space.n + 10
            assert not env.action_space.contains(invalid_action)

    def validate_termination(self, env):
        """Check episodes can terminate"""
        obs, _ = env.reset()
        for _ in range(1000):
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            if terminated or truncated:
                break
        assert terminated or truncated, \
            "Episode never terminated in 1000 steps!"

    def validate_reproducibility(self, env):
        """Check reset with seed is reproducible"""
        obs1, _ = env.reset(seed=42)
        obs2, _ = env.reset(seed=42)
        assert np.allclose(obs1, obs2), "Reset not reproducible!"

    def validate_random_agent(self, env):
        """Check environment works with random actions"""
        obs, _ = env.reset()
        total_reward = 0
        for _ in range(100):
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            if terminated or truncated:
                break
        assert np.isfinite(total_reward), "Reward is NaN or infinite!"

# Usage
validator = EnvironmentValidator()
validator.validate_all(env)
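
Gymnasium also ships a built-in checker that covers much of the same ground (API shape, space membership, seeding); it may additionally warn about optional details such as a missing options argument in reset or absent render support:

from gymnasium.utils.env_checker import check_env

env = GridWorldEnv()
check_env(env)  # raises or warns on API, space, and seeding problems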

Manual Testing

Before training, play with the environment manually:

# Manual environment exploration
env = GridWorldEnv()
obs, _ = env.reset()

while True:
    action = int(input("Action (0=up, 1=right, 2=down, 3=left): "))
    obs, reward, terminated, truncated, info = env.step(action)
    print(f"Position: {obs}, Reward: {reward}, Done: {terminated}")

    if terminated or truncated:
        obs, _ = env.reset()
        print("Episode reset")

Part 8: Red Flags and Anti-Patterns

Red Flag 1: Reward Scale Issue

# RED FLAG: Rewards in [0, 1000000]
reward = distance_to_goal * 1000000  # HUGE!

# Solution: Scale to [-1, 1]
reward = -distance_to_goal / max_distance
assert -1 <= reward <= 1

Red Flag 2: Observation Type Mismatch

# RED FLAG: Observation dtype doesn't match space
observation_space = gym.spaces.Box(0, 255, (84, 84, 3), dtype=np.uint8)
obs = np.random.random((84, 84, 3)).astype(np.float32)  # MISMATCH!

# Solution: Match dtype exactly
obs = (obs * 255).astype(np.uint8)

Red Flag 3: Missing Done Flag

# RED FLAG: Episodes never end
def step(self, action):
    return obs, reward, False, False, {}  # Always False!

# Solution: Implement termination logic
terminated = self.check_goal_reached() or self.check_failure()

Red Flag 4: Action Bounds Not Enforced

# RED FLAG: Network outputs unconstrained
def step(self, action):  # action could be [1000, -1000]
    velocity = action  # HUGE velocity!

# Solution: Clip or validate
action = np.clip(action,
                 self.action_space.low,
                 self.action_space.high)

Red Flag 5: Vectorized Environment Auto-Reset Confusion

# RED FLAG: Treating auto-reset obs as terminal obs
obs, rewards, terminateds, truncateds, info = envs.step(actions)
# obs contains NEW reset observations, not final observations!

# Solution: Use info['final_observation']
final_obs = info['final_observation']

Red Flag 6: Non-Parallel-Safe Shared State

# RED FLAG: Shared state breaks AsyncVectorEnv
class Env(gym.Env):
    global_counter = 0  # SHARED!

    def step(self, action):
        Env.global_counter += 1  # Race condition!

# Solution: Instance variables only
def __init__(self):
    self.counter = 0  # Instance-specific

Red Flag 7: Info Dict with Unpicklable Objects

# RED FLAG: Can't serialize for replay buffer
info = {
    'env': self,
    'callback': self.fn,
}

# Solution: Only basic types
info = {
    'level': 5,
    'score': 100,
}

Red Flag 8: Forgetting to Close AsyncVectorEnv

# RED FLAG: Process leak
envs = AsyncVectorEnv([...])
# ... forgot envs.close()

# Solution: Always close
envs.close()  # or use try/finally

Part 9: Rationalization Resistance

Common Wrong Beliefs About Environments:

Claim 1: "My custom environment should just work without testing"

  • Reality: 80% of RL failures are environment bugs. Test before training.
  • Evidence: Standard validation checklist catches bugs 95% of the time

Claim 2: "Reward scaling doesn't matter, only matters for learning rate"

  • Reality: Reward scale affects gradient magnitudes directly. Too large = instability.
  • Evidence: Scaling reward by 100x often breaks training even with correct learning rate

Claim 3: "Wrappers are optional complexity I don't need"

  • Reality: Wrappers enforce separation of concerns. Without them, environments become unmaintainable.
  • Evidence: Real RL code uses 3-5 wrappers (TimeLimit, Normalize, ClipAction, etc)

Claim 4: "Vectorized environments are always faster"

  • Reality: For small or very fast environments, multiprocessing overhead can make them slower.
  • Evidence: For a handful of cheap envs, serial SyncVectorEnv often beats AsyncVectorEnv

Claim 5: "My environment is correct if the agent learns something"

  • Reality: Agent can learn to game a broken reward signal.
  • Evidence: Agent learning ≠ environment correctness. Run tests.

Claim 6: "AsyncVectorEnv doesn't need explicit close()"

  • Reality: Processes leak if not closed, draining system resources.
  • Evidence: Unmanaged AsyncVectorEnv with 16+ processes brings systems to halt

Claim 7: "Observation normalization breaks training"

  • Reality: Unnormalized large observations (like [0, 255]) break training.
  • Evidence: Normalizing [0, 255] images to [0, 1] is standard practice

Claim 8: "I don't need to validate action space enforcement"

  • Reality: Network outputs can violate bounds, causing physics errors.
  • Evidence: Unclipped continuous actions often cause simulation failures

Part 10: Pressure Test Scenarios

Scenario 1: Custom Environment Debugging

# Subagent challenge WITHOUT skill:
# "I built a custom CartPole variant. Training fails silently
# (agent doesn't learn). The environment seems fine when I test it.
# Where do I start debugging?"

# Expected WITH skill:
# 1. Validate observation space matches actual observations
# 2. Validate action space bounds are enforced
# 3. Check reward scale is in [-1, 1]
# 4. Verify reset/step API is correct (Gym vs Gymnasium)
# 5. Run environment validator checklist
# 6. Manual play-test to check physics
# 7. Verify terminal state logic

Scenario 2: Wrapper Composition

# Challenge: Build a correct wrapper stack
# env = gym.make("CartPole-v1")
# env = TimeLimit(env, 500)  # Add time limit
# env = NormalizeObservation(env)  # Normalize
# Should be safe to use with any policy training

# WITHOUT skill: Guess order, wrong wrapping
# WITH skill: Know correct order, understand composition

Scenario 3: Vectorization Decision

# Challenge: "I need to train on 32 parallel CartPoles.
# Which vectorized environment type is best?"

# WITHOUT skill: Try all three, pick whichever runs
# WITH skill: Analyze trade-offs
#   - 32 envs -> AsyncVectorEnv
#   - Memory acceptable? -> Yes
#   - Debugging needed? -> No -> Use Async

Scenario 4: Space Mismatch Detection

# Challenge: Environment crashes during training with cryptic error.
# Observation is (84, 84, 3) uint8 but CNN expects float32 in [0, 1]

# WITHOUT skill: Spend hours debugging network
# WITH skill: Immediately suspect observation/space mismatch
# Run validator, find dtype mismatch, fix preprocessing

Part 11: Advanced Patterns - Multi-Agent Environments

Multi-Agent Observation Spaces

Scenario: Multi-agent game with individual agent observations

class MultiAgentGridWorld(gym.Env):
    """2-agent cooperative environment"""

    def __init__(self, num_agents=2):
        self.num_agents = num_agents

        # Each agent has its own action space
        self.action_space = gym.spaces.MultiDiscrete([4] * num_agents)

        # Each agent observes its own position + other agents' positions
        # Dict space allows per-agent observations
        self.observation_space = gym.spaces.Dict({
            f'agent_{i}': gym.spaces.Box(0, 4, (2 * num_agents,), dtype=np.int32)
            for i in range(num_agents)
        })

        self.agents = [np.array([i, 0], dtype=np.int32) for i in range(num_agents)]
        self.goal = np.array([4, 4], dtype=np.int32)

    def reset(self, seed=None):
        super().reset(seed=seed)
        self.agents = [np.array([i, 0], dtype=np.int32) for i in range(self.num_agents)]

        obs = {}
        for i in range(self.num_agents):
            agent_obs = np.concatenate([agent.copy() for agent in self.agents])
            obs[f'agent_{i}'] = agent_obs.astype(np.int32)

        return obs, {}

    def step(self, actions):
        """actions is array of length num_agents"""
        moves = [
            np.array([0, 1], dtype=np.int32),
            np.array([1, 0], dtype=np.int32),
            np.array([0, -1], dtype=np.int32),
            np.array([-1, 0], dtype=np.int32),
        ]

        # Apply each agent's action
        for i, action in enumerate(actions):
            self.agents[i] += moves[action]
            self.agents[i] = np.clip(self.agents[i], 0, 4)

        # Shared reward: both agents get reward for reaching goal
        distances = [np.linalg.norm(agent - self.goal) for agent in self.agents]
        reward = sum(1.0 / (1.0 + d) for d in distances)

        # Both must reach goal
        terminated = all(np.array_equal(agent, self.goal) for agent in self.agents)

        # Construct observation for each agent
        obs = {}
        for i in range(self.num_agents):
            agent_obs = np.concatenate([agent.copy() for agent in self.agents])
            obs[f'agent_{i}'] = agent_obs.astype(np.int32)

        truncated = False
        return obs, reward, terminated, truncated, {}
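
A quick interaction check for the environment above (random joint actions; the 10-step horizon is arbitrary):

env = MultiAgentGridWorld(num_agents=2)
obs, info = env.reset(seed=0)
for _ in range(10):
    actions = env.action_space.sample()  # one discrete action per agent
    obs, reward, terminated, truncated, info = env.step(actions)
    print(obs['agent_0'], obs['agent_1'], round(reward, 3))
    if terminated:
        break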

Key Multi-Agent Patterns

# Pattern 1: Separate rewards per agent
rewards = {
    f'agent_{i}': compute_reward_for_agent(i)
    for i in range(num_agents)
}

# Pattern 2: Shared team reward
team_reward = sum(individual_rewards) / num_agents

# Pattern 3: Mixed observations (shared + individual)
obs = {
    f'agent_{i}': {
        'own_state': agent_states[i],
        'other_positions': [s for j, s in enumerate(agent_states) if j != i],
        'global_state': shared_state,
    }
    for i in range(num_agents)
}

# Pattern 4: Synchronized reset for coordinated behavior
def reset(self, seed=None):
    super().reset(seed=seed)
    # All agents reset to coordinated starting positions
    self.agents = initialize_team_formation()

Part 12: Integration with Training Loops

Proper Environment Integration

from gymnasium.vector import AsyncVectorEnv, SyncVectorEnv

class TrainingLoop:
    """Shows correct environment integration pattern"""

    def __init__(self, env_id, num_parallel=4):
        self.env = self._setup_environment(env_id, num_parallel)
        self.policy = build_policy()

    def _setup_environment(self, env_id, num_parallel):
        """Proper environment setup: wrap each sub-env, then vectorize"""
        def make_env():
            e = gym.make(env_id)
            # Standard wrappers are applied per sub-environment
            e = gym.wrappers.TimeLimit(e, max_episode_steps=1000)
            e = gym.wrappers.NormalizeObservation(e)
            return e

        if num_parallel == 1:
            return make_env()
        elif num_parallel <= 8:
            return SyncVectorEnv([make_env for _ in range(num_parallel)])
        else:
            return AsyncVectorEnv([make_env for _ in range(num_parallel)])

    def train_one_episode(self):
        """Correct training loop"""
        obs, info = self.env.reset()

        total_reward = 0
        steps = 0

        while True:
            # Get action from policy
            action = self.policy.get_action(obs)

            # CRITICAL: Validate action is in space
            assert self.env.action_space.contains(action)

            # Step environment
            obs, reward, terminated, truncated, info = self.env.step(action)

            # CRITICAL: Handle auto-reset in vectorized case
            if 'final_observation' in info:
                final_obs = info['final_observation']
                # Store final obs in replay buffer, not reset obs
            else:
                final_obs = obs

            # Store experience
            self.store_experience(obs, reward, terminated, truncated, info)

            total_reward += np.mean(reward) if isinstance(reward, np.ndarray) else reward
            steps += 1

            # Check termination
            if np.any(terminated) or np.any(truncated):
                break

        return total_reward / steps

    def store_experience(self, obs, reward, terminated, truncated, info):
        """Correct experience storage"""
        # Handle vectorized case (obs, reward are arrays)
        if isinstance(reward, np.ndarray):
            for i in range(len(reward)):
                self.replay_buffer.add(
                    obs=obs[i] if isinstance(obs, np.ndarray) else obs,
                    action=None,  # Set before storing
                    reward=reward[i],
                    done=terminated[i] or truncated[i],
                    next_obs=obs[i] if isinstance(obs, np.ndarray) else obs,
                )

Common Integration Mistakes

Mistake 1: Not closing AsyncVectorEnv

# WRONG: Process leak
envs = AsyncVectorEnv([... for _ in range(16)])
for episode in range(1000):
    obs, _ = envs.reset()
    # ... training ...
# Processes never cleaned up

# CORRECT: Always cleanup
try:
    envs = AsyncVectorEnv([... for _ in range(16)])
    for episode in range(1000):
        obs, _ = envs.reset()
        # ... training ...
finally:
    envs.close()

Mistake 2: Using wrong observation after auto-reset

# WRONG: Mixing terminal and reset observations
obs, rewards, terminateds, truncateds, info = envs.step(actions)
# obs already holds the auto-reset observations, not the terminal ones!
store_in_replay_buffer(obs, rewards, terminateds)

# CORRECT: Use final_observation as next_obs for sub-envs that just ended
obs, rewards, terminateds, truncateds, info = envs.step(actions)
next_obs = obs.copy()
if 'final_observation' in info:
    for i in np.flatnonzero(terminateds | truncateds):
        next_obs[i] = info['final_observation'][i]
store_in_replay_buffer(next_obs, rewards, terminateds)

Mistake 3: Not validating agent actions

# WRONG: Trust agent always outputs valid action
action = policy(obs)
obs, reward, terminated, truncated, info = env.step(action)

# CORRECT: Validate before stepping
action = policy(obs)
action = np.clip(action, env.action_space.low, env.action_space.high)
assert env.action_space.contains(action)
obs, reward, terminated, truncated, info = env.step(action)

Part 13: Performance Optimization

Observation Preprocessing Performance

class OptimizedObservationPreprocessing:
    """Efficient observation handling"""

    def __init__(self, env):
        self.env = env

    def preprocess_observation(self, obs):
        """Convert uint8 images to float32 in [0, 1] without redundant copies"""
        if obs.dtype == np.uint8:
            obs = obs.astype(np.float32)  # one copy
            obs /= 255.0                  # in-place on the new float array
        # Already-float observations are assumed to be scaled; don't rescale blindly

        # For very large observations (> ~1 MB), consider keeping frames as uint8
        # until batching time, or using memory-mapped arrays
        return obs

    def batch_preprocess(self, obs_batch):
        """Batch processing for vectorized envs"""
        # Vectorized preprocessing is faster than per-obs
        if isinstance(obs_batch, np.ndarray) and obs_batch.ndim == 4:
            # (batch_size, H, W, C) image batch
            obs_batch = obs_batch.astype(np.float32) / 255.0
        return obs_batch

Vectorization Performance Tips

# When does parallelization help? Measure on your own hardware, but roughly:

# For CartPole (very cheap steps):
# - 1 env: ~10k steps/sec on 1 core
# - 8 SyncVectorEnv: similar total throughput (still one process, but batched)
# - 8 AsyncVectorEnv: often no faster, sometimes slower (IPC overhead dominates)

# For Atari or physics simulators (expensive steps):
# - 1 env: a few hundred steps/sec
# - 16 SyncVectorEnv: still bounded by a single core
# - 16 AsyncVectorEnv: throughput scales roughly with worker processes

# Rule of thumb:
# - env step < 1 ms: use SyncVectorEnv; process overhead outweighs the parallelism
# - env step 1-10 ms: measure both; AsyncVectorEnv often wins with enough envs
# - env step > 10 ms: use AsyncVectorEnv; parallelism is essential
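
Rather than trusting rules of thumb, measure throughput directly. A minimal probe (the function name and step count are arbitrary):

import time

def steps_per_second(envs, num_steps=2_000):
    """Rough steps/sec for any Gymnasium vector env"""
    envs.reset(seed=0)
    start = time.perf_counter()
    for _ in range(num_steps):
        actions = envs.action_space.sample()  # batched random actions
        envs.step(actions)
    elapsed = time.perf_counter() - start
    return num_steps * envs.num_envs / elapsed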

Part 14: Debugging Environment Issues Systematically

Diagnostic Checklist for Broken Training

import numpy as np

class EnvironmentDebugger:
    """Systematic environment debugging"""

    def full_diagnosis(self, env, policy):
        """Complete environment diagnostic"""
        print("=== Environment Diagnostic ===")

        # 1. Check environment API
        self.check_api(env)
        print("✓ API correct")

        # 2. Check spaces
        self.check_spaces(env)
        print("✓ Spaces valid")

        # 3. Check reset/step mechanics
        self.check_mechanics(env)
        print("✓ Reset/step mechanics correct")

        # 4. Check observation statistics
        obs_stats = self.analyze_observations(env)
        print(f"✓ Observations: mean={obs_stats['mean']:.3f}, std={obs_stats['std']:.3f}")

        # 5. Check reward statistics
        reward_stats = self.analyze_rewards(env)
        print(f"✓ Rewards: mean={reward_stats['mean']:.3f}, std={reward_stats['std']:.3f}")
        if abs(reward_stats['mean']) > 1 or reward_stats['std'] > 1:
            print("  WARNING: Reward scale may be too large")

        # 6. Check episode lengths
        lengths = self.analyze_episode_lengths(env)
        print(f"✓ Episode lengths: mean={lengths['mean']:.1f}, min={lengths['min']}, max={lengths['max']}")

        # 7. Check reproducibility
        self.check_reproducibility(env)
        print("✓ Reproducibility verified")

        # 8. Check with policy
        self.check_policy_integration(env, policy)
        print("✓ Policy integration works")

    def analyze_observations(self, env, num_episodes=10):
        """Analyze observation distribution"""
        obs_list = []
        for _ in range(num_episodes):
            obs, _ = env.reset()
            for _ in range(100):
                action = env.action_space.sample()
                obs, _, terminated, truncated, _ = env.step(action)
                obs_list.append(obs.flatten())
                if terminated or truncated:
                    break

        obs_array = np.concatenate(obs_list)
        return {
            'mean': np.mean(obs_array),
            'std': np.std(obs_array),
            'min': np.min(obs_array),
            'max': np.max(obs_array),
        }

    def analyze_rewards(self, env, num_episodes=10):
        """Analyze reward distribution"""
        rewards = []
        for _ in range(num_episodes):
            obs, _ = env.reset()
            for _ in range(100):
                action = env.action_space.sample()
                obs, reward, terminated, truncated, _ = env.step(action)
                rewards.append(reward)
                if terminated or truncated:
                    break

        rewards = np.array(rewards)
        return {
            'mean': np.mean(rewards),
            'std': np.std(rewards),
            'min': np.min(rewards),
            'max': np.max(rewards),
        }

    def analyze_episode_lengths(self, env, num_episodes=20):
        """Analyze episode length distribution"""
        lengths = []
        for _ in range(num_episodes):
            obs, _ = env.reset()
            steps = 0
            for step in range(10000):  # Max steps
                action = env.action_space.sample()
                obs, reward, terminated, truncated, _ = env.step(action)
                steps += 1
                if terminated or truncated:
                    break
            lengths.append(steps)

        lengths = np.array(lengths)
        return {
            'mean': np.mean(lengths),
            'min': int(np.min(lengths)),
            'max': int(np.max(lengths)),
            'median': int(np.median(lengths)),
        }
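
Usage sketch for the helpers shown above (the unshown check_* methods are left to implement; GridWorldEnv is the example environment from Part 3):

debugger = EnvironmentDebugger()
env = GridWorldEnv()
print(debugger.analyze_rewards(env))
print(debugger.analyze_episode_lengths(env))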

Summary: When to Invoke This Skill

Use rl-environments skill when:

  1. Creating custom environments from scratch
  2. Debugging environment-related training failures
  3. Implementing observation/action spaces
  4. Using or creating wrappers
  5. Parallelizing environments
  6. Testing environments before training
  7. Handling Gym vs Gymnasium differences
  8. Migrating environment code between versions
  9. Building multi-agent or multi-component environments
  10. Enforcing action/observation bounds correctly
  11. Optimizing environment performance
  12. Debugging training failures systematically

This skill prevents:

  • 80% of RL bugs (environment issues)
  • Silent training failures from broken environments
  • Vectorization-related data corruption
  • Observation/action space mismatches
  • Reward scaling instabilities
  • Terminal state logic errors
  • Reproducibility issues from poor seeding
  • Performance degradation from inefficient environments
  • Multi-agent coordination failures
  • Integration issues with training loops