| name | pytd |
| description | Expert assistance for using pytd (Python SDK) to query and import data with Treasure Data. Use this skill when users need help with Python-based data analysis, querying Presto/Hive, importing pandas DataFrames, bulk data uploads, or integrating TD with Python analytical workflows. |
pytd - Treasure Data Python SDK
Expert assistance for querying and importing data to Treasure Data using pytd, the official Python driver for analytical workflows.
When to Use This Skill
Use this skill when:
- Querying Treasure Data from Python scripts or Jupyter notebooks
- Importing pandas DataFrames to TD tables
- Running Presto or Hive queries from Python
- Building data pipelines with Python and TD
- Performing bulk data imports or exports
- Migrating from deprecated pandas-td library
- Integrating TD with Python data science workflows
- Handling large result sets with iterative retrieval
Core Principles
1. Installation
Standard Installation:
pip install pytd
Requirements:
- Python 3.9 or later
- pandas 2.0 or later
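To confirm the installation, a quick import check works in any environment (a minimal sketch using the standard library's importlib.metadata):
```python
# Confirm pytd is importable and show the installed version
import importlib.metadata

import pytd

print(importlib.metadata.version("pytd"))
```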
2. Authentication & Configuration
Environment Variables (Recommended):
export TD_API_KEY="your_api_key_here"
export TD_API_SERVER="https://api.treasuredata.com/"
Client Initialization:
import pytd
# Using environment variables
client = pytd.Client(database='sample_datasets')
# Explicit credentials (not recommended for production)
client = pytd.Client(
apikey='your_api_key',
endpoint='https://api.treasuredata.com/',
database='your_database',
default_engine='presto' # or 'hive'
)
Configuration Options:
- apikey: TD API key (read from TD_API_KEY env var if not specified)
- endpoint: TD API server URL (read from TD_API_SERVER env var)
- database: Default database name for queries
- default_engine: Query engine - 'presto' (default) or 'hive'
Regional Endpoints:
- US: https://api.treasuredata.com/
- Tokyo: https://api.treasuredata.co.jp/
- EU: https://api.eu01.treasuredata.com/
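For example, a client for the Tokyo region can be created by pointing TD_API_SERVER at the regional URL or by passing endpoint explicitly (a minimal sketch; the database name is illustrative):
```python
import pytd

# Tokyo region: pass the endpoint explicitly
# (or set TD_API_SERVER to the same URL and omit the argument)
client_tokyo = pytd.Client(
    endpoint='https://api.treasuredata.co.jp/',
    database='sample_datasets'
)
```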
3. Querying Data
Basic Query Execution
import pytd
client = pytd.Client(database='sample_datasets')
# Execute Presto query
result = client.query('SELECT symbol, COUNT(1) as cnt FROM nasdaq GROUP BY symbol LIMIT 10')
# Result format: {'columns': ['symbol', 'cnt'], 'data': [['AAIT', 590], ['AAL', 82], ...]}
print(result['columns']) # ['symbol', 'cnt']
print(result['data']) # [['AAIT', 590], ['AAL', 82], ...]
Query with Hive Engine
# Create Hive client
client_hive = pytd.Client(
database='sample_datasets',
default_engine='hive'
)
# Execute Hive query
result = client_hive.query('SELECT hivemall_version()')
Convert Results to pandas DataFrame
import pandas as pd
result = client.query('SELECT * FROM nasdaq LIMIT 100')
# Convert to DataFrame
df = pd.DataFrame(result['data'], columns=result['columns'])
print(df.head())
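Alternatively, the pandas-td compatibility layer bundled with pytd (see the migration section below) can return a DataFrame directly; a minimal sketch assuming TD_API_KEY and TD_API_SERVER are set in the environment:
```python
import pytd.pandas_td as td

# Connection reads TD_API_KEY / TD_API_SERVER from the environment
con = td.connect(database='sample_datasets')

# read_td returns the query result as a pandas DataFrame
df = td.read_td('SELECT symbol, COUNT(1) AS cnt FROM nasdaq GROUP BY symbol LIMIT 10', con)
print(df.head())
```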
4. Writing Data to TD
Load DataFrame to Table
import pandas as pd
import pytd
# Create sample DataFrame
df = pd.DataFrame({
'user_id': [1, 2, 3, 4],
'event_name': ['login', 'purchase', 'logout', 'login'],
'amount': [None, 99.99, None, None],
'timestamp': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-02', '2024-01-03'])
})
# Initialize client
client = pytd.Client(database='your_database')
# Upload DataFrame
client.load_table_from_dataframe(
df,
'events', # table name
writer='bulk_import', # writer type
if_exists='overwrite' # or 'append', 'error', 'ignore'
)
Parameters:
- df: pandas DataFrame to upload
- table: Target table name (can be 'database.table' or just 'table')
- writer: Import method - 'bulk_import' (default), 'insert_into', or 'spark'
- if_exists: Behavior when the table exists - 'error' (default), 'overwrite', 'append', or 'ignore'
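For instance, the same call can target a database-qualified table and choose the if_exists behavior per destination (a short sketch; table names are illustrative):
```python
# Overwrite a staging table on every run
client.load_table_from_dataframe(df, 'staging.events_today', writer='bulk_import', if_exists='overwrite')

# Append to a long-lived table, keeping existing rows
client.load_table_from_dataframe(df, 'analytics.events', writer='bulk_import', if_exists='append')
```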
Common Patterns
Pattern 1: ETL Pipeline - Query, Transform, Load
import pytd
import pandas as pd
import numpy as np
# Initialize client
client = pytd.Client(database='analytics')
# Step 1: Extract - Query data from TD
query = """
SELECT
user_id,
event_name,
event_date,
COUNT(*) as event_count
FROM raw_events
WHERE TD_INTERVAL(time, '-1d', 'JST')
GROUP BY user_id, event_name, event_date
"""
result = client.query(query)
df = pd.DataFrame(result['data'], columns=result['columns'])
# Step 2: Transform - Process data with pandas
df['event_date'] = pd.to_datetime(df['event_date'])
df['is_weekend'] = df['event_date'].dt.dayofweek >= 5
df['event_count_log'] = np.log1p(df['event_count'])
# Add metadata
df['processed_at'] = pd.Timestamp.now()
df['pipeline_version'] = '1.0'
# Step 3: Load - Write back to TD
client.load_table_from_dataframe(
df,
'analytics.user_daily_events',
writer='bulk_import',
if_exists='append'
)
print(f"Loaded {len(df)} rows to user_daily_events")
Explanation: Complete ETL workflow that extracts yesterday's data, performs pandas transformations, and loads results back to TD. Uses bulk_import for efficient loading.
Pattern 2: Incremental Data Loading
import pytd
import pandas as pd
from datetime import datetime, timedelta
client = pytd.Client(database='sales')
def load_incremental_data(source_file, table_name, date_column='import_date'):
"""Load new data incrementally, avoiding duplicates"""
# Read new data from source
new_data = pd.read_csv(source_file)
new_data[date_column] = datetime.now()
# Get max date from existing table
try:
result = client.query(f"""
SELECT MAX({date_column}) as max_date
FROM {table_name}
""")
max_date = result['data'][0][0] if result['data'][0][0] else None
if max_date:
# Filter only new records
new_data = new_data[new_data[date_column] > max_date]
print(f"Loading {len(new_data)} new records after {max_date}")
else:
print(f"Table empty, loading all {len(new_data)} records")
except Exception as e:
# Table doesn't exist yet
print(f"Creating new table with {len(new_data)} records")
if len(new_data) > 0:
client.load_table_from_dataframe(
new_data,
table_name,
writer='bulk_import',
if_exists='append'
)
print("Load complete")
else:
print("No new data to load")
# Usage
load_incremental_data('daily_sales.csv', 'sales.transactions')
Explanation: Implements incremental loading by checking the latest timestamp in the target table and only loading newer records. Handles first-time loads gracefully.
Pattern 3: Large Result Set Processing with DB-API
import pytd
from pytd.dbapi import connect
client = pytd.Client(database='large_dataset')
# Create DB-API connection
conn = connect(client)
cursor = conn.cursor()
# Execute query that might timeout with standard query()
cursor.execute("""
SELECT user_id, event_name, event_time, properties
FROM events
WHERE TD_INTERVAL(time, '-7d', 'JST')
""")
# Process results iteratively (memory efficient)
batch_size = 10000
processed_count = 0
while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
# Process batch
for row in rows:
user_id, event_name, event_time, properties = row
# Process each row
process_event(user_id, event_name, event_time, properties)
processed_count += len(rows)
print(f"Processed {processed_count} rows...")
cursor.close()
conn.close()
print(f"Total processed: {processed_count} rows")
Explanation: Uses DB-API for iterative retrieval of large result sets. Prevents memory issues and query timeouts by fetching data in batches. Essential for processing millions of rows.
Pattern 4: Multi-Database Operations
import pytd
import pandas as pd
# Connect to different databases
source_client = pytd.Client(database='raw_data')
target_client = pytd.Client(database='analytics')
# Query from source database
query = """
SELECT
customer_id,
product_id,
purchase_date,
amount
FROM purchases
WHERE TD_INTERVAL(time, '-1d', 'JST')
"""
result = source_client.query(query)
df = pd.DataFrame(result['data'], columns=result['columns'])
# Enrich data by querying another source
product_query = "SELECT product_id, product_name, category FROM products"
products_result = source_client.query(product_query)
products_df = pd.DataFrame(products_result['data'], columns=products_result['columns'])
# Join data
enriched_df = df.merge(products_df, on='product_id', how='left')
# Calculate metrics
daily_summary = enriched_df.groupby(['category', 'purchase_date']).agg({
'amount': ['sum', 'mean', 'count'],
'customer_id': 'nunique'
}).reset_index()
daily_summary.columns = ['category', 'date', 'total_sales', 'avg_sale', 'transaction_count', 'unique_customers']
# Write to analytics database
target_client.load_table_from_dataframe(
daily_summary,
'daily_category_sales',
writer='bulk_import',
if_exists='append'
)
print(f"Loaded {len(daily_summary)} rows to analytics.daily_category_sales")
Explanation: Demonstrates working with multiple databases, joining data, performing aggregations, and writing to a different target database.
Pattern 5: Handling Time-based Data with TD Functions
import pytd
import pandas as pd
from datetime import datetime
client = pytd.Client(database='events')
# Query with TD time functions
query = """
SELECT
TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'JST') as date_jst,
COUNT(*) as event_count,
COUNT(DISTINCT user_id) as unique_users,
APPROX_PERCENTILE(session_duration, 0.5) as median_duration,
APPROX_PERCENTILE(session_duration, 0.95) as p95_duration
FROM user_sessions
WHERE TD_INTERVAL(time, '-7d', 'JST')
GROUP BY 1
ORDER BY 1 DESC
"""
result = client.query(query)
df = pd.DataFrame(result['data'], columns=result['columns'])
# Convert date strings to datetime
df['date_jst'] = pd.to_datetime(df['date_jst'])
# Add derived metrics
df['events_per_user'] = df['event_count'] / df['unique_users']
# Write summary back
client.load_table_from_dataframe(
df,
'weekly_session_summary',
writer='bulk_import',
if_exists='overwrite'
)
Explanation: Shows proper use of TD time functions (TD_INTERVAL, TD_TIME_FORMAT) in queries and how to handle the results in pandas.
Writer Types Comparison
pytd supports three writer methods for loading data:
1. bulk_import (Default - Recommended)
Best for: Most use cases, especially large datasets
client.load_table_from_dataframe(
df,
'table_name',
writer='bulk_import',
if_exists='append'
)
Characteristics:
- ✓ Scalable to large datasets
- ✓ Memory efficient (streams data)
- ✓ No special permissions required
- ✓ Best balance of performance and simplicity
- ✗ Slower than Spark for very large datasets
- Uses CSV format internally
When to use: Default choice for most data loads (100s of MB to GBs)
2. insert_into
Best for: Small datasets, real-time updates
client.load_table_from_dataframe(
df,
'table_name',
writer='insert_into',
if_exists='append'
)
Characteristics:
- ✓ Simple, no dependencies
- ✓ Good for small datasets (<1000 rows)
- ✗ Not scalable (issues individual INSERT queries)
- ✗ Slow for large datasets
- ✗ Uses Presto query capacity
- Uses Presto INSERT INTO statements
When to use: Only for small datasets or when you need immediate writes without bulk import delay
3. spark (High Performance)
Best for: Very large datasets, high-performance pipelines
from pytd.writer import SparkWriter
writer = SparkWriter(
td_spark_path='/path/to/td-spark-assembly.jar' # Optional
)
client.load_table_from_dataframe(
df,
'table_name',
writer=writer,
if_exists='append'
)
Characteristics:
- ✓ Highest performance
- ✓ Direct writes to Plazma storage
- ✓ Best for very large datasets (10s of GBs+)
- ✗ Requires pytd[spark] installation
- ✗ Requires Plazma Public API access (contact support)
- ✗ Additional dependencies
When to use: Large-scale data pipelines requiring maximum throughput
Enabling Spark Writer:
- Install: pip install pytd[spark]
- Contact support@treasuredata.com to enable Plazma Public API access
- (Optional) Download the td-spark JAR for custom versions
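Once Plazma access is enabled, the writer can also be selected by name, as listed under the load_table_from_dataframe parameters above; a sketch assuming the spark extra is installed:
```python
import pandas as pd
import pytd

client = pytd.Client(database='your_database')
df = pd.DataFrame({'user_id': [1, 2], 'score': [0.5, 0.9]})

# Equivalent to constructing a default SparkWriter
client.load_table_from_dataframe(df, 'scores', writer='spark', if_exists='overwrite')
```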
Best Practices
Use Environment Variables for Credentials
export TD_API_KEY="your_api_key"
export TD_API_SERVER="https://api.treasuredata.com/"
Never hardcode API keys in scripts.
Choose the Right Writer
- bulk_import: Default choice for most scenarios
- insert_into: Only for small datasets (<1000 rows)
- spark: For very large datasets with proper setup
Use TD Time Functions in Queries
# Good: Uses partition pruning
query = "SELECT * FROM table WHERE TD_INTERVAL(time, '-1d', 'JST')"
# Avoid: Scans entire table
query = "SELECT * FROM table WHERE date = '2024-01-01'"
Handle Large Results with DB-API
Use pytd.dbapi for queries returning millions of rows to avoid memory issues.
Specify Database in Table Name
# Explicit database (recommended)
client.load_table_from_dataframe(df, 'database.table')
# Uses the client's default database
client.load_table_from_dataframe(df, 'table')
Add Time Column for Partitioning
df['time'] = pd.to_datetime(df['timestamp']).astype(int) // 10**9
client.load_table_from_dataframe(df, 'table')
Use Presto for Analytics, Hive for Special Functions (see the sketch after the bullets below)
- Presto: Faster for most analytical queries
- Hive: Required for Hivemall, UDFs, some advanced features
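A sketch of switching engines per query, using the engine argument described under Custom Query Options below (query contents are illustrative):
```python
import pytd

# Presto is the default engine for day-to-day analytics
client = pytd.Client(database='sample_datasets', default_engine='presto')

# Fast aggregate query on Presto
top_symbols = client.query(
    'SELECT symbol, COUNT(1) AS cnt FROM nasdaq GROUP BY symbol ORDER BY cnt DESC LIMIT 5'
)

# Switch to Hive only when Hive-specific functions (e.g. Hivemall) are needed
hive_version = client.query('SELECT hivemall_version()', engine='hive')
```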
Batch Processing for Large ETL
Process data in chunks to avoid memory issues:
for chunk in pd.read_csv('large_file.csv', chunksize=100000):
    # Process chunk
    client.load_table_from_dataframe(chunk, 'table', if_exists='append')
Error Handling
try:
    result = client.query(query)
except Exception as e:
    print(f"Query failed: {e}")
    # Handle error appropriately
Close Connections in Long-Running Scripts
from pytd.dbapi import connect

conn = connect(client)
try:
    # Use connection
    cursor = conn.cursor()
    cursor.execute(query)
    # Process results
finally:
    conn.close()
Common Issues and Solutions
Issue: Import Errors or Module Not Found
Symptoms:
- ModuleNotFoundError: No module named 'pytd'
- ImportError: cannot import name 'SparkWriter'
Solutions:
Verify Installation
pip list | grep pytd
Install/Upgrade pytd
pip install --upgrade pytd
For Spark Support
pip install pytd[spark]
Check Python Version
python --version # Should be 3.9+
Issue: Authentication Errors
Symptoms:
- Unauthorized: Invalid API key
- 403 Forbidden
Solutions:
Verify Environment Variables
echo $TD_API_KEY
echo $TD_API_SERVER
Check API Key Format
# Verify API key is set correctly
import os
print(os.getenv('TD_API_KEY'))
Verify Regional Endpoint
# US
endpoint = 'https://api.treasuredata.com/'
# Tokyo
endpoint = 'https://api.treasuredata.co.jp/'
# EU
endpoint = 'https://api.eu01.treasuredata.com/'
Check API Key Permissions
- Ensure key has appropriate read/write permissions
- Regenerate key if necessary from TD console
Issue: Query Timeout or Memory Errors
Symptoms:
- Query times out after several minutes
- MemoryError when fetching large results
- Connection drops during query execution
Solutions:
Use DB-API for Large Results
from pytd.dbapi import connect

conn = connect(client)
cursor = conn.cursor()
cursor.execute(query)
# Fetch and process in batches
while True:
    rows = cursor.fetchmany(10000)
    if not rows:
        break
    for row in rows:
        process(row)
Add Time Filters for Partition Pruning
query = """
SELECT * FROM large_table
WHERE TD_INTERVAL(time, '-1d', 'JST')  -- Add this!
"""
Limit Result Size
query = "SELECT * FROM table WHERE ... LIMIT 100000"
Use Aggregations Instead of Raw Data
# Instead of fetching all rows
query = "SELECT * FROM table"
# Aggregate first
query = """
SELECT date, user_id, COUNT(*) as cnt
FROM table
GROUP BY 1, 2
"""
Issue: DataFrame Upload Fails
Symptoms:
- ValueError: DataFrame is empty
- Type errors during upload
- Data corruption in uploaded table
Solutions:
Check DataFrame is Not Empty
if df.empty:
    print("DataFrame is empty, skipping upload")
else:
    client.load_table_from_dataframe(df, 'table')
Handle Data Types Properly
# Convert timestamps to Unix epoch
df['time'] = pd.to_datetime(df['timestamp']).astype(int) // 10**9
# Handle NaN values
df['amount'] = df['amount'].fillna(0)
# Convert to appropriate types
df['user_id'] = df['user_id'].astype(str)
df['count'] = df['count'].astype(int)
Check Column Names
# TD column names should be lowercase and use underscores
df.columns = df.columns.str.lower().str.replace(' ', '_')
Remove Invalid Characters
# Remove or replace problematic characters
df = df.applymap(lambda x: str(x).replace('\x00', '') if isinstance(x, str) else x)
Try Different Writer
# If bulk_import fails, try insert_into for debugging
client.load_table_from_dataframe(
    df.head(10),  # Test with small sample
    'table',
    writer='insert_into'
)
Issue: Spark Writer Not Working
Symptoms:
- ImportError: Spark writer not available
- Spark job fails
- Permission denied errors
Solutions:
Install Spark Dependencies
pip install pytd[spark]
Enable Plazma Public API
- Contact support@treasuredata.com
- Request Plazma Public API access for your account
Specify JAR Path (if needed)
from pytd.writer import SparkWriter

writer = SparkWriter(
    td_spark_path='/path/to/td-spark-assembly.jar'
)
Check Permissions
- Ensure API key has write access to target database
- Verify Plazma access is enabled
Advanced Topics
Custom Query Options
# Query with custom parameters
result = client.query(
'SELECT * FROM table',
engine='presto',
priority=1, # Job priority, from -2 (lowest) to 2 (highest); default 0
retry_limit=3
)
Working with Job Status
# Start query asynchronously
job = client.query('SELECT COUNT(*) FROM large_table', wait=False)
# Check job status
print(f"Job ID: {job.job_id}")
print(f"Status: {job.status()}")
# Wait for completion
job.wait()
# Get results
if job.success():
result = job.result()
else:
print(f"Job failed: {job.error()}")
Custom Writers
from pytd.writer import BulkImportWriter
# Configure writer with custom options
writer = BulkImportWriter(
chunk_size=10000, # Rows per chunk
time_column='time' # Specify time column
)
client.load_table_from_dataframe(
df,
'table',
writer=writer,
if_exists='append'
)
Migrating from pandas-td
If you have existing code using the deprecated pandas-td library:
Before (pandas-td):
import pandas_td as td
con = td.connect(apikey='your_api_key', endpoint='https://api.treasuredata.com/')
df = td.read_td('SELECT * FROM sample_datasets.nasdaq', con)
After (pytd):
import pytd.pandas_td as td
con = td.connect(apikey='your_api_key', endpoint='https://api.treasuredata.com/')
df = td.read_td('SELECT * FROM sample_datasets.nasdaq', con)
Or use the modern pytd API:
import pytd
import pandas as pd
client = pytd.Client(database='sample_datasets')
result = client.query('SELECT * FROM nasdaq')
df = pd.DataFrame(result['data'], columns=result['columns'])
Testing and Development
Test Connection
import pytd
try:
client = pytd.Client(database='sample_datasets')
result = client.query('SELECT 1 as test')
print("Connection successful!")
print(result)
except Exception as e:
print(f"Connection failed: {e}")
Verify Data Upload
import pandas as pd
import pytd
# Create test data
test_df = pd.DataFrame({
'id': [1, 2, 3],
'name': ['Alice', 'Bob', 'Charlie'],
'value': [100, 200, 300],
'time': [1704067200, 1704153600, 1704240000] # Unix timestamps
})
client = pytd.Client(database='test_db')
# Upload
print("Uploading test data...")
client.load_table_from_dataframe(
test_df,
'test_table',
writer='bulk_import',
if_exists='overwrite'
)
# Verify
print("Verifying upload...")
result = client.query('SELECT * FROM test_table ORDER BY id')
verify_df = pd.DataFrame(result['data'], columns=result['columns'])
print("\nUploaded data:")
print(verify_df)
# Check counts match
assert len(test_df) == len(verify_df), "Row count mismatch!"
print("\nVerification successful!")
Performance Testing
import pytd
import pandas as pd
import time
client = pytd.Client(database='test_db')
# Generate test data
df = pd.DataFrame({
'id': range(100000),
'value': range(100000),
'time': int(time.time())
})
# Test bulk_import
start = time.time()
client.load_table_from_dataframe(df, 'perf_test_bulk', writer='bulk_import', if_exists='overwrite')
bulk_time = time.time() - start
print(f"bulk_import: {bulk_time:.2f}s for {len(df)} rows")
# Test insert_into (small sample only!)
small_df = df.head(100)
start = time.time()
client.load_table_from_dataframe(small_df, 'perf_test_insert', writer='insert_into', if_exists='overwrite')
insert_time = time.time() - start
print(f"insert_into: {insert_time:.2f}s for {len(small_df)} rows")
Jupyter Notebook Integration
pytd works seamlessly with Jupyter notebooks:
# Notebook cell 1: Setup
import pytd
import pandas as pd
import matplotlib.pyplot as plt
client = pytd.Client(database='analytics')
# Notebook cell 2: Query data
query = """
SELECT
TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'JST') as date,
COUNT(*) as events
FROM user_events
WHERE TD_INTERVAL(time, '-30d', 'JST')
GROUP BY 1
ORDER BY 1
"""
result = client.query(query)
df = pd.DataFrame(result['data'], columns=result['columns'])
df['date'] = pd.to_datetime(df['date'])
# Notebook cell 3: Visualize
plt.figure(figsize=(12, 6))
plt.plot(df['date'], df['events'])
plt.title('Daily Events - Last 30 Days')
plt.xlabel('Date')
plt.ylabel('Event Count')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
# Notebook cell 4: Write results back
summary = df.describe()
# Process and save summary back to TD if needed
Resources
- Documentation: https://pytd-doc.readthedocs.io/
- GitHub Repository: https://github.com/treasure-data/pytd
- PyPI Package: https://pypi.org/project/pytd/
- TD Python Guide: https://docs.treasuredata.com/
- Example Notebooks: See GitHub repository for Google Colab examples
Related Skills
- trino: Understanding Trino SQL syntax for queries in pytd
- hive: Using Hive-specific functions and syntax
- digdag: Orchestrating Python scripts using pytd in workflows
- td-javascript-sdk: Browser-based data collection (frontend) vs pytd (backend/analytics)
Comparison with Other Tools
| Tool | Purpose | When to Use |
|---|---|---|
| pytd | Full-featured Python driver | Analytics, data pipelines, pandas integration |
| td-client-python | Basic REST API wrapper | Simple CRUD, when pytd is too heavy |
| pandas-td (deprecated) | Legacy pandas integration | Don't use - migrate to pytd |
| TD Toolbelt | CLI tool | Command-line operations, shell scripts |
Recommendation: Use pytd for all Python-based analytical work and ETL pipelines. Use td-client-python only for basic REST API operations.
Last updated: 2025-01 | pytd version: Latest (Python 3.9+)