Claude Code Plugins

Community-maintained marketplace

Feedback

Integrate Databricks Genie rooms as tools in agent workflows. Use when integrating Genie spaces with AI agents, querying Genie rooms programmatically via SDK or MCP, managing Genie conversations and polling, handling Genie API responses and errors, or building tool-calling agents that use Genie as a data source. Covers SDK patterns, MCP tool integration, conversation management, error handling, and performance optimization for Genie-based agent tools.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name genie-integration
description Integrate Databricks Genie rooms as tools in agent workflows. Use when integrating Genie spaces with AI agents, querying Genie rooms programmatically via SDK or MCP, managing Genie conversations and polling, handling Genie API responses and errors, or building tool-calling agents that use Genie as a data source. Covers SDK patterns, MCP tool integration, conversation management, error handling, and performance optimization for Genie-based agent tools.

Genie Integration Patterns

Integrate Databricks Genie rooms as powerful tools in your agent workflows using SDK or MCP approaches.

Core Concepts

What is Databricks Genie?

Genie is a conversational BI interface that:

  • Translates natural language to SQL
  • Executes queries against your data
  • Returns formatted results and visualizations
  • Maintains conversation context

Key advantage: Existing Genie rooms become instant agent tools without rebuilding data pipelines.

Integration Approaches

SDK Integration:

  • Direct control via Databricks Python SDK
  • Manual conversation management
  • Flexible polling strategies
  • Best for custom workflows

MCP Tools:

  • Pre-configured tool interfaces
  • Simplified API surface
  • Built-in polling logic
  • Best for standard patterns

Problem-Solution Patterns

Problem 1: Genie Query Timeouts

Symptoms:

  • Queries hang indefinitely
  • Agent gets stuck waiting for response
  • No error handling for slow queries

Root causes:

  • Insufficient polling timeout
  • Complex SQL generated by Genie
  • Large dataset queries
  • No backoff strategy

Solution:

@tool
def query_genie_with_timeout(question: str, space_id: str, max_attempts: int = 30) -> str:
    """Query Genie with proper timeout handling"""
    import time
    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient()
    
    try:
        # Start conversation
        response = w.genie.start_conversation(
            space_id=space_id,
            content=question
        )
        
        conversation_id = response.conversation_id
        message_id = response.message_id
        
        # Poll with exponential backoff
        wait_time = 2
        for attempt in range(max_attempts):
            message = w.genie.get_message(
                space_id=space_id,
                conversation_id=conversation_id,
                message_id=message_id
            )
            
            if message.status == "COMPLETED":
                return extract_response(message)
            elif message.status in ["FAILED", "CANCELLED"]:
                return f"Query failed: {message.status}. Try simplifying your question."
            
            # Exponential backoff: 2s, 2s, 4s, 4s, 8s, 8s...
            time.sleep(wait_time)
            if attempt % 2 == 1:  # Double wait time every 2 attempts
                wait_time = min(wait_time * 2, 10)  # Cap at 10 seconds
        
        return "Query timeout. The query may be too complex or the dataset too large."
        
    except Exception as e:
        return f"Error: {str(e)}"

def extract_response(message) -> str:
    """Extract formatted response from Genie message"""
    response_text = ""
    if message.attachments:
        for attachment in message.attachments:
            if hasattr(attachment, 'text') and attachment.text:
                response_text += attachment.text.content + "\n"
            elif hasattr(attachment, 'query') and attachment.query:
                if hasattr(attachment.query, 'description'):
                    response_text += f"{attachment.query.description}\n"
    return response_text or message.content or "No response available"

Problem 2: Verbose Genie Responses Confuse Agent

Symptoms:

  • Agent overwhelmed by SQL query text
  • Long table outputs cause token limits
  • Agent can't synthesize due to noise

Root causes:

  • Not filtering Genie response attachments
  • Including raw SQL in tool output
  • No summarization of large results

Solution:

@tool
def query_genie_concise(question: str, space_id: str) -> str:
    """Query Genie and return concise, agent-friendly response"""
    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient()
    
    # Get raw response (using pattern from Problem 1)
    response = w.genie.start_conversation(space_id=space_id, content=question)
    message = poll_for_completion(w, space_id, response.conversation_id, response.message_id)
    
    # Extract ONLY the natural language summary
    if message.attachments:
        for attachment in message.attachments:
            # Prioritize text summaries over raw SQL
            if hasattr(attachment, 'text') and attachment.text:
                # Return first text attachment (usually the summary)
                return attachment.text.content
            
            # If query result, extract key insights only
            if hasattr(attachment, 'query') and attachment.query:
                if hasattr(attachment.query, 'description'):
                    desc = attachment.query.description
                    # Truncate if too long
                    if len(desc) > 500:
                        return desc[:500] + "... [truncated for brevity]"
                    return desc
    
    return message.content or "No clear response from Genie"

Best practice: Return summaries, not raw data. Let Genie do the summarization.

Problem 3: Losing Conversation Context

Symptoms:

  • Each query starts fresh conversation
  • Agent can't do follow-up questions
  • "What about last month?" fails

Root causes:

  • Not tracking conversation IDs
  • Creating new conversation for each query
  • No conversation state management

Solution:

class GenieConversationManager:
    """Manage ongoing conversations with Genie rooms"""
    
    def __init__(self):
        self.workspace_client = WorkspaceClient()
        self.conversations = {}  # space_id -> conversation_id
    
    @tool
    def query_genie_contextual(self, question: str, space_id: str) -> str:
        """
        Query Genie while maintaining conversation context.
        Automatically continues existing conversations or starts new ones.
        """
        # Check if we have an active conversation for this space
        conversation_id = self.conversations.get(space_id)
        
        if conversation_id:
            # Continue existing conversation
            response = self.workspace_client.genie.create_message(
                space_id=space_id,
                conversation_id=conversation_id,
                content=question
            )
        else:
            # Start new conversation
            response = self.workspace_client.genie.start_conversation(
                space_id=space_id,
                content=question
            )
            # Save conversation ID
            self.conversations[space_id] = response.conversation_id
        
        # Poll and return (using patterns from above)
        message = poll_for_completion(
            self.workspace_client,
            space_id,
            response.conversation_id,
            response.message_id
        )
        
        return extract_response(message)
    
    def reset_conversation(self, space_id: str):
        """Start fresh conversation for a space"""
        if space_id in self.conversations:
            del self.conversations[space_id]

Usage in agent:

# Initialize once for agent lifecycle
genie_manager = GenieConversationManager()

@tool
def query_customer_behavior(question: str) -> str:
    """Query customer behavior Genie room"""
    return genie_manager.query_genie_contextual(
        question=question,
        space_id="01f09cdbacf01b5fa7ff7c237365502c"
    )

Problem 4: Genie Space ID Management

Symptoms:

  • Hard-coded space IDs scattered in code
  • Errors when space IDs change
  • Difficulty managing multiple environments

Root causes:

  • No centralized configuration
  • Space IDs embedded in tool definitions
  • No environment-aware setup

Solution:

# config.py
from dataclasses import dataclass
from typing import Dict
import os

@dataclass
class GenieSpaceConfig:
    space_id: str
    name: str
    description: str

class GenieConfig:
    """Centralized Genie space configuration"""
    
    def __init__(self, environment: str = None):
        self.environment = environment or os.getenv("DATABRICKS_ENV", "production")
        self.spaces = self._load_spaces()
    
    def _load_spaces(self) -> Dict[str, GenieSpaceConfig]:
        """Load space configurations per environment"""
        
        # Production spaces
        if self.environment == "production":
            return {
                "customer_behavior": GenieSpaceConfig(
                    space_id="01f09cdbacf01b5fa7ff7c237365502c",
                    name="Customer Behavior Analysis",
                    description="Customer trends and preferences"
                ),
                "inventory": GenieSpaceConfig(
                    space_id="02a10defbcg02c6ga8gg8d348476613d",
                    name="Real-Time Inventory",
                    description="Stock levels and turnover"
                )
            }
        
        # Development spaces
        elif self.environment == "development":
            return {
                "customer_behavior": GenieSpaceConfig(
                    space_id="dev_customer_space_id",
                    name="Customer Behavior (Dev)",
                    description="Dev customer data"
                ),
                "inventory": GenieSpaceConfig(
                    space_id="dev_inventory_space_id",
                    name="Inventory (Dev)",
                    description="Dev inventory data"
                )
            }
        
        raise ValueError(f"Unknown environment: {self.environment}")
    
    def get_space(self, name: str) -> GenieSpaceConfig:
        """Get space configuration by name"""
        if name not in self.spaces:
            raise ValueError(f"Unknown Genie space: {name}")
        return self.spaces[name]

# Usage in tools
config = GenieConfig()

@tool
def query_customer_behavior(question: str) -> str:
    """Query customer behavior"""
    space = config.get_space("customer_behavior")
    return query_genie(question, space.space_id)

SDK Integration Pattern

Complete SDK-Based Tool

from databricks.sdk import WorkspaceClient
from langchain.tools import tool
import time

@tool
def query_genie_sdk(
    question: str,
    space_id: str,
    conversation_id: str = None,
    max_attempts: int = 30
) -> str:
    """
    Query Databricks Genie room using SDK.
    
    Args:
        question: Natural language question
        space_id: Genie space ID
        conversation_id: Optional conversation ID to continue conversation
        max_attempts: Max polling attempts
    
    Returns:
        Genie's response as string
    """
    w = WorkspaceClient()
    
    try:
        # Start or continue conversation
        if conversation_id:
            response = w.genie.create_message(
                space_id=space_id,
                conversation_id=conversation_id,
                content=question
            )
        else:
            response = w.genie.start_conversation(
                space_id=space_id,
                content=question
            )
        
        conv_id = response.conversation_id
        msg_id = response.message_id
        
        # Poll for completion with exponential backoff
        wait_time = 2
        for attempt in range(max_attempts):
            message = w.genie.get_message(
                space_id=space_id,
                conversation_id=conv_id,
                message_id=msg_id
            )
            
            # Check status
            if message.status == "COMPLETED":
                # Extract response
                result = ""
                if message.attachments:
                    for attachment in message.attachments:
                        if hasattr(attachment, 'text') and attachment.text:
                            result += attachment.text.content + "\n"
                        elif hasattr(attachment, 'query') and attachment.query:
                            if hasattr(attachment.query, 'description'):
                                result += attachment.query.description + "\n"
                
                return result.strip() or message.content or "No response"
            
            elif message.status in ["FAILED", "CANCELLED"]:
                return f"Query failed: {message.status}"
            
            # Wait with backoff
            time.sleep(wait_time)
            if attempt % 2 == 1:
                wait_time = min(wait_time * 2, 10)
        
        return "Query timeout after 60 seconds"
        
    except Exception as e:
        return f"Error querying Genie: {str(e)}"

MCP Tool Integration Pattern

Understanding MCP Tools

MCP (Model Context Protocol) tools provide a higher-level interface:

# MCP tools are pre-configured for specific Genie spaces
# They handle polling and conversation management automatically

# Available in environment as:
# - query_space_<SPACE_ID>
# - poll_response_<SPACE_ID>

# Example: Using existing MCP tool
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Call Genie via MCP
result = w.genie.query_space(
    space_id="01f09cdbacf01b5fa7ff7c237365502c",
    query="What products are trending?",
    conversation_id=None  # Optional: continue conversation
)

When to Use MCP vs SDK

Use MCP tools when:

  • Standard query/response pattern
  • Don't need custom polling logic
  • Want simpler code
  • MCP tools available for your spaces

Use SDK when:

  • Need custom timeout handling
  • Complex conversation management
  • Custom response parsing
  • MCP tools not configured

Performance Optimization

Pattern 1: Caching Genie Responses

from functools import lru_cache
from datetime import datetime, timedelta

class CachedGenieQuery:
    """Cache Genie responses with TTL"""
    
    def __init__(self, ttl_minutes: int = 15):
        self.cache = {}
        self.ttl = timedelta(minutes=ttl_minutes)
    
    def query(self, question: str, space_id: str) -> str:
        """Query with caching"""
        cache_key = f"{space_id}:{question}"
        
        # Check cache
        if cache_key in self.cache:
            result, timestamp = self.cache[cache_key]
            if datetime.now() - timestamp < self.ttl:
                return result
        
        # Query Genie
        result = query_genie_sdk(question, space_id)
        
        # Cache result
        self.cache[cache_key] = (result, datetime.now())
        
        return result

Pattern 2: Parallel Genie Queries

import concurrent.futures

def query_multiple_genie_spaces(questions: list[tuple[str, str]]) -> list[str]:
    """
    Query multiple Genie spaces in parallel.
    
    Args:
        questions: List of (question, space_id) tuples
    
    Returns:
        List of responses in same order
    """
    def query_one(question_space):
        question, space_id = question_space
        return query_genie_sdk(question, space_id)
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(query_one, questions))
    
    return results

# Usage in agent
questions = [
    ("What products are trending?", "customer_space_id"),
    ("Which locations have high turnover?", "inventory_space_id")
]
results = query_multiple_genie_spaces(questions)

Error Handling Best Practices

Comprehensive Error Strategy

@tool
def query_genie_robust(question: str, space_id: str) -> str:
    """Query Genie with comprehensive error handling"""
    from databricks.sdk import WorkspaceClient
    from databricks.sdk.errors import DatabricksError
    
    w = WorkspaceClient()
    
    try:
        response = w.genie.start_conversation(
            space_id=space_id,
            content=question
        )
        
        message = poll_for_completion(
            w, space_id, response.conversation_id, response.message_id
        )
        
        if message.status == "COMPLETED":
            return extract_response(message)
        elif message.status == "FAILED":
            # Provide actionable error message
            return ("Query failed. This could be due to:\n"
                    "- Invalid SQL generated\n"
                    "- Data source unavailable\n"
                    "- Permissions issue\n"
                    "Try rephrasing your question or check Genie room configuration.")
        else:
            return f"Unexpected status: {message.status}"
    
    except DatabricksError as e:
        if "not found" in str(e).lower():
            return "Genie space not found. Check space ID configuration."
        elif "permission" in str(e).lower():
            return "Permission denied. Ensure agent has access to Genie space."
        else:
            return f"Databricks API error: {str(e)}"
    
    except TimeoutError:
        return "Query timeout. Try a simpler question or check data volume."
    
    except Exception as e:
        return f"Unexpected error: {str(e)}"

Testing Genie Integration

Unit Test Pattern

def test_genie_tool():
    """Test Genie tool in isolation"""
    
    # Test 1: Simple query
    result = query_genie_sdk(
        question="How many customers do we have?",
        space_id="01f09cdbacf01b5fa7ff7c237365502c"
    )
    assert result, "Should return non-empty result"
    assert "error" not in result.lower(), "Should not contain error"
    
    # Test 2: Invalid space ID
    result = query_genie_sdk(
        question="test",
        space_id="invalid_id"
    )
    assert "not found" in result.lower() or "error" in result.lower()
    
    # Test 3: Conversation continuity
    # (Test that conversation_id parameter works)

Integration Test Pattern

def test_genie_in_agent():
    """Test Genie tool within agent workflow"""
    from your_agent import GenieAgent
    
    agent = GenieAgent()
    
    # Test single-tool query
    result = agent.query("What products are trending?")
    assert "customer_behavior" in result['intermediate_steps'][0][0].tool
    
    # Test multi-tool query
    result = agent.query("Trending products at risk of overstock?")
    tools_called = [step[0].tool for step in result['intermediate_steps']]
    assert "customer_behavior" in tools_called
    assert "inventory" in tools_called

Quick Reference

Minimum Viable Genie Tool (SDK)

from databricks.sdk import WorkspaceClient
from langchain.tools import tool
import time

@tool
def query_genie(question: str, space_id: str) -> str:
    """Query Genie room"""
    w = WorkspaceClient()
    resp = w.genie.start_conversation(space_id=space_id, content=question)
    
    for _ in range(30):
        msg = w.genie.get_message(space_id, resp.conversation_id, resp.message_id)
        if msg.status == "COMPLETED":
            return msg.attachments[0].text.content if msg.attachments else msg.content
        elif msg.status in ["FAILED", "CANCELLED"]:
            return f"Failed: {msg.status}"
        time.sleep(2)
    
    return "Timeout"

Related Skills

  • mosaic-ai-agent: Design agents that use Genie tools
  • agent-mlops: Deploy Genie-powered agents to production