๐Ÿ“… let's chat! explore the endless possibilities creating industries that don't exist. click here

databricks-sdk-patterns

Apply production-ready Databricks SDK patterns for Python and REST API. Use when implementing Databricks integrations, refactoring SDK usage, or establishing team coding standards for Databricks. Trigger with phrases like "databricks SDK patterns", "databricks best practices", "databricks code patterns", "idiomatic databricks". allowed-tools: Read, Write, Edit version: 1.0.0 license: MIT author: Jeremy Longshore <jeremy@intentsolutions.io>

Allowed Tools

No tools specified

Provided by Plugin

databricks-pack

Claude Code skill pack for Databricks (24 skills)

saas packs v1.0.0
View Plugin

Installation

This skill is included in the databricks-pack plugin:

/plugin install databricks-pack@claude-code-plugins-plus

Click to copy

Instructions

# Databricks SDK Patterns ## Overview Production-ready patterns for Databricks SDK usage in Python. ## Prerequisites - Completed `databricks-install-auth` setup - Familiarity with async/await patterns - Understanding of error handling best practices ## Instructions ### Step 1: Implement Singleton Pattern ```python # src/databricks/client.py from databricks.sdk import WorkspaceClient from functools import lru_cache from typing import Optional import os @lru_cache(maxsize=1) def get_workspace_client(profile: Optional[str] = None) -> WorkspaceClient: """Get singleton WorkspaceClient instance.""" return WorkspaceClient(profile=profile) # Multi-workspace support _clients: dict[str, WorkspaceClient] = {} def get_client_for_workspace(workspace_name: str) -> WorkspaceClient: """Get or create client for specific workspace.""" if workspace_name not in _clients: config = get_workspace_config(workspace_name) _clients[workspace_name] = WorkspaceClient( host=config["host"], token=config["token"] ) return _clients[workspace_name] def get_workspace_config(name: str) -> dict: """Load workspace config from environment.""" return { "host": os.environ[f"DATABRICKS_{name.upper()}_HOST"], "token": os.environ[f"DATABRICKS_{name.upper()}_TOKEN"], } ``` ### Step 2: Add Error Handling Wrapper ```python # src/databricks/errors.py from databricks.sdk.errors import ( DatabricksError, NotFound, ResourceAlreadyExists, PermissionDenied, ResourceConflict, BadRequest, ) from dataclasses import dataclass from typing import TypeVar, Generic, Optional import logging logger = logging.getLogger(__name__) T = TypeVar("T") @dataclass class Result(Generic[T]): """Result wrapper for API operations.""" data: Optional[T] = None error: Optional[Exception] = None @property def is_success(self) -> bool: return self.error is None def unwrap(self) -> T: if self.error: raise self.error return self.data def safe_databricks_call(operation): """Decorator for safe Databricks API calls.""" def wrapper(*args, **kwargs) -> Result: try: result = operation(*args, **kwargs) return Result(data=result) except NotFound as e: logger.warning(f"Resource not found: {e}") return Result(error=e) except ResourceAlreadyExists as e: logger.warning(f"Resource already exists: {e}") return Result(error=e) except PermissionDenied as e: logger.error(f"Permission denied: {e}") return Result(error=e) except DatabricksError as e: logger.error(f"Databricks error: {e.error_code} - {e.message}") return Result(error=e) return wrapper ``` ### Step 3: Implement Retry Logic with Backoff ```python # src/databricks/retry.py import time from typing import Callable, TypeVar from databricks.sdk.errors import ( DatabricksError, TemporarilyUnavailable, TooManyRequests, ) T = TypeVar("T") def with_retry( operation: Callable[[], T], max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 60.0, jitter: float = 0.5, ) -> T: """Execute operation with exponential backoff retry.""" import random for attempt in range(max_retries + 1): try: return operation() except (TemporarilyUnavailable, TooManyRequests) as e: if attempt == max_retries: raise delay = min(base_delay * (2 ** attempt), max_delay) delay += random.uniform(0, jitter) print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}") time.sleep(delay) except DatabricksError as e: # Don't retry client errors if e.error_code and e.error_code.startswith("4"): raise if attempt == max_retries: raise delay = min(base_delay * (2 ** attempt), max_delay) time.sleep(delay) raise RuntimeError("Unreachable") ``` ### Step 4: Context Manager for Clusters ```python # src/databricks/cluster.py from contextlib import contextmanager from databricks.sdk import WorkspaceClient from databricks.sdk.service.compute import State import time @contextmanager def managed_cluster(w: WorkspaceClient, cluster_id: str): """Context manager for cluster lifecycle.""" try: # Start cluster if not running cluster = w.clusters.get(cluster_id) if cluster.state != State.RUNNING: print(f"Starting cluster {cluster_id}...") w.clusters.start(cluster_id) w.clusters.wait_get_cluster_running(cluster_id) yield cluster_id finally: # Optionally terminate after use pass # Or: w.clusters.delete(cluster_id) @contextmanager def ephemeral_cluster(w: WorkspaceClient, spec: dict): """Create temporary cluster that auto-deletes.""" cluster = w.clusters.create_and_wait(**spec) try: yield cluster.cluster_id finally: w.clusters.permanent_delete(cluster.cluster_id) ``` ### Step 5: Type-Safe Job Builders ```python # src/databricks/jobs.py from databricks.sdk import WorkspaceClient from databricks.sdk.service.jobs import ( Task, NotebookTask, PythonWheelTask, JobCluster, JobSettings, CreateJob, ) from dataclasses import dataclass, field from typing import Optional @dataclass class JobBuilder: """Fluent builder for Databricks jobs.""" name: str tasks: list[Task] = field(default_factory=list) job_clusters: list[JobCluster] = field(default_factory=list) tags: dict[str, str] = field(default_factory=dict) def add_notebook_task( self, task_key: str, notebook_path: str, cluster_key: str, parameters: Optional[dict] = None, ) -> "JobBuilder": """Add a notebook task.""" self.tasks.append(Task( task_key=task_key, job_cluster_key=cluster_key, notebook_task=NotebookTask( notebook_path=notebook_path, base_parameters=parameters or {}, ) )) return self def add_wheel_task( self, task_key: str, package_name: str, entry_point: str, cluster_key: str, parameters: Optional[list[str]] = None, ) -> "JobBuilder": """Add a Python wheel task.""" self.tasks.append(Task( task_key=task_key, job_cluster_key=cluster_key, python_wheel_task=PythonWheelTask( package_name=package_name, entry_point=entry_point, parameters=parameters or [], ) )) return self def create(self, w: WorkspaceClient) -> int: """Create the job and return job_id.""" job = w.jobs.create( name=self.name, tasks=self.tasks, job_clusters=self.job_clusters, tags=self.tags, ) return job.job_id # Usage job_id = ( JobBuilder("etl-pipeline") .add_notebook_task("bronze", "/pipelines/bronze", "etl-cluster") .add_notebook_task("silver", "/pipelines/silver", "etl-cluster") .create(w) ) ``` ## Output - Type-safe client singleton - Robust error handling with structured logging - Automatic retry with exponential backoff - Fluent job builder pattern ## Error Handling | Pattern | Use Case | Benefit | |---------|----------|---------| | Result wrapper | All API calls | Type-safe error handling | | Retry logic | Transient failures | Improves reliability | | Context managers | Cluster lifecycle | Resource cleanup | | Builders | Job creation | Type safety and fluency | ## Examples ### Factory Pattern (Multi-Tenant) ```python from functools import lru_cache @lru_cache(maxsize=100) def get_tenant_client(tenant_id: str) -> WorkspaceClient: """Get workspace client for tenant.""" config = get_tenant_config(tenant_id) return WorkspaceClient( host=config.workspace_url, token=config.token, ) ``` ### Async Operations ```python import asyncio from concurrent.futures import ThreadPoolExecutor async def async_list_clusters(w: WorkspaceClient): """Async wrapper for cluster listing.""" loop = asyncio.get_event_loop() with ThreadPoolExecutor() as executor: clusters = await loop.run_in_executor( executor, lambda: list(w.clusters.list()) ) return clusters ``` ## Resources - [Databricks SDK for Python](https://docs.databricks.com/dev-tools/sdk-python.html) - [API Reference](https://docs.databricks.com/api/workspace/introduction) - [Error Handling](https://docs.databricks.com/dev-tools/sdk-python.html#error-handling) ## Next Steps Apply patterns in `databricks-core-workflow-a` for Delta Lake ETL.

Skill file: plugins/saas-packs/databricks-pack/skills/databricks-sdk-patterns/SKILL.md