databricks-sdk-patterns
Apply production-ready Databricks SDK patterns for Python and REST API. Use when implementing Databricks integrations, refactoring SDK usage, or establishing team coding standards for Databricks. Trigger with phrases like "databricks SDK patterns", "databricks best practices", "databricks code patterns", or "idiomatic databricks".
allowed-tools: Read, Write, Edit
version: 1.0.0
license: MIT
author: Jeremy Longshore <jeremy@intentsolutions.io>
Allowed Tools
Read, Write, Edit
Provided by Plugin
databricks-pack
Claude Code skill pack for Databricks (24 skills)
Installation
This skill is included in the databricks-pack plugin:
/plugin install databricks-pack@claude-code-plugins-plus
Instructions
# Databricks SDK Patterns
## Overview
Production-ready patterns for Databricks SDK usage in Python.
## Prerequisites
- Completed `databricks-install-auth` setup
- Familiarity with async/await patterns
- Understanding of error handling best practices
## Instructions
### Step 1: Implement Singleton Pattern
```python
# src/databricks/client.py
from databricks.sdk import WorkspaceClient
from functools import lru_cache
from typing import Optional
import os
@lru_cache(maxsize=1)
def get_workspace_client(profile: Optional[str] = None) -> WorkspaceClient:
    """Get singleton WorkspaceClient instance.

    Note: maxsize=1 keeps a single cached client, so alternating between
    profiles re-creates it; raise maxsize to cache one client per profile.
    """
    return WorkspaceClient(profile=profile)
# Multi-workspace support
_clients: dict[str, WorkspaceClient] = {}
def get_client_for_workspace(workspace_name: str) -> WorkspaceClient:
"""Get or create client for specific workspace."""
if workspace_name not in _clients:
config = get_workspace_config(workspace_name)
_clients[workspace_name] = WorkspaceClient(
host=config["host"],
token=config["token"]
)
return _clients[workspace_name]
def get_workspace_config(name: str) -> dict:
"""Load workspace config from environment."""
return {
"host": os.environ[f"DATABRICKS_{name.upper()}_HOST"],
"token": os.environ[f"DATABRICKS_{name.upper()}_TOKEN"],
}
```
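A quick usage sketch, assuming `DATABRICKS_DEV_HOST` and `DATABRICKS_DEV_TOKEN` are exported for the named workspace:

```python
# Repeated calls return the same cached instance.
w = get_workspace_client()
assert w is get_workspace_client()

# Per-workspace clients are created once, then reused from the dict cache.
dev = get_client_for_workspace("dev")  # reads DATABRICKS_DEV_HOST / DATABRICKS_DEV_TOKEN
```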
### Step 2: Add Error Handling Wrapper
```python
# src/databricks/errors.py
from databricks.sdk.errors import (
    DatabricksError,
    NotFound,
    ResourceAlreadyExists,
    PermissionDenied,
)
from dataclasses import dataclass
from typing import TypeVar, Generic, Optional
import functools
import logging
logger = logging.getLogger(__name__)
T = TypeVar("T")
@dataclass
class Result(Generic[T]):
"""Result wrapper for API operations."""
data: Optional[T] = None
error: Optional[Exception] = None
@property
def is_success(self) -> bool:
return self.error is None
def unwrap(self) -> T:
if self.error:
raise self.error
return self.data
def safe_databricks_call(operation):
    """Decorator for safe Databricks API calls."""
    @functools.wraps(operation)
    def wrapper(*args, **kwargs) -> Result:
try:
result = operation(*args, **kwargs)
return Result(data=result)
except NotFound as e:
logger.warning(f"Resource not found: {e}")
return Result(error=e)
except ResourceAlreadyExists as e:
logger.warning(f"Resource already exists: {e}")
return Result(error=e)
except PermissionDenied as e:
logger.error(f"Permission denied: {e}")
return Result(error=e)
        except DatabricksError as e:
            logger.error(f"Databricks error: {e.error_code} - {e}")
            return Result(error=e)
return wrapper
```
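A minimal sketch of the decorator in use; the cluster ID is a placeholder:

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

@safe_databricks_call
def get_cluster_name(cluster_id: str):
    # The decorator converts the return value into Result[str].
    return w.clusters.get(cluster_id).cluster_name

result = get_cluster_name("0000-000000-abcd123")  # placeholder cluster ID
if result.is_success:
    print(result.unwrap())
else:
    print(f"Lookup failed: {result.error}")
```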
### Step 3: Implement Retry Logic with Backoff
```python
# src/databricks/retry.py
import logging
import random
import time
from typing import Callable, TypeVar
from databricks.sdk.errors import (
    DatabricksError,
    BadRequest,
    NotFound,
    PermissionDenied,
    TemporarilyUnavailable,
    TooManyRequests,
)

logger = logging.getLogger(__name__)
T = TypeVar("T")
def with_retry(
operation: Callable[[], T],
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: float = 0.5,
) -> T:
"""Execute operation with exponential backoff retry."""
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except (TemporarilyUnavailable, TooManyRequests) as e:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            delay += random.uniform(0, jitter)
            logger.warning(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
            time.sleep(delay)
        except (BadRequest, NotFound, PermissionDenied):
            # Client errors are permanent; retrying cannot fix them. Note that
            # error_code is a symbolic string (e.g. "RESOURCE_DOES_NOT_EXIST"),
            # not an HTTP status, so matching a leading "4" would never fire.
            raise
        except DatabricksError:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay)
raise RuntimeError("Unreachable")
```
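`with_retry` takes a zero-argument callable, so wrap calls that need arguments in a lambda (sketch):

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Materialize the paginated listing inside the retried callable so the
# pagination requests are covered by the retry as well.
clusters = with_retry(lambda: list(w.clusters.list()), max_retries=3)
```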
### Step 4: Context Manager for Clusters
```python
# src/databricks/cluster.py
from contextlib import contextmanager
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import State
@contextmanager
def managed_cluster(w: WorkspaceClient, cluster_id: str):
"""Context manager for cluster lifecycle."""
try:
# Start cluster if not running
cluster = w.clusters.get(cluster_id)
if cluster.state != State.RUNNING:
print(f"Starting cluster {cluster_id}...")
w.clusters.start(cluster_id)
w.clusters.wait_get_cluster_running(cluster_id)
yield cluster_id
finally:
# Optionally terminate after use
pass # Or: w.clusters.delete(cluster_id)
@contextmanager
def ephemeral_cluster(w: WorkspaceClient, spec: dict):
"""Create temporary cluster that auto-deletes."""
cluster = w.clusters.create_and_wait(**spec)
try:
yield cluster.cluster_id
finally:
w.clusters.permanent_delete(cluster.cluster_id)
```
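A usage sketch; the cluster ID, runtime, and node type are placeholders to adapt to your workspace:

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Reuse an existing cluster; it stays running on exit.
with managed_cluster(w, "0000-000000-abcd123"):  # placeholder cluster ID
    print("cluster is running")

# Throwaway cluster, deleted on exit even if the body raises.
spec = {
    "cluster_name": "tmp-etl",            # placeholder name
    "spark_version": "15.4.x-scala2.12",  # placeholder runtime
    "node_type_id": "i3.xlarge",          # placeholder node type
    "num_workers": 1,
}
with ephemeral_cluster(w, spec) as cluster_id:
    print(f"running on {cluster_id}")
```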
### Step 5: Type-Safe Job Builders
```python
# src/databricks/jobs.py
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
    Task, NotebookTask, PythonWheelTask, JobCluster,
)
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class JobBuilder:
"""Fluent builder for Databricks jobs."""
name: str
tasks: list[Task] = field(default_factory=list)
job_clusters: list[JobCluster] = field(default_factory=list)
tags: dict[str, str] = field(default_factory=dict)
def add_notebook_task(
self,
task_key: str,
notebook_path: str,
cluster_key: str,
parameters: Optional[dict] = None,
) -> "JobBuilder":
"""Add a notebook task."""
self.tasks.append(Task(
task_key=task_key,
job_cluster_key=cluster_key,
notebook_task=NotebookTask(
notebook_path=notebook_path,
base_parameters=parameters or {},
)
))
return self
def add_wheel_task(
self,
task_key: str,
package_name: str,
entry_point: str,
cluster_key: str,
parameters: Optional[list[str]] = None,
) -> "JobBuilder":
"""Add a Python wheel task."""
self.tasks.append(Task(
task_key=task_key,
job_cluster_key=cluster_key,
python_wheel_task=PythonWheelTask(
package_name=package_name,
entry_point=entry_point,
parameters=parameters or [],
)
))
return self
def create(self, w: WorkspaceClient) -> int:
"""Create the job and return job_id."""
job = w.jobs.create(
name=self.name,
tasks=self.tasks,
job_clusters=self.job_clusters,
tags=self.tags,
)
return job.job_id
# Usage (assumes the "etl-cluster" job cluster is added to
# job_clusters before creation)
w = WorkspaceClient()
job_id = (
    JobBuilder("etl-pipeline")
    .add_notebook_task("bronze", "/pipelines/bronze", "etl-cluster")
    .add_notebook_task("silver", "/pipelines/silver", "etl-cluster")
    .create(w)
)
```
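To verify the builder end to end, the created job can be triggered and awaited (sketch, continuing from the usage above):

```python
# Trigger a run and block until it finishes.
run = w.jobs.run_now(job_id).result()
print(run.state.life_cycle_state)
```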
## Output
- Type-safe client singleton
- Robust error handling with structured logging
- Automatic retry with exponential backoff
- Fluent job builder pattern
## Error Handling
| Pattern | Use Case | Benefit |
|---------|----------|---------|
| Result wrapper | All API calls | Type-safe error handling |
| Retry logic | Transient failures | Improves reliability |
| Context managers | Cluster lifecycle | Resource cleanup |
| Builders | Job creation | Type safety and fluency |
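
These patterns compose. A sketch reusing `get_workspace_client`, `safe_databricks_call`, and `with_retry` from the steps above: transient failures are retried first, and anything that still fails comes back as a typed `Result`:

```python
from databricks.sdk import WorkspaceClient

@safe_databricks_call
def list_jobs_with_retry(w: WorkspaceClient):
    # Transient errors are retried; whatever still fails becomes Result.error.
    return with_retry(lambda: list(w.jobs.list()), max_retries=3)

result = list_jobs_with_retry(get_workspace_client())
jobs = result.unwrap() if result.is_success else []
```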
## Examples
### Factory Pattern (Multi-Tenant)
```python
from functools import lru_cache
@lru_cache(maxsize=100)
def get_tenant_client(tenant_id: str) -> WorkspaceClient:
"""Get workspace client for tenant."""
config = get_tenant_config(tenant_id)
return WorkspaceClient(
host=config.workspace_url,
token=config.token,
)
```
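The factory assumes a `get_tenant_config` helper; here is a hypothetical environment-based sketch (a real deployment would likely resolve credentials from a secret manager instead):

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class TenantConfig:
    workspace_url: str
    token: str

def get_tenant_config(tenant_id: str) -> TenantConfig:
    """Hypothetical helper: resolve per-tenant credentials from env vars."""
    key = tenant_id.upper()
    return TenantConfig(
        workspace_url=os.environ[f"DATABRICKS_{key}_HOST"],
        token=os.environ[f"DATABRICKS_{key}_TOKEN"],
    )
```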
### Async Operations
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def async_list_clusters(w: WorkspaceClient):
"""Async wrapper for cluster listing."""
    loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as executor:
clusters = await loop.run_in_executor(
executor,
lambda: list(w.clusters.list())
)
return clusters
```
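Because the wrapper only offloads blocking SDK calls to threads, several of them can be awaited concurrently (sketch):

```python
import asyncio
from databricks.sdk import WorkspaceClient

async def main():
    w = WorkspaceClient()
    # Both listings run concurrently on executor threads.
    first, second = await asyncio.gather(
        async_list_clusters(w),
        async_list_clusters(w),
    )
    print(len(first), len(second))

asyncio.run(main())
```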
## Resources
- [Databricks SDK for Python](https://docs.databricks.com/dev-tools/sdk-python.html)
- [API Reference](https://docs.databricks.com/api/workspace/introduction)
- [Error Handling](https://docs.databricks.com/dev-tools/sdk-python.html#error-handling)
## Next Steps
Apply patterns in `databricks-core-workflow-a` for Delta Lake ETL.