| name | data-products |
| description | Data product design patterns with contracts, SLAs, and governance for building self-serve data platforms using Data Mesh principles. |
| triggers | data products, data mesh, data contracts, sla definition, data governance |
| category | architecture |
# Data Products Skill

## Overview
Data products are self-contained, discoverable data assets with clear ownership, SLAs, and contracts. This skill provides patterns for designing, implementing, and governing data products using Data Mesh principles.
**Key Benefits:**
- Clear ownership and accountability
- Explicit contracts and SLAs
- Discoverable and self-serve
- Version-controlled interfaces
- Quality guarantees
- Consumer-oriented design
## When to Use This Skill
Use data product patterns when you need to:
- Build self-serve data platforms
- Implement Data Mesh architecture
- Define clear data contracts between teams
- Establish data ownership and accountability
- Provide discoverable data assets
- Guarantee data quality and freshness
- Enable cross-domain data sharing
## Core Concepts

### 1. Data Product Definition
**Product Specification:**

```yaml
product:
  name: customer-360
  version: 2.1.0
  description: Complete customer view with demographics, behavior, and preferences
  owner:
    team: customer-data-platform
    email: cdp-team@company.com
  domain: customer
  subdomain: customer-intelligence

  interfaces:
    - name: customer_profile
      type: delta_table
      location: catalog.customer.profile_v2
      format: delta
      access_pattern: batch_query
    - name: customer_events_stream
      type: kafka_topic
      location: customer.events.v2
      format: avro
      access_pattern: streaming

  contracts:
    schema:
      version: 2.1.0
      evolution_policy: backward_compatible
    sla:
      freshness:
        profile: 1_hour
        events: real_time
      availability: 99.9%
      completeness: 99.5%
      accuracy: 99%
    quality_rules:
      - name: primary_key_uniqueness
        rule: customer_id IS NOT NULL AND UNIQUE
        severity: critical
      - name: email_format
        rule: email matches RFC5322 pattern
        severity: high

  governance:
    classification: PII
    retention_days: 2555  # 7 years
    access_controls:
      - role: data_analyst
        permissions: [SELECT]
      - role: data_engineer
        permissions: [SELECT, INSERT]
```
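A spec like this can be loaded and sanity-checked programmatically. Below is a minimal sketch, assuming the spec is saved as `product.yaml` (as in the repository layout under Best Practices) and that PyYAML is available; the required-key list is illustrative, not a standard.

```python
"""Load and sanity-check a product spec (sketch; assumes PyYAML)."""
import yaml

# Illustrative minimum; adjust to your platform's registration rules.
REQUIRED_KEYS = {"name", "version", "owner", "interfaces", "contracts"}

with open("product.yaml") as f:
    spec = yaml.safe_load(f)["product"]

missing = REQUIRED_KEYS - spec.keys()
if missing:
    raise ValueError(f"Product spec is missing required keys: {missing}")

print(f"Loaded {spec['name']} v{spec['version']} owned by {spec['owner']['team']}")
```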
### 2. Data Contracts

**Contract Schema Definition:**
"""
Data contract implementation.
"""
from dataclasses import dataclass
from typing import List, Dict, Any
from enum import Enum
class DataType(Enum):
"""Supported data types."""
STRING = "string"
INTEGER = "integer"
DOUBLE = "double"
TIMESTAMP = "timestamp"
BOOLEAN = "boolean"
@dataclass
class FieldContract:
"""Contract for a single field."""
name: str
type: DataType
required: bool
description: str
constraints: Dict[str, Any] = None
pii: bool = False
@dataclass
class DataContract:
"""Complete data contract specification."""
product_name: str
version: str
fields: List[FieldContract]
primary_keys: List[str]
partitioning: List[str]
quality_rules: List[Dict[str, Any]]
sla: Dict[str, Any]
def validate_schema(self, df) -> bool:
"""Validate DataFrame against contract."""
# Check all required fields present
contract_fields = {f.name for f in self.fields}
df_fields = set(df.columns)
if not contract_fields.issubset(df_fields):
missing = contract_fields - df_fields
raise ValueError(f"Missing required fields: {missing}")
# Check field types
for field in self.fields:
if field.required and df.filter(df[field.name].isNull()).count() > 0:
raise ValueError(f"Required field {field.name} has null values")
return True
# Example usage
customer_contract = DataContract(
product_name="customer-360",
version="2.1.0",
fields=[
FieldContract(
name="customer_id",
type=DataType.STRING,
required=True,
description="Unique customer identifier",
pii=False
),
FieldContract(
name="email",
type=DataType.STRING,
required=True,
description="Customer email address",
constraints={"format": "email"},
pii=True
)
],
primary_keys=["customer_id"],
partitioning=["registration_date"],
quality_rules=[
{"name": "uniqueness", "column": "customer_id", "rule": "unique"},
{"name": "email_format", "column": "email", "rule": "matches_regex", "pattern": r"^[A-Za-z0-9._%+-]+@"}
],
sla={
"freshness_hours": 1,
"availability_percent": 99.9,
"completeness_percent": 99.5
}
)
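With the contract in hand, enforcing it at write time is a few lines. A minimal sketch, assuming an active SparkSession and an illustrative staging table:

```python
# Sketch: enforce the contract before publishing (assumes a SparkSession;
# the source table name is illustrative).
incoming_df = spark.table("silver.customer_updates")

customer_contract.validate_schema(incoming_df)  # raises ValueError on violation
incoming_df.write.format("delta").mode("append").saveAsTable(
    "catalog.customer.profile_v2"
)
```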
### 3. SLA Management

**SLA Monitoring:**
"""
SLA monitoring and enforcement.
"""
from datetime import datetime, timedelta
from typing import Dict, Any
class SLAMonitor:
"""Monitor and enforce data product SLAs."""
def __init__(self, spark):
self.spark = spark
def check_freshness(
self,
table_name: str,
timestamp_column: str,
max_age_hours: int
) -> Dict[str, Any]:
"""Check if data meets freshness SLA."""
df = self.spark.table(table_name)
latest_timestamp = df.agg(
{timestamp_column: "max"}
).collect()[0][0]
age_hours = (
datetime.now() - latest_timestamp
).total_seconds() / 3600
return {
"met": age_hours <= max_age_hours,
"latest_timestamp": latest_timestamp,
"age_hours": age_hours,
"sla_hours": max_age_hours
}
def check_completeness(
self,
table_name: str,
required_columns: List[str],
threshold_percent: float = 99.0
) -> Dict[str, Any]:
"""Check if data meets completeness SLA."""
df = self.spark.table(table_name)
total_records = df.count()
results = {}
for column in required_columns:
non_null_count = df.filter(df[column].isNotNull()).count()
completeness = (non_null_count / total_records) * 100
results[column] = {
"completeness_percent": completeness,
"met": completeness >= threshold_percent,
"missing_count": total_records - non_null_count
}
overall_met = all(r["met"] for r in results.values())
return {
"met": overall_met,
"by_column": results
}
def check_availability(
self,
table_name: str,
lookback_days: int = 7
) -> Dict[str, Any]:
"""Check if data product meets availability SLA."""
# Query system tables for availability metrics
query = f"""
SELECT
COUNT(*) as total_checks,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_checks
FROM system.monitoring.table_health_checks
WHERE table_name = '{table_name}'
AND check_time >= current_date() - INTERVAL {lookback_days} DAYS
"""
result = self.spark.sql(query).first()
availability_percent = (
result["successful_checks"] / result["total_checks"] * 100
if result["total_checks"] > 0 else 0
)
return {
"availability_percent": availability_percent,
"total_checks": result["total_checks"],
"successful_checks": result["successful_checks"],
"lookback_days": lookback_days
}
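A scheduled job can then compare measurements against the thresholds declared in the product spec. A minimal sketch, assuming an active SparkSession and the table name from the spec above; the alerting hook is left as a placeholder:

```python
# Sketch: scheduled SLA check against the declared thresholds
# (assumes a SparkSession; table name matches the product spec above).
monitor = SLAMonitor(spark)

freshness = monitor.check_freshness(
    "catalog.customer.profile_v2", "updated_at", max_age_hours=1
)
completeness = monitor.check_completeness(
    "catalog.customer.profile_v2",
    required_columns=["customer_id", "email"],
    threshold_percent=99.5
)

if not (freshness["met"] and completeness["met"]):
    # Hook into your alerting system here (Slack, PagerDuty, etc.)
    print(f"SLA breach: age={freshness['age_hours']}h, "
          f"completeness={completeness['by_column']}")
```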
### 4. Product Metadata

**Metadata Management:**
"""
Data product metadata and discovery.
"""
from dataclasses import dataclass, asdict
from typing import List, Dict, Any
from datetime import datetime
import json
@dataclass
class ProductMetadata:
"""Metadata for data product discovery."""
name: str
version: str
description: str
owner_team: str
owner_email: str
domain: str
tags: List[str]
interfaces: List[Dict[str, Any]]
documentation_url: str
created_at: datetime
updated_at: datetime
def to_delta(self, spark, catalog_table: str):
"""Persist metadata to Unity Catalog."""
metadata_dict = asdict(self)
metadata_dict["created_at"] = self.created_at.isoformat()
metadata_dict["updated_at"] = self.updated_at.isoformat()
metadata_dict["interfaces"] = json.dumps(self.interfaces)
metadata_dict["tags"] = json.dumps(self.tags)
df = spark.createDataFrame([metadata_dict])
df.write.format("delta").mode("append").saveAsTable(catalog_table)
class ProductCatalog:
"""Centralized product catalog for discovery."""
def __init__(self, spark, catalog_table: str):
self.spark = spark
self.catalog_table = catalog_table
def register_product(self, metadata: ProductMetadata):
"""Register new data product."""
metadata.to_delta(self.spark, self.catalog_table)
def search_products(
self,
domain: str = None,
tags: List[str] = None
) -> List[Dict[str, Any]]:
"""Search for data products."""
query = f"SELECT * FROM {self.catalog_table} WHERE 1=1"
if domain:
query += f" AND domain = '{domain}'"
if tags:
for tag in tags:
query += f" AND array_contains(tags, '{tag}')"
return self.spark.sql(query).collect()
def get_product(self, name: str, version: str = None) -> Dict[str, Any]:
"""Get specific product metadata."""
query = f"SELECT * FROM {self.catalog_table} WHERE name = '{name}'"
if version:
query += f" AND version = '{version}'"
else:
query += " ORDER BY updated_at DESC LIMIT 1"
result = self.spark.sql(query).first()
return result.asDict() if result else None
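Registration then looks like this. A short sketch, assuming a SparkSession; the catalog table name and documentation URL are illustrative:

```python
# Sketch: register and look up a product (assumes a SparkSession;
# catalog table name and documentation URL are illustrative).
from datetime import datetime

catalog = ProductCatalog(spark, "governance.catalog.data_products")

catalog.register_product(ProductMetadata(
    name="customer-360",
    version="2.1.0",
    description="Complete customer view",
    owner_team="customer-data-platform",
    owner_email="cdp-team@company.com",
    domain="customer",
    tags=["pii", "gold"],
    interfaces=[{"name": "customer_profile", "type": "delta_table"}],
    documentation_url="https://wiki.example.com/customer-360",
    created_at=datetime.now(),
    updated_at=datetime.now(),
))

latest = catalog.get_product("customer-360")  # newest version by updated_at
```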
## Implementation Patterns

### Pattern 1: Building a Data Product

**Complete Product Implementation:**
"""
Customer 360 Data Product Implementation
"""
import dlt
from pyspark.sql.functions import *
# Product metadata
PRODUCT_NAME = "customer-360"
PRODUCT_VERSION = "2.1.0"
PRODUCT_OWNER = "customer-data-platform"
@dlt.table(
name="customer_360_profile",
comment="Customer 360 profile data product",
table_properties={
"product.name": PRODUCT_NAME,
"product.version": PRODUCT_VERSION,
"product.owner": PRODUCT_OWNER,
"product.interface": "customer_profile",
"quality.tier": "gold",
"pii.contains": "true"
}
)
@dlt.expect_or_fail("pk_not_null", "customer_id IS NOT NULL")
@dlt.expect_or_fail("pk_unique", "customer_id IS UNIQUE")
@dlt.expect_or_drop("valid_email", "email RLIKE '^[A-Za-z0-9._%+-]+@'")
@dlt.expect("complete_profile", "phone IS NOT NULL AND address IS NOT NULL")
def customer_360_profile():
"""
Build customer 360 profile data product.
SLA:
- Freshness: 1 hour
- Availability: 99.9%
- Completeness: 99.5%
"""
# Source data
demographics = dlt.read("silver_customer_demographics")
behavior = dlt.read("silver_customer_behavior")
preferences = dlt.read("silver_customer_preferences")
# Join sources
profile = (
demographics
.join(behavior, "customer_id", "left")
.join(preferences, "customer_id", "left")
.select(
"customer_id",
"email",
"phone",
"address",
"registration_date",
"segment",
"lifetime_value",
"last_purchase_date",
"total_orders",
"preferred_category",
"communication_preference"
)
.withColumn("product_version", lit(PRODUCT_VERSION))
.withColumn("updated_at", current_timestamp())
)
return profile
# Product quality monitoring
@dlt.table(
name="customer_360_quality_metrics",
comment="Quality metrics for customer 360 product"
)
def quality_metrics():
"""Monitor product quality against SLAs."""
profile = dlt.read("customer_360_profile")
return spark.sql(f"""
SELECT
current_timestamp() as metric_timestamp,
'{PRODUCT_NAME}' as product_name,
'{PRODUCT_VERSION}' as product_version,
COUNT(*) as total_records,
COUNT_IF(customer_id IS NOT NULL) * 100.0 / COUNT(*) as id_completeness,
COUNT_IF(email IS NOT NULL) * 100.0 / COUNT(*) as email_completeness,
COUNT_IF(phone IS NOT NULL) * 100.0 / COUNT(*) as phone_completeness,
MAX(updated_at) as last_update_time,
(unix_timestamp(current_timestamp()) - unix_timestamp(MAX(updated_at))) / 3600 as data_age_hours
FROM LIVE.customer_360_profile
""")
### Pattern 2: Contract Enforcement

**Automated Contract Validation:**
"""
Contract validation in data pipelines.
"""
from typing import Dict, Any
from pyspark.sql import DataFrame
class ContractValidator:
"""Validate data against contracts."""
def __init__(self, contract: DataContract):
self.contract = contract
def validate(self, df: DataFrame) -> Dict[str, Any]:
"""
Validate DataFrame against contract.
Returns validation results with pass/fail status.
"""
results = {
"product": self.contract.product_name,
"version": self.contract.version,
"timestamp": datetime.now().isoformat(),
"validations": []
}
# Schema validation
try:
self.contract.validate_schema(df)
results["validations"].append({
"check": "schema",
"status": "PASS"
})
except ValueError as e:
results["validations"].append({
"check": "schema",
"status": "FAIL",
"error": str(e)
})
# Primary key validation
pk_violations = (
df.groupBy(self.contract.primary_keys)
.count()
.filter(col("count") > 1)
.count()
)
results["validations"].append({
"check": "primary_key_uniqueness",
"status": "PASS" if pk_violations == 0 else "FAIL",
"violations": pk_violations
})
# Quality rules validation
for rule in self.contract.quality_rules:
violations = self._check_quality_rule(df, rule)
results["validations"].append({
"check": rule["name"],
"status": "PASS" if violations == 0 else "FAIL",
"violations": violations
})
results["overall_status"] = (
"PASS" if all(v["status"] == "PASS" for v in results["validations"])
else "FAIL"
)
return results
def _check_quality_rule(self, df: DataFrame, rule: Dict[str, Any]) -> int:
"""Check individual quality rule."""
if rule["rule"] == "unique":
total = df.count()
unique = df.select(rule["column"]).distinct().count()
return total - unique
elif rule["rule"] == "matches_regex":
return df.filter(
~col(rule["column"]).rlike(rule["pattern"])
).count()
return 0
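Wiring the validator into a pipeline as a publish gate; a minimal sketch, assuming the `customer_contract` defined earlier, a SparkSession, and an illustrative staging table:

```python
# Sketch: gate publication on contract validation (uses customer_contract
# from the Data Contracts section; staging table name is illustrative).
staged = spark.table("staging.customer_360_candidate")

report = ContractValidator(customer_contract).validate(staged)
if report["overall_status"] != "PASS":
    failed = [v for v in report["validations"] if v["status"] == "FAIL"]
    raise RuntimeError(f"Contract violations, blocking publish: {failed}")

staged.write.format("delta").mode("overwrite").saveAsTable(
    "catalog.customer.profile_v2"
)
```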
### Pattern 3: Product Versioning

**Semantic Versioning for Data:**
"""
Data product versioning strategy.
"""
class ProductVersion:
"""Manage data product versions."""
def __init__(self, major: int, minor: int, patch: int):
self.major = major
self.minor = minor
self.patch = patch
def __str__(self):
return f"{self.major}.{self.minor}.{self.patch}"
def bump_major(self):
"""
Increment major version (breaking changes).
Examples:
- Removing columns
- Changing column types
- Changing primary keys
"""
return ProductVersion(self.major + 1, 0, 0)
def bump_minor(self):
"""
Increment minor version (backward compatible changes).
Examples:
- Adding new columns
- Adding new quality rules
- Improving data quality
"""
return ProductVersion(self.major, self.minor + 1, 0)
def bump_patch(self):
"""
Increment patch version (bug fixes).
Examples:
- Fixing data quality issues
- Correcting transformations
- Performance improvements
"""
return ProductVersion(self.major, self.minor, self.patch + 1)
def is_compatible(self, other: 'ProductVersion') -> bool:
"""
Check if versions are compatible.
Compatibility: same major version.
"""
return self.major == other.major
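Consumers can use `is_compatible` to decide whether an upgrade is safe without migration work; a short usage sketch:

```python
# Sketch: deciding whether a consumer pinned to 2.x can take a new release.
current = ProductVersion(2, 1, 0)

after_new_column = current.bump_minor()  # 2.2.0 -- safe for existing consumers
after_pk_change = current.bump_major()   # 3.0.0 -- requires consumer migration

assert current.is_compatible(after_new_column)
assert not current.is_compatible(after_pk_change)
print(f"{current} -> {after_new_column} (compatible), {after_pk_change} (breaking)")
```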
### Pattern 4: Self-Service Discovery

**Product Discovery Interface:**
"""
Self-service data product discovery.
"""
class ProductDiscovery:
"""Enable self-service product discovery."""
def __init__(self, spark, catalog_table: str):
self.spark = spark
self.catalog_table = catalog_table
def list_products_by_domain(self, domain: str) -> DataFrame:
"""List all products in a domain."""
return self.spark.sql(f"""
SELECT
name,
version,
description,
owner_team,
tags,
documentation_url
FROM {self.catalog_table}
WHERE domain = '{domain}'
ORDER BY name, version DESC
""")
def get_product_lineage(self, product_name: str) -> Dict[str, Any]:
"""Get upstream and downstream dependencies."""
lineage_query = f"""
SELECT
upstream_table,
downstream_table,
transformation_logic
FROM system.access.table_lineage
WHERE downstream_table LIKE '%{product_name}%'
OR upstream_table LIKE '%{product_name}%'
"""
lineage = self.spark.sql(lineage_query).collect()
return {
"product": product_name,
"upstream": [row.upstream_table for row in lineage],
"downstream": [row.downstream_table for row in lineage]
}
def get_product_usage(
self,
product_name: str,
days: int = 30
) -> DataFrame:
"""Get product usage statistics."""
return self.spark.sql(f"""
SELECT
date_trunc('day', request_time) as date,
user_name,
COUNT(*) as query_count
FROM system.access.audit
WHERE table_name LIKE '%{product_name}%'
AND request_time >= current_date() - INTERVAL {days} DAYS
GROUP BY 1, 2
ORDER BY 1 DESC, 3 DESC
""")
## Best Practices

### 1. Product Organization
```
products/
├── customer-360/
│   ├── product.yaml
│   ├── contract.json
│   ├── pipelines/
│   │   └── build_product.py
│   ├── tests/
│   │   └── test_product.py
│   └── docs/
│       └── README.md
```
### 2. Contract Evolution
- Major version: Breaking changes
- Minor version: Backward compatible additions
- Patch version: Bug fixes and improvements
- Always maintain backward compatibility within a major version (see the compatibility-check sketch after this list)
- Deprecate fields before removing
- Provide migration guides for major versions
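The sketch below checks whether a proposed contract is backward compatible with the current one, using the `DataContract` and `FieldContract` classes from earlier; the rules it encodes (no removed fields, no type changes, no new required fields, stable primary keys) mirror the evolution policy above and are a starting point, not an exhaustive check.

```python
# Sketch: backward-compatibility check between two contract versions
# (uses DataContract/FieldContract from the Data Contracts section).
from typing import List


def breaking_changes(old: DataContract, new: DataContract) -> List[str]:
    """Return reasons the new contract would break existing consumers."""
    reasons = []
    old_fields = {f.name: f for f in old.fields}
    new_fields = {f.name: f for f in new.fields}

    for name, old_field in old_fields.items():
        if name not in new_fields:
            reasons.append(f"removed field: {name}")
        elif new_fields[name].type != old_field.type:
            reasons.append(f"type change on field: {name}")

    for name, new_field in new_fields.items():
        if name not in old_fields and new_field.required:
            reasons.append(f"new required field: {name}")

    if old.primary_keys != new.primary_keys:
        reasons.append("primary key change")
    return reasons


# A non-empty result means the release needs a major version bump.
```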
### 3. SLA Definition
Define realistic, measurable SLAs:
- Freshness: Maximum data age
- Availability: Uptime percentage
- Completeness: Minimum non-null percentage
- Accuracy: Data correctness threshold
### 4. Ownership Model
- Clear team ownership
- On-call rotation for incidents
- Documented escalation paths
- Regular SLA reviews
## Common Pitfalls to Avoid

**Don't:**
- Create products without clear consumers
- Skip contract definition
- Ignore versioning strategy
- Overcomplicate products
- Neglect monitoring
**Do:**
- Start with consumer needs
- Define explicit contracts
- Version semantically
- Keep products focused
- Monitor SLAs continuously
## Complete Examples

See the `/examples/` directory for:

- `customer360_product.py`: Complete data product
- `sales_analytics_product.py`: Analytics product example
## Related Skills

- `data-quality`: Quality guarantees
- `delta-live-tables`: Product pipelines
- `delta-sharing`: Product distribution
- `cicd-workflows`: Product deployment