SKILL.md

name: pytd
description: Expert assistance for using pytd (Python SDK) to query and import data with Treasure Data. Use this skill when users need help with Python-based data analysis, querying Presto/Hive, importing pandas DataFrames, bulk data uploads, or integrating TD with Python analytical workflows.

pytd - Treasure Data Python SDK

Expert assistance for querying and importing data to Treasure Data using pytd, the official Python driver for analytical workflows.

When to Use This Skill

Use this skill when:

  • Querying Treasure Data from Python scripts or Jupyter notebooks
  • Importing pandas DataFrames to TD tables
  • Running Presto or Hive queries from Python
  • Building data pipelines with Python and TD
  • Performing bulk data imports or exports
  • Migrating from deprecated pandas-td library
  • Integrating TD with Python data science workflows
  • Handling large result sets with iterative retrieval

Core Principles

1. Installation

Standard Installation:

pip install pytd

Requirements:

  • Python 3.9 or later
  • pandas 2.0 or later
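
A quick way to confirm the environment meets these requirements (a minimal check, not part of pytd itself):

import sys
import pandas as pd

# Expect Python 3.9+ and pandas 2.0+
print("Python OK:", sys.version_info >= (3, 9))
print("pandas version:", pd.__version__)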

2. Authentication & Configuration

Environment Variables (Recommended):

export TD_API_KEY="your_api_key_here"
export TD_API_SERVER="https://api.treasuredata.com/"

Client Initialization:

import pytd

# Using environment variables
client = pytd.Client(database='sample_datasets')

# Explicit credentials (not recommended for production)
client = pytd.Client(
    apikey='your_api_key',
    endpoint='https://api.treasuredata.com/',
    database='your_database',
    default_engine='presto'  # or 'hive'
)

Configuration Options:

  • apikey: TD API key (read from TD_API_KEY env var if not specified)
  • endpoint: TD API server URL (read from TD_API_SERVER env var)
  • database: Default database name for queries
  • default_engine: Query engine - 'presto' (default) or 'hive'

Regional Endpoints:

  • US: https://api.treasuredata.com/
  • Tokyo: https://api.treasuredata.co.jp/
  • EU: https://api.eu01.treasuredata.com/
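
For example, a client for the Tokyo region can be created by passing the regional endpoint explicitly (the API key and database below are placeholders):

import pytd

# Tokyo region; apikey falls back to the TD_API_KEY env var if omitted
client_tokyo = pytd.Client(
    apikey='your_api_key',
    endpoint='https://api.treasuredata.co.jp/',
    database='sample_datasets'
)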

3. Querying Data

Basic Query Execution

import pytd

client = pytd.Client(database='sample_datasets')

# Execute Presto query
result = client.query('SELECT symbol, COUNT(1) as cnt FROM nasdaq GROUP BY symbol LIMIT 10')

# Result format: {'columns': ['symbol', 'cnt'], 'data': [['AAIT', 590], ['AAL', 82], ...]}
print(result['columns'])  # ['symbol', 'cnt']
print(result['data'])     # [['AAIT', 590], ['AAL', 82], ...]

Query with Hive Engine

# Create Hive client
client_hive = pytd.Client(
    database='sample_datasets',
    default_engine='hive'
)

# Execute Hive query
result = client_hive.query('SELECT hivemall_version()')

Convert Results to pandas DataFrame

import pandas as pd

result = client.query('SELECT * FROM nasdaq LIMIT 100')

# Convert to DataFrame
df = pd.DataFrame(result['data'], columns=result['columns'])
print(df.head())

4. Writing Data to TD

Load DataFrame to Table

import pandas as pd
import pytd

# Create sample DataFrame
df = pd.DataFrame({
    'user_id': [1, 2, 3, 4],
    'event_name': ['login', 'purchase', 'logout', 'login'],
    'amount': [None, 99.99, None, None],
    'timestamp': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-02', '2024-01-03'])
})

# Initialize client
client = pytd.Client(database='your_database')

# Upload DataFrame
client.load_table_from_dataframe(
    df,
    'events',  # table name
    writer='bulk_import',  # writer type
    if_exists='overwrite'  # or 'append', 'error', 'ignore'
)

Parameters:

  • df: pandas DataFrame to upload
  • table: Target table name (can be 'database.table' or just 'table')
  • writer: Import method - 'bulk_import' (default), 'insert_into', or 'spark'
  • if_exists: What to do if table exists - 'error' (default), 'overwrite', 'append', or 'ignore'
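
For instance, appending the same DataFrame to a table in a different database can be written with the database-qualified form (the database and table names below are placeholders):

# Append to a table outside the client's default database
client.load_table_from_dataframe(
    df,
    'another_database.events',
    writer='bulk_import',
    if_exists='append'
)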

Common Patterns

Pattern 1: ETL Pipeline - Query, Transform, Load

import pytd
import pandas as pd
import numpy as np

# Initialize client
client = pytd.Client(database='analytics')

# Step 1: Extract - Query data from TD
query = """
    SELECT
        user_id,
        event_name,
        event_date,
        COUNT(*) as event_count
    FROM raw_events
    WHERE TD_INTERVAL(time, '-1d', 'JST')
    GROUP BY user_id, event_name, event_date
"""

result = client.query(query)
df = pd.DataFrame(result['data'], columns=result['columns'])

# Step 2: Transform - Process data with pandas
df['event_date'] = pd.to_datetime(df['event_date'])
df['is_weekend'] = df['event_date'].dt.dayofweek >= 5
df['event_count_log'] = np.log1p(df['event_count'])  # pd.np was removed in pandas 2.0

# Add metadata
df['processed_at'] = pd.Timestamp.now()
df['pipeline_version'] = '1.0'

# Step 3: Load - Write back to TD
client.load_table_from_dataframe(
    df,
    'analytics.user_daily_events',
    writer='bulk_import',
    if_exists='append'
)

print(f"Loaded {len(df)} rows to user_daily_events")

Explanation: Complete ETL workflow that extracts yesterday's data, performs pandas transformations, and loads results back to TD. Uses bulk_import for efficient loading.

Pattern 2: Incremental Data Loading

import pytd
import pandas as pd
from datetime import datetime, timedelta

client = pytd.Client(database='sales')

def load_incremental_data(source_file, table_name, date_column='import_date'):
    """Load new data incrementally, avoiding duplicates"""

    # Read new data from source
    new_data = pd.read_csv(source_file)
    new_data[date_column] = datetime.now()

    # Get max date from existing table
    try:
        result = client.query(f"""
            SELECT MAX({date_column}) as max_date
            FROM {table_name}
        """)

        max_date = result['data'][0][0] if result['data'][0][0] else None

        if max_date:
            # Filter only new records
            new_data = new_data[new_data[date_column] > max_date]
            print(f"Loading {len(new_data)} new records after {max_date}")
        else:
            print(f"Table empty, loading all {len(new_data)} records")

    except Exception as e:
        # Table doesn't exist yet
        print(f"Creating new table with {len(new_data)} records")

    if len(new_data) > 0:
        client.load_table_from_dataframe(
            new_data,
            table_name,
            writer='bulk_import',
            if_exists='append'
        )
        print("Load complete")
    else:
        print("No new data to load")

# Usage
load_incremental_data('daily_sales.csv', 'sales.transactions')

Explanation: Implements incremental loading by checking the latest timestamp in the target table and only loading newer records. Handles first-time loads gracefully.

Pattern 3: Large Result Set Processing with DB-API

import pytd
from pytd.dbapi import connect

client = pytd.Client(database='large_dataset')

# Create DB-API connection
conn = connect(client)
cursor = conn.cursor()

# Execute query that might timeout with standard query()
cursor.execute("""
    SELECT user_id, event_name, event_time, properties
    FROM events
    WHERE TD_INTERVAL(time, '-7d', 'JST')
""")

# Process results iteratively (memory efficient)
batch_size = 10000
processed_count = 0

while True:
    rows = cursor.fetchmany(batch_size)
    if not rows:
        break

    # Process batch
    for row in rows:
        user_id, event_name, event_time, properties = row
        # Process each row
        process_event(user_id, event_name, event_time, properties)

    processed_count += len(rows)
    print(f"Processed {processed_count} rows...")

cursor.close()
conn.close()

print(f"Total processed: {processed_count} rows")

Explanation: Uses DB-API for iterative retrieval of large result sets. Prevents memory issues and query timeouts by fetching data in batches. Essential for processing millions of rows.

Pattern 4: Multi-Database Operations

import pytd
import pandas as pd

# Connect to different databases
source_client = pytd.Client(database='raw_data')
target_client = pytd.Client(database='analytics')

# Query from source database
query = """
    SELECT
        customer_id,
        product_id,
        purchase_date,
        amount
    FROM purchases
    WHERE TD_INTERVAL(time, '-1d', 'JST')
"""

result = source_client.query(query)
df = pd.DataFrame(result['data'], columns=result['columns'])

# Enrich data by querying another source
product_query = "SELECT product_id, product_name, category FROM products"
products_result = source_client.query(product_query)
products_df = pd.DataFrame(products_result['data'], columns=products_result['columns'])

# Join data
enriched_df = df.merge(products_df, on='product_id', how='left')

# Calculate metrics
daily_summary = enriched_df.groupby(['category', 'purchase_date']).agg({
    'amount': ['sum', 'mean', 'count'],
    'customer_id': 'nunique'
}).reset_index()

daily_summary.columns = ['category', 'date', 'total_sales', 'avg_sale', 'transaction_count', 'unique_customers']

# Write to analytics database
target_client.load_table_from_dataframe(
    daily_summary,
    'daily_category_sales',
    writer='bulk_import',
    if_exists='append'
)

print(f"Loaded {len(daily_summary)} rows to analytics.daily_category_sales")

Explanation: Demonstrates working with multiple databases, joining data, performing aggregations, and writing to a different target database.

Pattern 5: Handling Time-based Data with TD Functions

import pytd
import pandas as pd
from datetime import datetime

client = pytd.Client(database='events')

# Query with TD time functions
query = """
    SELECT
        TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'JST') as date_jst,
        COUNT(*) as event_count,
        COUNT(DISTINCT user_id) as unique_users,
        APPROX_PERCENTILE(session_duration, 0.5) as median_duration,
        APPROX_PERCENTILE(session_duration, 0.95) as p95_duration
    FROM user_sessions
    WHERE TD_INTERVAL(time, '-7d', 'JST')
    GROUP BY 1
    ORDER BY 1 DESC
"""

result = client.query(query)
df = pd.DataFrame(result['data'], columns=result['columns'])

# Convert date strings to datetime
df['date_jst'] = pd.to_datetime(df['date_jst'])

# Add derived metrics
df['events_per_user'] = df['event_count'] / df['unique_users']

# Write summary back
client.load_table_from_dataframe(
    df,
    'weekly_session_summary',
    writer='bulk_import',
    if_exists='overwrite'
)

Explanation: Shows proper use of TD time functions (TD_INTERVAL, TD_TIME_FORMAT) in queries and how to handle the results in pandas.

Writer Types Comparison

pytd supports three writer methods for loading data:

1. bulk_import (Default - Recommended)

Best for: Most use cases, especially large datasets

client.load_table_from_dataframe(
    df,
    'table_name',
    writer='bulk_import',
    if_exists='append'
)

Characteristics:

  • ✓ Scalable to large datasets
  • ✓ Memory efficient (streams data)
  • ✓ No special permissions required
  • ✓ Best balance of performance and simplicity
  • ✗ Slower than Spark for very large datasets
  • Uses CSV format internally

When to use: Default choice for most data loads (100s of MB to GBs)

2. insert_into

Best for: Small datasets, real-time updates

client.load_table_from_dataframe(
    df,
    'table_name',
    writer='insert_into',
    if_exists='append'
)

Characteristics:

  • ✓ Simple, no dependencies
  • ✓ Good for small datasets (<1000 rows)
  • ✗ Not scalable (issues individual INSERT queries)
  • ✗ Slow for large datasets
  • ✗ Uses Presto query capacity
  • Uses Presto INSERT INTO statements

When to use: Only for small datasets or when you need immediate writes without bulk import delay

3. spark (High Performance)

Best for: Very large datasets, high-performance pipelines

from pytd.writer import SparkWriter

writer = SparkWriter(
    td_spark_path='/path/to/td-spark-assembly.jar'  # Optional
)

client.load_table_from_dataframe(
    df,
    'table_name',
    writer=writer,
    if_exists='append'
)

Characteristics:

  • ✓ Highest performance
  • ✓ Direct writes to Plazma storage
  • ✓ Best for very large datasets (10s of GBs+)
  • ✗ Requires pytd[spark] installation
  • ✗ Requires Plazma Public API access (contact support)
  • ✗ Additional dependencies

When to use: Large-scale data pipelines requiring maximum throughput

Enabling Spark Writer:

  1. Install: pip install pytd[spark]
  2. Contact support@treasuredata.com to enable Plazma Public API access
  3. (Optional) Download td-spark JAR for custom versions

Best Practices

  1. Use Environment Variables for Credentials

    export TD_API_KEY="your_api_key"
    export TD_API_SERVER="https://api.treasuredata.com/"
    

    Never hardcode API keys in scripts

  2. Choose the Right Writer

    • bulk_import: Default choice for most scenarios
    • insert_into: Only for small datasets (<1000 rows)
    • spark: For very large datasets with proper setup
  3. Use TD Time Functions in Queries

    # Good: Uses partition pruning
    query = "SELECT * FROM table WHERE TD_INTERVAL(time, '-1d', 'JST')"
    
    # Avoid: Scans entire table
    query = "SELECT * FROM table WHERE date = '2024-01-01'"
    
  4. Handle Large Results with DB-API

    Use pytd.dbapi for queries returning millions of rows to avoid memory issues.

  5. Specify Database in Table Name

    # Explicit database (recommended)
    client.load_table_from_dataframe(df, 'database.table')
    
    # Uses client's default database
    client.load_table_from_dataframe(df, 'table')
    
  6. Add Time Column for Partitioning

    df['time'] = pd.to_datetime(df['timestamp']).astype('int64') // 10**9
    client.load_table_from_dataframe(df, 'table')
    
  7. Use Presto for Analytics, Hive for Special Functions

    • Presto: Faster for most analytical queries
    • Hive: Required for Hivemall, UDFs, some advanced features
  8. Batch Processing for Large ETL

    Process data in chunks to avoid memory issues:

    for chunk in pd.read_csv('large_file.csv', chunksize=100000):
        # Process chunk
        client.load_table_from_dataframe(chunk, 'table', if_exists='append')
    
  9. Error Handling

    try:
        result = client.query(query)
    except Exception as e:
        print(f"Query failed: {e}")
        # Handle error appropriately
    
  10. Close Connections in Long-Running Scripts

    from pytd.dbapi import connect
    
    conn = connect(client)
    try:
        # Use connection
        cursor = conn.cursor()
        cursor.execute(query)
        # Process results
    finally:
        conn.close()
    

Common Issues and Solutions

Issue: Import Errors or Module Not Found

Symptoms:

  • ModuleNotFoundError: No module named 'pytd'
  • ImportError: cannot import name 'SparkWriter'

Solutions:

  1. Verify Installation

    pip list | grep pytd
    
  2. Install/Upgrade pytd

    pip install --upgrade pytd
    
  3. For Spark Support

    pip install pytd[spark]
    
  4. Check Python Version

    python --version  # Should be 3.9+
    

Issue: Authentication Errors

Symptoms:

  • Unauthorized: Invalid API key
  • 403 Forbidden

Solutions:

  1. Verify Environment Variables

    echo $TD_API_KEY
    echo $TD_API_SERVER
    
  2. Check API Key Format

    # Verify API key is set correctly
    import os
    print(os.getenv('TD_API_KEY'))
    
  3. Verify Regional Endpoint

    # US
    endpoint = 'https://api.treasuredata.com/'
    # Tokyo
    endpoint = 'https://api.treasuredata.co.jp/'
    # EU
    endpoint = 'https://api.eu01.treasuredata.com/'
    
  4. Check API Key Permissions

    • Ensure key has appropriate read/write permissions
    • Regenerate key if necessary from TD console

Issue: Query Timeout or Memory Errors

Symptoms:

  • Query times out after several minutes
  • MemoryError when fetching large results
  • Connection drops during query execution

Solutions:

  1. Use DB-API for Large Results

    from pytd.dbapi import connect
    
    conn = connect(client)
    cursor = conn.cursor()
    cursor.execute(query)
    
    # Fetch in batches until the result set is exhausted
    while True:
        rows = cursor.fetchmany(10000)
        if not rows:
            break
        for row in rows:
            process(row)
    
  2. Add Time Filters for Partition Pruning

    query = """
        SELECT * FROM large_table
        WHERE TD_INTERVAL(time, '-1d', 'JST')  -- Add this!
    """
    
  3. Limit Result Size

    query = "SELECT * FROM table WHERE ... LIMIT 100000"
    
  4. Use Aggregations Instead of Raw Data

    # Instead of fetching all rows
    query = "SELECT * FROM table"
    
    # Aggregate first
    query = """
        SELECT date, user_id, COUNT(*) as cnt
        FROM table
        GROUP BY 1, 2
    """
    

Issue: DataFrame Upload Fails

Symptoms:

  • ValueError: DataFrame is empty
  • Type errors during upload
  • Data corruption in uploaded table

Solutions:

  1. Check DataFrame is Not Empty

    if df.empty:
        print("DataFrame is empty, skipping upload")
    else:
        client.load_table_from_dataframe(df, 'table')
    
  2. Handle Data Types Properly

    # Convert timestamps to Unix epoch
    df['time'] = pd.to_datetime(df['timestamp']).astype('int64') // 10**9
    
    # Handle NaN values
    df['amount'] = df['amount'].fillna(0)
    
    # Convert to appropriate types
    df['user_id'] = df['user_id'].astype(str)
    df['count'] = df['count'].astype(int)
    
  3. Check Column Names

    # TD column names should be lowercase and use underscores
    df.columns = df.columns.str.lower().str.replace(' ', '_')
    
  4. Remove Invalid Characters

    # Remove or replace problematic characters
    df = df.applymap(lambda x: str(x).replace('\x00', '') if isinstance(x, str) else x)
    
  5. Try Different Writer

    # If bulk_import fails, try insert_into for debugging
    client.load_table_from_dataframe(
        df.head(10),  # Test with small sample
        'table',
        writer='insert_into'
    )
    

Issue: Spark Writer Not Working

Symptoms:

  • ImportError: Spark writer not available
  • Spark job fails
  • Permission denied errors

Solutions:

  1. Install Spark Dependencies

    pip install pytd[spark]
    
  2. Enable Plazma Public API

    • Contact support@treasuredata.com
    • Request Plazma Public API access for your account
  3. Specify JAR Path (if needed)

    from pytd.writer import SparkWriter
    
    writer = SparkWriter(
        td_spark_path='/path/to/td-spark-assembly.jar'
    )
    
  4. Check Permissions

    • Ensure API key has write access to target database
    • Verify Plazma access is enabled

Advanced Topics

Custom Query Options

# Query with custom parameters
result = client.query(
    'SELECT * FROM table',
    engine='presto',
    priority=1,  # Job priority, from -2 (lowest) to 2 (highest); default 0
    retry_limit=3
)

Working with Job Status

# Start the query asynchronously via client.api_client (the underlying tdclient.Client)
job = client.api_client.query('your_database', 'SELECT COUNT(*) FROM large_table', type='presto')

# Check job status
print(f"Job ID: {job.job_id}")
print(f"Status: {job.status()}")

# Wait for completion
job.wait()

# Get results
if job.success():
    for row in job.result():
        print(row)
else:
    print(f"Job failed with status: {job.status()}")

Custom Writers

from pytd.writer import BulkImportWriter

# Instantiate a writer object explicitly instead of passing its name as a string
writer = BulkImportWriter()

client.load_table_from_dataframe(
    df,
    'table',
    writer=writer,
    if_exists='append'
)

Migrating from pandas-td

If you have existing code using the deprecated pandas-td library:

Before (pandas-td):

import pandas_td as td

con = td.connect(apikey='your_api_key', endpoint='https://api.treasuredata.com/')
df = td.read_td('SELECT * FROM sample_datasets.nasdaq', con)

After (pytd):

import pytd.pandas_td as td

con = td.connect(apikey='your_api_key', endpoint='https://api.treasuredata.com/')
df = td.read_td('SELECT * FROM sample_datasets.nasdaq', con)

Or use the modern pytd API:

import pytd
import pandas as pd

client = pytd.Client(database='sample_datasets')
result = client.query('SELECT * FROM nasdaq')
df = pd.DataFrame(result['data'], columns=result['columns'])

Testing and Development

Test Connection

import pytd

try:
    client = pytd.Client(database='sample_datasets')
    result = client.query('SELECT 1 as test')
    print("Connection successful!")
    print(result)
except Exception as e:
    print(f"Connection failed: {e}")

Verify Data Upload

import pandas as pd
import pytd

# Create test data
test_df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'value': [100, 200, 300],
    'time': [1704067200, 1704153600, 1704240000]  # Unix timestamps
})

client = pytd.Client(database='test_db')

# Upload
print("Uploading test data...")
client.load_table_from_dataframe(
    test_df,
    'test_table',
    writer='bulk_import',
    if_exists='overwrite'
)

# Verify
print("Verifying upload...")
result = client.query('SELECT * FROM test_table ORDER BY id')
verify_df = pd.DataFrame(result['data'], columns=result['columns'])

print("\nUploaded data:")
print(verify_df)

# Check counts match
assert len(test_df) == len(verify_df), "Row count mismatch!"
print("\nVerification successful!")

Performance Testing

import pytd
import pandas as pd
import time

client = pytd.Client(database='test_db')

# Generate test data
df = pd.DataFrame({
    'id': range(100000),
    'value': range(100000),
    'time': int(time.time())
})

# Test bulk_import
start = time.time()
client.load_table_from_dataframe(df, 'perf_test_bulk', writer='bulk_import', if_exists='overwrite')
bulk_time = time.time() - start
print(f"bulk_import: {bulk_time:.2f}s for {len(df)} rows")

# Test insert_into (small sample only!)
small_df = df.head(100)
start = time.time()
client.load_table_from_dataframe(small_df, 'perf_test_insert', writer='insert_into', if_exists='overwrite')
insert_time = time.time() - start
print(f"insert_into: {insert_time:.2f}s for {len(small_df)} rows")

Jupyter Notebook Integration

pytd works seamlessly with Jupyter notebooks:

# Notebook cell 1: Setup
import pytd
import pandas as pd
import matplotlib.pyplot as plt

client = pytd.Client(database='analytics')

# Notebook cell 2: Query data
query = """
    SELECT
        TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'JST') as date,
        COUNT(*) as events
    FROM user_events
    WHERE TD_INTERVAL(time, '-30d', 'JST')
    GROUP BY 1
    ORDER BY 1
"""

result = client.query(query)
df = pd.DataFrame(result['data'], columns=result['columns'])
df['date'] = pd.to_datetime(df['date'])

# Notebook cell 3: Visualize
plt.figure(figsize=(12, 6))
plt.plot(df['date'], df['events'])
plt.title('Daily Events - Last 30 Days')
plt.xlabel('Date')
plt.ylabel('Event Count')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# Notebook cell 4: Write results back
summary = df.describe()
# Process and save summary back to TD if needed

Resources

Related Skills

  • trino: Understanding Trino SQL syntax for queries in pytd
  • hive: Using Hive-specific functions and syntax
  • digdag: Orchestrating Python scripts using pytd in workflows
  • td-javascript-sdk: Browser-based data collection (frontend) vs pytd (backend/analytics)

Comparison with Other Tools

| Tool | Purpose | When to Use |
|------|---------|-------------|
| pytd | Full-featured Python driver | Analytics, data pipelines, pandas integration |
| td-client-python | Basic REST API wrapper | Simple CRUD, when pytd is too heavy |
| pandas-td (deprecated) | Legacy pandas integration | Don't use; migrate to pytd |
| TD Toolbelt | CLI tool | Command-line operations, shell scripts |

Recommendation: Use pytd for all Python-based analytical work and ETL pipelines. Use td-client-python only for basic REST API operations.
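
As a rough illustration of the difference, the same one-off query looks roughly like this in each library (a hedged sketch; consult the td-client-python documentation for its exact API):

# pytd: returns columns and rows ready for pandas
import pytd
client = pytd.Client(database='sample_datasets')
result = client.query('SELECT COUNT(1) AS cnt FROM nasdaq')

# td-client-python: lower-level job handling
import tdclient
with tdclient.Client(apikey='your_api_key') as td:
    job = td.query('sample_datasets', 'SELECT COUNT(1) AS cnt FROM nasdaq', type='presto')
    job.wait()
    for row in job.result():
        print(row)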


Last updated: 2025-01 | pytd version: Latest (Python 3.9+)