| name | database-operations |
| description | Connect to and query SQL databases, execute queries, and handle database transactions. Use when reading from or writing to databases, performing batch inserts, managing connections, or executing complex SQL queries in data pipelines. |
Database Operations
Provides patterns for database interactions in ETL pipelines.
SQLite Connection
import sqlite3
from contextlib import contextmanager
@contextmanager
def get_db_connection(db_path: str):
"""Create database connection with automatic cleanup."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
# Usage
with get_db_connection('/app/data/erp.db') as conn:
cursor = conn.execute("SELECT * FROM orders")
rows = cursor.fetchall()
Query Execution
def execute_query(conn, query: str, params: tuple = None) -> list[dict]:
"""Execute query and return results as list of dicts."""
cursor = conn.execute(query, params or ())
columns = [col[0] for col in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
def execute_many(conn, query: str, data: list[tuple]):
"""Execute query with multiple parameter sets."""
cursor = conn.executemany(query, data)
conn.commit()
return cursor.rowcount
Batch Insert
def batch_insert(conn, table: str, records: list[dict], batch_size: int = 1000):
"""Insert records in batches."""
if not records:
return 0
columns = list(records[0].keys())
placeholders = ', '.join(['?' for _ in columns])
query = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
total_inserted = 0
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
data = [tuple(r[col] for col in columns) for r in batch]
total_inserted += execute_many(conn, query, data)
return total_inserted
Schema Inspection
def get_table_schema(conn, table: str) -> list[dict]:
"""Get table column information."""
cursor = conn.execute(f"PRAGMA table_info({table})")
return [
{'name': row[1], 'type': row[2], 'nullable': not row[3], 'primary_key': bool(row[5])}
for row in cursor.fetchall()
]
def get_tables(conn) -> list[str]:
"""List all tables in database."""
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
return [row[0] for row in cursor.fetchall()]