| name | parquet-coder |
| description | Columnar file patterns including partitioning, predicate pushdown, and schema evolution. |
| allowed-tools | Read, Write, Edit, Grep, Glob, Bash |
Parquet-Coder
Patterns for efficient columnar data storage with Parquet.
Basic Operations
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
# Write with compression
df.to_parquet('data.parquet', compression='snappy', index=False)
# Common compression options:
# - snappy: Fast, good compression (default)
# - gzip: Slower, better compression
# - zstd: Usually the best balance of speed and compression
# - None: No compression (fastest writes)
# Read entire file
df = pd.read_parquet('data.parquet')
# Read specific columns only (column pruning / projection pushdown)
df = pd.read_parquet('data.parquet', columns=['id', 'name', 'value'])
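Codec choice is workload-dependent, so it is worth measuring on your own data. A minimal sketch comparing on-disk sizes (file names are illustrative):
import os

# Write the same DataFrame with each codec and compare file sizes
for codec in ['snappy', 'gzip', 'zstd', None]:
    path = f'data_{codec}.parquet'  # None becomes 'data_None.parquet'
    df.to_parquet(path, compression=codec, index=False)
    print(f"{codec}: {os.path.getsize(path):,} bytes")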
PyArrow for Large Files
# Read as PyArrow Table (more memory efficient)
table = pq.read_table('data.parquet')
# Convert to pandas when needed
df = table.to_pandas()
# Filter while reading (predicate pushdown: row groups are pruned via min/max statistics)
table = pq.read_table(
    'data.parquet',
    filters=[
        ('date', '>=', '2024-01-01'),  # date-typed columns may need datetime.date values
        ('status', '=', 'active')
    ]
)
# Read in batches for huge files
parquet_file = pq.ParquetFile('huge.parquet')
for batch in parquet_file.iter_batches(batch_size=100_000):
    df_batch = batch.to_pandas()
    process(df_batch)  # your per-chunk processing function
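The same iterator accepts a column selection, which keeps per-batch memory bounded. A sketch of a streaming aggregation, assuming a numeric `value` column in `huge.parquet`:
import pyarrow.compute as pc

total = 0
parquet_file = pq.ParquetFile('huge.parquet')
for batch in parquet_file.iter_batches(batch_size=100_000, columns=['value']):
    # Only 'value' was selected, so it is column 0 of each batch;
    # `or 0` guards the None returned for an all-null batch
    total += pc.sum(batch.column(0)).as_py() or 0
print(f"Sum of value: {total}")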
Partitioned Datasets
# Write partitioned by columns
df.to_parquet(
    'data/',
    partition_cols=['year', 'month'],
    compression='snappy'
)
# Creates hive-style paths, e.g. data/year=2024/month=1/<file>.parquet
# Read partitioned dataset
df = pd.read_parquet('data/') # Reads all partitions
# Read specific partitions only
df = pd.read_parquet('data/year=2024/')  # note: 'year' lives in the path, not in the returned columns
# With PyArrow dataset API (more control)
import pyarrow.dataset as ds
dataset = ds.dataset('data/', format='parquet', partitioning='hive')
# Filter on partition columns (very fast)
table = dataset.to_table(
    filter=(ds.field('year') == 2024) & (ds.field('month') >= 6)
)
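The dataset API can write partitioned data too, with more control than `to_parquet`. A sketch, assuming `table` carries `year` and `month` columns matching the partition schema:
ds.write_dataset(
    table,
    'data/',
    format='parquet',
    partitioning=ds.partitioning(
        pa.schema([('year', pa.int16()), ('month', pa.int8())]),
        flavor='hive'
    ),
    existing_data_behavior='overwrite_or_ignore'  # don't error if the directory exists
)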
Schema Definition
# Explicit schema for consistency
schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('value', pa.float64()),
    ('date', pa.date32()),
    ('tags', pa.list_(pa.string())),
    ('metadata', pa.map_(pa.string(), pa.string())),
])
# Write with schema
table = pa.Table.from_pandas(df, schema=schema)
pq.write_table(table, 'data.parquet')
# Read and validate schema
file_schema = pq.read_schema('data.parquet')
assert file_schema.equals(schema), "Schema mismatch!"
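The list and map columns above round-trip through Parquet. A small sketch building such a table directly (values are illustrative; map cells are lists of key/value pairs):
nested = pa.table({
    'id': pa.array([1, 2], type=pa.int64()),
    'tags': pa.array([['a', 'b'], ['c']], type=pa.list_(pa.string())),
    'metadata': pa.array(
        [[('source', 'api')], [('source', 'file')]],
        type=pa.map_(pa.string(), pa.string())
    ),
})
pq.write_table(nested, 'nested.parquet')
print(pq.read_table('nested.parquet').to_pandas())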
Schema Evolution
def _is_numeric(t: pa.DataType) -> bool:
    return pa.types.is_integer(t) or pa.types.is_floating(t)

def promote_type(left: pa.DataType, right: pa.DataType) -> pa.DataType:
    """Pick a type both sides can safely cast to (simplified rules)."""
    if left == right:
        return left
    if _is_numeric(left) and _is_numeric(right):
        return pa.float64()  # widest common numeric type used here
    return pa.string()  # last resort: represent both as strings

def merge_schemas(old_schema: pa.Schema, new_schema: pa.Schema) -> pa.Schema:
    """Create a unified schema from old and new."""
    fields = {f.name: f for f in old_schema}
    for field in new_schema:
        if field.name not in fields:
            fields[field.name] = field
        elif fields[field.name].type != field.type:
            # Handle type conflicts (e.g., promote int to float)
            fields[field.name] = pa.field(
                field.name,
                promote_type(fields[field.name].type, field.type)
            )
    return pa.schema(list(fields.values()))
def append_with_schema_evolution(
    existing_path: str,
    new_df: pd.DataFrame,
    output_path: str
) -> None:
    """Append data with automatic schema evolution."""
    existing = pq.read_table(existing_path)
    new_table = pa.Table.from_pandas(new_df)
    # concat_tables unifies the schemas and null-fills missing columns;
    # casting each table to the unified schema directly would fail when
    # the field counts differ. (On pyarrow < 14, pass promote=True instead.)
    combined = pa.concat_tables([existing, new_table], promote_options='default')
    pq.write_table(combined, output_path)
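A quick usage sketch (paths and values illustrative): the second frame adds a column, and the combined file carries the unified schema with nulls backfilled:
pd.DataFrame({'id': [1, 2]}).to_parquet('v1.parquet')
append_with_schema_evolution(
    'v1.parquet',
    pd.DataFrame({'id': [3], 'score': [0.5]}),  # new 'score' column
    'v2.parquet'
)
print(pq.read_schema('v2.parquet'))  # includes both 'id' and 'score'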
Row Group Optimization
# Control row group size (affects read performance)
pq.write_table(
    table,
    'data.parquet',
    row_group_size=100_000,  # rows per group
    compression='snappy'
)
# Read metadata to see row groups
parquet_file = pq.ParquetFile('data.parquet')
print(f"Num row groups: {parquet_file.metadata.num_row_groups}")
print(f"Num rows: {parquet_file.metadata.num_rows}")
# Read specific row groups
table = parquet_file.read_row_groups([0, 1]) # First two groups
Metadata and Statistics
# Add custom metadata
custom_metadata = {
    b'created_by': b'etl_pipeline',
    b'version': b'1.0',
    b'source': b'api_export'
}
schema = table.schema.with_metadata(custom_metadata)
table = table.cast(schema)
pq.write_table(table, 'data.parquet')
# Read metadata
parquet_file = pq.ParquetFile('data.parquet')
print(parquet_file.schema.metadata)
# Column statistics (min/max for filtering)
metadata = parquet_file.metadata
for i in range(metadata.num_row_groups):
    rg = metadata.row_group(i)
    for j in range(rg.num_columns):
        col = rg.column(j)
        if col.statistics:
            print(f"{col.path_in_schema}: min={col.statistics.min}, max={col.statistics.max}")
Delta Lake Integration
from deltalake import DeltaTable, write_deltalake
# Write as Delta table (versioned parquet)
write_deltalake('delta_table/', df, mode='overwrite')
# Append data
write_deltalake('delta_table/', new_df, mode='append')
# Read Delta table
dt = DeltaTable('delta_table/')
df = dt.to_pandas()
# Time travel: load the table as of an earlier version
df_old = DeltaTable('delta_table/', version=0).to_pandas()
# Compact small files
dt.optimize.compact()
# Vacuum unreferenced files (dry_run defaults to True; set False to actually delete)
dt.vacuum(retention_hours=168, dry_run=False)  # keep 7 days
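Recent deltalake releases also expose a merge builder for upserts. A hedged sketch, assuming an `id` key column and a hypothetical `updates_df` of changed rows (builder method names vary somewhat across versions):
(
    dt.merge(
        source=updates_df,      # hypothetical DataFrame of changed rows
        predicate='t.id = s.id',
        source_alias='s',
        target_alias='t'
    )
    .when_matched_update_all()      # update rows whose key already exists
    .when_not_matched_insert_all()  # insert genuinely new rows
    .execute()
)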
Performance Tips
# 1. Use appropriate column types
schema = pa.schema([
    ('category', pa.dictionary(pa.int8(), pa.string())),  # for repeated strings
    ('count', pa.int32()),  # not int64 if values fit
])
# 2. Sort data before writing (improves predicate pushdown)
df = df.sort_values(['date', 'category'])
df.to_parquet('sorted.parquet')
# 3. Use column selection
df = pd.read_parquet('data.parquet', columns=['needed', 'columns'])
# 4. Use filters for row pruning
df = pd.read_parquet('data/', filters=[('status', '==', 'active')])
# 5. Parallel reads
import pyarrow.dataset as ds
dataset = ds.dataset('data/', format='parquet')
table = dataset.to_table(use_threads=True)
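For local files, memory-mapped reads can avoid extra buffer copies; a small addition to the list above:
# 6. Memory-map local files (avoids copying buffers where possible)
table = pq.read_table('data.parquet', memory_map=True)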
Conversion Utilities
def csv_to_parquet(
    csv_path: str,
    parquet_path: str,
    chunk_size: int = 100_000
) -> None:
    """Convert a large CSV to Parquet chunk by chunk."""
    writer = None
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        # Note: pandas infers dtypes per chunk; pass dtype= to read_csv
        # if later chunks might infer differently and break the schema
        table = pa.Table.from_pandas(chunk)
        if writer is None:
            writer = pq.ParquetWriter(parquet_path, table.schema)
        writer.write_table(table)
    if writer is not None:  # stays None for an empty CSV
        writer.close()
def json_to_parquet(json_path: str, parquet_path: str) -> None:
    """Convert JSON lines to Parquet."""
    df = pd.read_json(json_path, lines=True)
    df.to_parquet(parquet_path, compression='snappy')
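For JSON-lines files too large for memory, pandas can stream chunks the same way the CSV converter does (a sketch; `chunksize` requires `lines=True`):
def jsonl_to_parquet_streaming(
    json_path: str,
    parquet_path: str,
    chunk_size: int = 100_000
) -> None:
    """Convert large JSON-lines files chunk by chunk."""
    writer = None
    for chunk in pd.read_json(json_path, lines=True, chunksize=chunk_size):
        table = pa.Table.from_pandas(chunk)
        if writer is None:
            writer = pq.ParquetWriter(parquet_path, table.schema)
        writer.write_table(table)
    if writer is not None:
        writer.close()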