| name | genie-integration |
| description | Integrate Databricks Genie rooms as tools in agent workflows. Use when integrating Genie spaces with AI agents, querying Genie rooms programmatically via SDK or MCP, managing Genie conversations and polling, handling Genie API responses and errors, or building tool-calling agents that use Genie as a data source. Covers SDK patterns, MCP tool integration, conversation management, error handling, and performance optimization for Genie-based agent tools. |
Genie Integration Patterns
Integrate Databricks Genie rooms as powerful tools in your agent workflows using SDK or MCP approaches.
Core Concepts
What is Databricks Genie?
Genie is a conversational BI interface that:
- Translates natural language to SQL
- Executes queries against your data
- Returns formatted results and visualizations
- Maintains conversation context
Key advantage: Existing Genie rooms become instant agent tools without rebuilding data pipelines.
Integration Approaches
SDK Integration:
- Direct control via Databricks Python SDK
- Manual conversation management
- Flexible polling strategies
- Best for custom workflows
MCP Tools:
- Pre-configured tool interfaces
- Simplified API surface
- Built-in polling logic
- Best for standard patterns
Problem-Solution Patterns
Problem 1: Genie Query Timeouts
Symptoms:
- Queries hang indefinitely
- Agent gets stuck waiting for response
- No error handling for slow queries
Root causes:
- Insufficient polling timeout
- Complex SQL generated by Genie
- Large dataset queries
- No backoff strategy
Solution:
import time

from databricks.sdk import WorkspaceClient
from langchain.tools import tool


@tool
def query_genie_with_timeout(question: str, space_id: str, max_attempts: int = 30) -> str:
    """Query Genie with proper timeout handling"""
    w = WorkspaceClient()
    try:
        # Start conversation
        response = w.genie.start_conversation(
            space_id=space_id,
            content=question
        )
        conversation_id = response.conversation_id
        message_id = response.message_id

        # Poll with capped exponential backoff
        wait_time = 2
        for attempt in range(max_attempts):
            message = w.genie.get_message(
                space_id=space_id,
                conversation_id=conversation_id,
                message_id=message_id
            )
            # status may be an enum member rather than a plain string, so compare by value
            status = getattr(message.status, "value", message.status)
            if status == "COMPLETED":
                return extract_response(message)
            elif status in ["FAILED", "CANCELLED"]:
                return f"Query failed: {status}. Try simplifying your question."

            # Backoff: 2s, 2s, 4s, 4s, 8s, 8s, then 10s per attempt
            time.sleep(wait_time)
            if attempt % 2 == 1:  # Double wait time every 2 attempts
                wait_time = min(wait_time * 2, 10)  # Cap at 10 seconds
        return "Query timeout. The query may be too complex or the dataset too large."
    except Exception as e:
        return f"Error: {str(e)}"


def extract_response(message) -> str:
    """Extract formatted response from Genie message"""
    response_text = ""
    if message.attachments:
        for attachment in message.attachments:
            if hasattr(attachment, 'text') and attachment.text:
                response_text += attachment.text.content + "\n"
            elif hasattr(attachment, 'query') and attachment.query:
                if hasattr(attachment.query, 'description'):
                    response_text += f"{attachment.query.description}\n"
    return response_text or message.content or "No response available"
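To see how long the polling loop above can block an agent, the wait schedule can be computed offline. The sketch below reproduces the same doubling rule; `backoff_schedule` and its parameters are illustrative names, not part of any SDK, and the total ignores time spent in the API calls themselves.

```python
def backoff_schedule(max_attempts: int = 30, initial_wait: int = 2, cap: int = 10) -> list:
    """Return the sleep durations used by the polling loop, one per attempt."""
    waits = []
    wait_time = initial_wait
    for attempt in range(max_attempts):
        waits.append(wait_time)
        if attempt % 2 == 1:  # double after every second attempt, up to the cap
            wait_time = min(wait_time * 2, cap)
    return waits


schedule = backoff_schedule()
print(schedule[:8])   # [2, 2, 4, 4, 8, 8, 10, 10]
print(sum(schedule))  # 268
```

With the defaults, the worst case is 268 seconds of sleeping, so `max_attempts` should be chosen to match the latency the agent can tolerate.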
Problem 2: Verbose Genie Responses Confuse Agent
Symptoms:
- Agent overwhelmed by SQL query text
- Long table outputs exceed token limits
- Agent can't synthesize an answer because of the noise
Root causes:
- Not filtering Genie response attachments
- Including raw SQL in tool output
- No summarization of large results
Solution:
from langchain.tools import tool


@tool
def query_genie_concise(question: str, space_id: str) -> str:
    """Query Genie and return concise, agent-friendly response"""
    from databricks.sdk import WorkspaceClient
    w = WorkspaceClient()
    # Get raw response. poll_for_completion is assumed to be a helper that wraps
    # the polling loop from Problem 1 and returns the finished message.
    response = w.genie.start_conversation(space_id=space_id, content=question)
    message = poll_for_completion(w, space_id, response.conversation_id, response.message_id)

    attachments = message.attachments or []
    # Extract ONLY the natural language summary: check every attachment for text
    # first, so a text summary wins even when a query attachment precedes it
    for attachment in attachments:
        if getattr(attachment, 'text', None):
            # Return first text attachment (usually the summary)
            return attachment.text.content
    # No text attachment: fall back to the query description
    for attachment in attachments:
        query = getattr(attachment, 'query', None)
        desc = getattr(query, 'description', None) if query else None
        if desc:
            # Truncate if too long
            if len(desc) > 500:
                return desc[:500] + "... [truncated for brevity]"
            return desc
    return message.content or "No clear response from Genie"
Best practice: Return summaries, not raw data. Let Genie do the summarization.
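Several snippets in this document call `poll_for_completion` without defining it. The following is a minimal sketch of what such a helper could look like, assuming the client exposes `genie.get_message(space_id, conversation_id, message_id)` as used in Problem 1 and that `status` is either a plain string or an enum with a `.value`. The helper's signature, the injectable `sleep` parameter, and the choice to raise `TimeoutError` are assumptions made for illustration.

```python
import time
from typing import Any, Callable

TERMINAL_STATUSES = {"COMPLETED", "FAILED", "CANCELLED"}


def status_name(status: Any) -> str:
    """Normalize a status that may be an enum member or a plain string."""
    return str(getattr(status, "value", status))


def poll_for_completion(
    client: Any,
    space_id: str,
    conversation_id: str,
    message_id: str,
    max_attempts: int = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Poll until the message reaches a terminal status, else raise TimeoutError."""
    wait_time = 2
    for attempt in range(max_attempts):
        message = client.genie.get_message(
            space_id=space_id,
            conversation_id=conversation_id,
            message_id=message_id,
        )
        if status_name(message.status) in TERMINAL_STATUSES:
            return message
        sleep(wait_time)
        if attempt % 2 == 1:  # same capped doubling rule as Problem 1
            wait_time = min(wait_time * 2, 10)
    raise TimeoutError(
        f"Genie message {message_id} did not finish after {max_attempts} attempts"
    )
```

Callers still need to inspect the returned message's status, since a terminal status can also be `FAILED` or `CANCELLED`. Passing `sleep` as a parameter lets tests run without real delays.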
Problem 3: Losing Conversation Context
Symptoms:
- Each query starts fresh conversation
- Agent can't do follow-up questions
- "What about last month?" fails
Root causes:
- Not tracking conversation IDs
- Creating new conversation for each query
- No conversation state management
Solution:
from databricks.sdk import WorkspaceClient


class GenieConversationManager:
    """Manage ongoing conversations with Genie rooms"""

    def __init__(self):
        self.workspace_client = WorkspaceClient()
        self.conversations = {}  # space_id -> conversation_id

    # Left undecorated: @tool does not bind `self`, so this stays a plain method
    # and is wrapped by a module-level tool (see "Usage in agent").
    def query_genie_contextual(self, question: str, space_id: str) -> str:
        """
        Query Genie while maintaining conversation context.
        Automatically continues existing conversations or starts new ones.
        """
        # Check if we have an active conversation for this space
        conversation_id = self.conversations.get(space_id)
        if conversation_id:
            # Continue existing conversation
            response = self.workspace_client.genie.create_message(
                space_id=space_id,
                conversation_id=conversation_id,
                content=question
            )
        else:
            # Start new conversation
            response = self.workspace_client.genie.start_conversation(
                space_id=space_id,
                content=question
            )
            # Save conversation ID
            self.conversations[space_id] = response.conversation_id

        # Poll and return (using patterns from above)
        message = poll_for_completion(
            self.workspace_client,
            space_id,
            response.conversation_id,
            response.message_id
        )
        return extract_response(message)

    def reset_conversation(self, space_id: str):
        """Start fresh conversation for a space"""
        if space_id in self.conversations:
            del self.conversations[space_id]
Usage in agent:
# Initialize once for agent lifecycle
genie_manager = GenieConversationManager()


@tool
def query_customer_behavior(question: str) -> str:
    """Query customer behavior Genie room"""
    return genie_manager.query_genie_contextual(
        question=question,
        space_id="01f09cdbacf01b5fa7ff7c237365502c"
    )
Problem 4: Genie Space ID Management
Symptoms:
- Hard-coded space IDs scattered in code
- Errors when space IDs change
- Difficulty managing multiple environments
Root causes:
- No centralized configuration
- Space IDs embedded in tool definitions
- No environment-aware setup
Solution:
# config.py
from dataclasses import dataclass
from typing import Dict, Optional
import os


@dataclass
class GenieSpaceConfig:
    space_id: str
    name: str
    description: str


class GenieConfig:
    """Centralized Genie space configuration"""

    def __init__(self, environment: Optional[str] = None):
        self.environment = environment or os.getenv("DATABRICKS_ENV", "production")
        self.spaces = self._load_spaces()

    def _load_spaces(self) -> Dict[str, GenieSpaceConfig]:
        """Load space configurations per environment"""
        # Production spaces
        if self.environment == "production":
            return {
                "customer_behavior": GenieSpaceConfig(
                    space_id="01f09cdbacf01b5fa7ff7c237365502c",
                    name="Customer Behavior Analysis",
                    description="Customer trends and preferences"
                ),
                "inventory": GenieSpaceConfig(
                    space_id="02a10defbcg02c6ga8gg8d348476613d",
                    name="Real-Time Inventory",
                    description="Stock levels and turnover"
                )
            }
        # Development spaces
        elif self.environment == "development":
            return {
                "customer_behavior": GenieSpaceConfig(
                    space_id="dev_customer_space_id",
                    name="Customer Behavior (Dev)",
                    description="Dev customer data"
                ),
                "inventory": GenieSpaceConfig(
                    space_id="dev_inventory_space_id",
                    name="Inventory (Dev)",
                    description="Dev inventory data"
                )
            }
        raise ValueError(f"Unknown environment: {self.environment}")

    def get_space(self, name: str) -> GenieSpaceConfig:
        """Get space configuration by name"""
        if name not in self.spaces:
            raise ValueError(f"Unknown Genie space: {name}")
        return self.spaces[name]


# Usage in tools
config = GenieConfig()


@tool
def query_customer_behavior(question: str) -> str:
    """Query customer behavior"""
    space = config.get_space("customer_behavior")
    # query_genie is the @tool from the Quick Reference; a tool is invoked
    # with a dict of arguments rather than called positionally
    return query_genie.invoke({"question": question, "space_id": space.space_id})
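The class above centralizes the IDs but still keeps them in source code. One alternative is to supply them per deployment through the environment. The sketch below assumes the space definitions arrive as a JSON object in an environment variable; the variable name `GENIE_SPACES_JSON` and the function `load_spaces_from_env` are hypothetical, chosen only for this example.

```python
import json
import os
from dataclasses import dataclass
from typing import Dict, Mapping, Optional


@dataclass(frozen=True)
class GenieSpaceConfig:
    space_id: str
    name: str
    description: str


def load_spaces_from_env(
    env: Optional[Mapping[str, str]] = None,
    var_name: str = "GENIE_SPACES_JSON",  # hypothetical variable name
) -> Dict[str, GenieSpaceConfig]:
    """Parse a JSON object mapping space keys to their configuration."""
    env = os.environ if env is None else env
    raw = env.get(var_name)
    if not raw:
        raise ValueError(f"{var_name} is not set")
    parsed = json.loads(raw)
    return {key: GenieSpaceConfig(**fields) for key, fields in parsed.items()}


# Usage with an explicit mapping, so the example does not depend on the real environment
example_env = {
    "GENIE_SPACES_JSON": json.dumps({
        "customer_behavior": {
            "space_id": "dev_customer_space_id",
            "name": "Customer Behavior (Dev)",
            "description": "Dev customer data",
        }
    })
}
spaces = load_spaces_from_env(example_env)
print(spaces["customer_behavior"].space_id)  # dev_customer_space_id
```

Keeping the IDs out of the code means a changed space ID needs only a configuration update, not a redeploy of the tool definitions.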
SDK Integration Pattern
Complete SDK-Based Tool
from typing import Optional
import time

from databricks.sdk import WorkspaceClient
from langchain.tools import tool


@tool
def query_genie_sdk(
    question: str,
    space_id: str,
    conversation_id: Optional[str] = None,
    max_attempts: int = 30
) -> str:
    """
    Query Databricks Genie room using SDK.

    Args:
        question: Natural language question
        space_id: Genie space ID
        conversation_id: Optional conversation ID to continue conversation
        max_attempts: Max polling attempts

    Returns:
        Genie's response as string
    """
    w = WorkspaceClient()
    try:
        # Start or continue conversation
        if conversation_id:
            response = w.genie.create_message(
                space_id=space_id,
                conversation_id=conversation_id,
                content=question
            )
        else:
            response = w.genie.start_conversation(
                space_id=space_id,
                content=question
            )
        conv_id = response.conversation_id
        msg_id = response.message_id

        # Poll for completion with capped exponential backoff
        wait_time = 2
        for attempt in range(max_attempts):
            message = w.genie.get_message(
                space_id=space_id,
                conversation_id=conv_id,
                message_id=msg_id
            )
            # Check status (may be an enum member, so compare by value)
            status = getattr(message.status, "value", message.status)
            if status == "COMPLETED":
                # Extract response
                result = ""
                if message.attachments:
                    for attachment in message.attachments:
                        if hasattr(attachment, 'text') and attachment.text:
                            result += attachment.text.content + "\n"
                        elif hasattr(attachment, 'query') and attachment.query:
                            if hasattr(attachment.query, 'description'):
                                result += attachment.query.description + "\n"
                return result.strip() or message.content or "No response"
            elif status in ["FAILED", "CANCELLED"]:
                return f"Query failed: {status}"

            # Wait with backoff
            time.sleep(wait_time)
            if attempt % 2 == 1:
                wait_time = min(wait_time * 2, 10)
        # The elapsed time depends on max_attempts and the backoff, so report attempts
        return f"Query timed out after {max_attempts} polling attempts"
    except Exception as e:
        return f"Error querying Genie: {str(e)}"
MCP Tool Integration Pattern
Understanding MCP Tools
MCP (Model Context Protocol) tools provide a higher-level interface:
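# MCP tools are pre-configured for specific Genie spaces
# They handle polling and conversation management automatically
# Available in environment as:
# - query_space_<SPACE_ID>
# - poll_response_<SPACE_ID>

# Example: Using existing MCP tool
# NOTE: these tools are called through an MCP client, not through the Databricks SDK;
# `w.genie` has no `query_space` method. `mcp_client` below is a placeholder for the
# MCP client your agent framework provides, and `call_tool` is assumed to take a tool
# name and an arguments dict.
space_id = "01f09cdbacf01b5fa7ff7c237365502c"

# Call Genie via MCP
result = mcp_client.call_tool(
    f"query_space_{space_id}",
    {
        "query": "What products are trending?",
        # Optional: add "conversation_id" to continue a conversation
    },
)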
When to Use MCP vs SDK
Use MCP tools when:
- Standard query/response pattern
- Don't need custom polling logic
- Want simpler code
- MCP tools available for your spaces
Use SDK when:
- Need custom timeout handling
- Complex conversation management
- Custom response parsing
- MCP tools not configured
Performance Optimization
Pattern 1: Caching Genie Responses
from datetime import datetime, timedelta


class CachedGenieQuery:
    """Cache Genie responses with TTL"""

    def __init__(self, ttl_minutes: int = 15):
        self.cache = {}
        self.ttl = timedelta(minutes=ttl_minutes)

    def query(self, question: str, space_id: str) -> str:
        """Query with caching"""
        cache_key = f"{space_id}:{question}"
        # Check cache
        if cache_key in self.cache:
            result, timestamp = self.cache[cache_key]
            if datetime.now() - timestamp < self.ttl:
                return result
        # Query Genie (query_genie_sdk is a @tool, so invoke it with a dict of arguments)
        result = query_genie_sdk.invoke({"question": question, "space_id": space_id})
        # Cache result
        self.cache[cache_key] = (result, datetime.now())
        return result
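Two details of `CachedGenieQuery` are worth noting: questions that differ only in case or spacing produce separate cache entries, and because `query_genie_sdk` returns error messages as ordinary strings, a failure would be cached for the full TTL. The sketch below is one possible variant that addresses both. `TTLCache`, `query_fn`, `is_cacheable`, and `clock` are hypothetical names, and the query function and clock are injected so the cache can be exercised without a Genie space.

```python
import time
from typing import Callable, Dict, Tuple


class TTLCache:
    """Cache string results per (space_id, normalized question) with a TTL."""

    def __init__(
        self,
        query_fn: Callable[[str, str], str],
        ttl_seconds: float = 900.0,
        is_cacheable: Callable[[str], bool] = lambda result: True,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._query_fn = query_fn
        self._ttl = ttl_seconds
        self._is_cacheable = is_cacheable
        self._clock = clock
        self._entries: Dict[Tuple[str, str], Tuple[str, float]] = {}

    @staticmethod
    def _key(question: str, space_id: str) -> Tuple[str, str]:
        # Lowercase and collapse whitespace so trivially different phrasings share an entry
        return (space_id, " ".join(question.lower().split()))

    def query(self, question: str, space_id: str) -> str:
        key = self._key(question, space_id)
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[1] < self._ttl:
            return hit[0]
        result = self._query_fn(question, space_id)
        if self._is_cacheable(result):
            self._entries[key] = (result, now)
        return result
```

A caller might pass `is_cacheable=lambda r: not r.startswith("Error")` so that failed queries are retried on the next call instead of being served from the cache.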
Pattern 2: Parallel Genie Queries
import concurrent.futures


def query_multiple_genie_spaces(questions: list[tuple[str, str]]) -> list[str]:
    """
    Query multiple Genie spaces in parallel.

    Args:
        questions: List of (question, space_id) tuples

    Returns:
        List of responses in same order
    """
    def query_one(question_space):
        question, space_id = question_space
        # query_genie_sdk is a @tool, so invoke it with a dict of arguments
        return query_genie_sdk.invoke({"question": question, "space_id": space_id})

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(query_one, questions))
    return results


# Usage in agent
questions = [
    ("What products are trending?", "customer_space_id"),
    ("Which locations have high turnover?", "inventory_space_id")
]
results = query_multiple_genie_spaces(questions)
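`executor.map` re-raises the first exception when its results are consumed, so a query function that raises would discard the other answers. The sketch below isolates failures per query while preserving input order. `query_in_parallel` and the injected `query_fn` are illustrative names; any callable taking `(question, space_id)` can be passed.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Tuple


def query_in_parallel(
    questions: List[Tuple[str, str]],
    query_fn: Callable[[str, str], str],
    max_workers: int = 3,
) -> List[str]:
    """Run query_fn for each (question, space_id) pair, preserving input order."""

    def safe_query(pair: Tuple[str, str]) -> str:
        question, space_id = pair
        try:
            return query_fn(question, space_id)
        except Exception as exc:  # keep one failure from discarding the other results
            return f"Error querying {space_id}: {exc}"

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(safe_query, questions))
```

Returning the error as a string keeps the result list aligned with the input list, which makes it simple for the agent to attribute each answer to its space.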
Error Handling Best Practices
Comprehensive Error Strategy
from langchain.tools import tool


@tool
def query_genie_robust(question: str, space_id: str) -> str:
    """Query Genie with comprehensive error handling"""
    from databricks.sdk import WorkspaceClient
    from databricks.sdk.errors import DatabricksError
    w = WorkspaceClient()
    try:
        response = w.genie.start_conversation(
            space_id=space_id,
            content=question
        )
        # poll_for_completion is assumed to return the finished message
        # and to raise TimeoutError if polling runs out of attempts
        message = poll_for_completion(
            w, space_id, response.conversation_id, response.message_id
        )
        # status may be an enum member rather than a plain string, so compare by value
        status = getattr(message.status, "value", message.status)
        if status == "COMPLETED":
            return extract_response(message)
        elif status == "FAILED":
            # Provide actionable error message
            return ("Query failed. This could be due to:\n"
                    "- Invalid SQL generated\n"
                    "- Data source unavailable\n"
                    "- Permissions issue\n"
                    "Try rephrasing your question or check Genie room configuration.")
        else:
            return f"Unexpected status: {status}"
    except DatabricksError as e:
        if "not found" in str(e).lower():
            return "Genie space not found. Check space ID configuration."
        elif "permission" in str(e).lower():
            return "Permission denied. Ensure agent has access to Genie space."
        else:
            return f"Databricks API error: {str(e)}"
    except TimeoutError:
        return "Query timeout. Try a simpler question or check data volume."
    except Exception as e:
        return f"Unexpected error: {str(e)}"
Testing Genie Integration
Unit Test Pattern
def test_genie_tool():
    """Test Genie tool in isolation"""
    # Test 1: Simple query (query_genie_sdk is a @tool, so invoke it with a dict)
    result = query_genie_sdk.invoke({
        "question": "How many customers do we have?",
        "space_id": "01f09cdbacf01b5fa7ff7c237365502c"
    })
    assert result, "Should return non-empty result"
    assert "error" not in result.lower(), "Should not contain error"

    # Test 2: Invalid space ID
    result = query_genie_sdk.invoke({
        "question": "test",
        "space_id": "invalid_id"
    })
    assert "not found" in result.lower() or "error" in result.lower()

    # Test 3: Conversation continuity
    # (Test that conversation_id parameter works)
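The test above calls a live Genie space, so it needs workspace credentials and real data. Response parsing can be checked offline with stand-in message objects. The sketch below carries its own copy of an `extract_response`-style helper so it is self-contained, and builds fake messages with `SimpleNamespace`; the attribute layout (`attachments`, `text.content`, `query.description`, `content`) mirrors what the snippets in this document access and is otherwise an assumption.

```python
from types import SimpleNamespace


def extract_response(message) -> str:
    """Local copy of the Problem 1 helper so this example runs on its own."""
    parts = []
    for attachment in message.attachments or []:
        text = getattr(attachment, "text", None)
        query = getattr(attachment, "query", None)
        if text is not None and getattr(text, "content", None):
            parts.append(text.content)
        elif query is not None and getattr(query, "description", None):
            parts.append(query.description)
    return "\n".join(parts) or message.content or "No response available"


def fake_message(attachments=None, content=None):
    """Build a stand-in for a Genie message with only the fields the helper reads."""
    return SimpleNamespace(attachments=attachments, content=content)


def test_extract_response_prefers_attachments():
    message = fake_message(
        attachments=[
            SimpleNamespace(text=SimpleNamespace(content="Sales rose 5%."), query=None),
            SimpleNamespace(text=None, query=SimpleNamespace(description="Monthly sales by region")),
        ],
        content="raw question",
    )
    assert extract_response(message) == "Sales rose 5%.\nMonthly sales by region"


def test_extract_response_falls_back_to_content():
    assert extract_response(fake_message(content="fallback")) == "fallback"
    assert extract_response(fake_message()) == "No response available"
```

Tests like these run in milliseconds and cover the parsing edge cases, leaving the live test to confirm connectivity and permissions.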
Integration Test Pattern
def test_genie_in_agent():
    """Test Genie tool within agent workflow"""
    from your_agent import GenieAgent
    agent = GenieAgent()

    # Test single-tool query
    result = agent.query("What products are trending?")
    assert "customer_behavior" in result['intermediate_steps'][0][0].tool

    # Test multi-tool query
    result = agent.query("Trending products at risk of overstock?")
    tools_called = [step[0].tool for step in result['intermediate_steps']]
    assert "customer_behavior" in tools_called
    assert "inventory" in tools_called
Quick Reference
Minimum Viable Genie Tool (SDK)
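from databricks.sdk import WorkspaceClient
from langchain.tools import tool
import time


@tool
def query_genie(question: str, space_id: str) -> str:
    """Query Genie room"""
    w = WorkspaceClient()
    resp = w.genie.start_conversation(space_id=space_id, content=question)
    for _ in range(30):
        msg = w.genie.get_message(space_id, resp.conversation_id, resp.message_id)
        # status may be an enum member rather than a plain string, so compare by value
        status = getattr(msg.status, "value", msg.status)
        if status == "COMPLETED":
            # The first attachment may be a query rather than text, so guard the access
            first = msg.attachments[0] if msg.attachments else None
            if first is not None and getattr(first, "text", None):
                return first.text.content
            return msg.content or "No response"
        elif status in ["FAILED", "CANCELLED"]:
            return f"Failed: {status}"
        time.sleep(2)
    return "Timeout"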
Related Skills
- mosaic-ai-agent: Design agents that use Genie tools
- agent-mlops: Deploy Genie-powered agents to production