| name | redis |
| description | Redis caching, pub/sub, and data structures. Activate for Redis operations, caching strategies, session management, and message queuing. |
| allowed-tools | Bash, Read, Write, Edit, Glob, Grep |
Redis Skill
Provides comprehensive Redis capabilities for the Golden Armada AI Agent Fleet Platform.
When to Use This Skill
Activate this skill when working with:
- Caching implementation
- Session management
- Pub/Sub messaging
- Rate limiting
- Distributed locks
Redis CLI Quick Reference
Connection
```bash
Connect
redis-cli -h localhost -p 6379 redis-cli -h localhost -p 6379 -a password
Test connection
redis-cli ping ```
Basic Operations
```bash
Strings
SET key "value" SET key "value" EX 3600 # With TTL GET key DEL key EXISTS key TTL key EXPIRE key 3600
Hashes
HSET user:1 name "John" age "30" HGET user:1 name HGETALL user:1 HDEL user:1 age
Lists
LPUSH queue "item1" RPUSH queue "item2" LPOP queue RPOP queue LRANGE queue 0 -1
Sets
SADD tags "python" "redis" SMEMBERS tags SISMEMBER tags "python" SREM tags "python"
Sorted Sets
ZADD leaderboard 100 "player1" 200 "player2" ZRANGE leaderboard 0 -1 WITHSCORES ZRANK leaderboard "player1" ZINCRBY leaderboard 50 "player1" ```
Python Redis Client
```python import redis from redis import asyncio as aioredis
Synchronous client
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
Async client
async_redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)
Basic operations
r.set('key', 'value', ex=3600) value = r.get('key') r.delete('key')
Hash operations
r.hset('user:1', mapping={'name': 'John', 'age': '30'}) user = r.hgetall('user:1')
List operations
r.lpush('queue', 'item1', 'item2') items = r.lrange('queue', 0, -1) item = r.rpop('queue')
Async operations
async def cache_get(key: str): async with aioredis.from_url("redis://localhost") as redis: return await redis.get(key) ```
Caching Patterns
Cache-Aside Pattern
```python async def get_agent(agent_id: str) -> dict: # Try cache first cache_key = f"agent:{agent_id}" cached = await redis.get(cache_key) if cached: return json.loads(cached)
# Cache miss - fetch from database
agent = await db.get_agent(agent_id)
if agent:
await redis.set(cache_key, json.dumps(agent), ex=3600)
return agent
async def update_agent(agent_id: str, data: dict) -> dict: # Update database agent = await db.update_agent(agent_id, data)
# Invalidate cache
cache_key = f"agent:{agent_id}"
await redis.delete(cache_key)
return agent
```
Rate Limiting
```python async def rate_limit(key: str, limit: int, window: int) -> bool: """ Sliding window rate limiter. Returns True if request is allowed. """ current = int(time.time()) window_key = f"rate:{key}:{current // window}"
async with redis.pipeline() as pipe:
pipe.incr(window_key)
pipe.expire(window_key, window * 2)
results = await pipe.execute()
count = results[0]
return count <= limit
Usage
if not await rate_limit(f"user:{user_id}", limit=100, window=60): raise HTTPException(status_code=429, detail="Rate limit exceeded") ```
Distributed Lock
```python import uuid
class DistributedLock: def init(self, redis_client, key: str, timeout: int = 10): self.redis = redis_client self.key = f"lock:{key}" self.timeout = timeout self.token = str(uuid.uuid4())
async def __aenter__(self):
while True:
acquired = await self.redis.set(
self.key, self.token, nx=True, ex=self.timeout
)
if acquired:
return self
await asyncio.sleep(0.1)
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Only release if we own the lock
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
await self.redis.eval(script, 1, self.key, self.token)
Usage
async with DistributedLock(redis, "resource:123"): await do_critical_work() ```
Pub/Sub
```python
Publisher
async def publish_event(channel: str, message: dict): await redis.publish(channel, json.dumps(message))
Subscriber
async def subscribe_events(): pubsub = redis.pubsub() await pubsub.subscribe("agent:events")
async for message in pubsub.listen():
if message["type"] == "message":
data = json.loads(message["data"])
await handle_event(data)
```
Session Management
```python from fastapi import Depends, HTTPException from fastapi.security import HTTPBearer import secrets
security = HTTPBearer()
async def create_session(user_id: str) -> str: session_id = secrets.token_urlsafe(32) session_key = f"session:{session_id}"
await redis.hset(session_key, mapping={
"user_id": user_id,
"created_at": datetime.utcnow().isoformat()
})
await redis.expire(session_key, 86400) # 24 hours
return session_id
async def get_session(token: str = Depends(security)) -> dict: session_key = f"session:{token.credentials}" session = await redis.hgetall(session_key)
if not session:
raise HTTPException(status_code=401, detail="Invalid session")
# Refresh TTL
await redis.expire(session_key, 86400)
return session
```
Best Practices
- Use connection pooling for production
- Set TTL on all keys to prevent memory bloat
- Use pipelining for batch operations
- Implement proper error handling for connection issues
- Monitor memory usage with
INFO memory - Use Lua scripts for atomic operations