๐Ÿ“… let's chat! explore the endless possibilities creating industries that don't exist. click here

databricks-rate-limits

Implement Databricks API rate limiting, backoff, and idempotency patterns. Use when handling rate limit errors, implementing retry logic, or optimizing API request throughput for Databricks. Trigger with phrases like "databricks rate limit", "databricks throttling", "databricks 429", "databricks retry", "databricks backoff". allowed-tools: Read, Write, Edit version: 1.0.0 license: MIT author: Jeremy Longshore <jeremy@intentsolutions.io>

Allowed Tools

No tools specified

Provided by Plugin

databricks-pack

Claude Code skill pack for Databricks (24 skills)

saas packs v1.0.0
View Plugin

Installation

This skill is included in the databricks-pack plugin:

/plugin install databricks-pack@claude-code-plugins-plus

Click to copy

Instructions

# Databricks Rate Limits ## Overview Handle Databricks API rate limits gracefully with exponential backoff. ## Prerequisites - Databricks SDK installed - Understanding of async/await patterns - Access to Databricks workspace ## Instructions ### Step 1: Understand Rate Limit Tiers | API Category | Rate Limit | Notes | |--------------|------------|-------| | Workspace APIs | 30 req/s | Clusters, jobs, notebooks | | SQL Warehouse | 10 req/s | Statement execution | | Unity Catalog | 100 req/s | Tables, grants, lineage | | Jobs Runs | 1000 runs/hour | Per workspace | | Clusters | 20 creates/hour | Per workspace | ### Step 2: Implement Exponential Backoff with Jitter ```python # src/databricks/rate_limit.py import time import random from typing import Callable, TypeVar from databricks.sdk.errors import ( DatabricksError, TooManyRequests, TemporarilyUnavailable, ResourceConflict, ) T = TypeVar("T") def with_exponential_backoff( operation: Callable[[], T], max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 60.0, jitter_factor: float = 0.5, ) -> T: """ Execute operation with exponential backoff on rate limits. Args: operation: Callable to execute max_retries: Maximum retry attempts base_delay: Initial delay in seconds max_delay: Maximum delay cap jitter_factor: Random jitter factor (0-1) Returns: Result of operation Raises: Original exception after max retries """ retryable_errors = (TooManyRequests, TemporarilyUnavailable, ResourceConflict) for attempt in range(max_retries + 1): try: return operation() except retryable_errors as e: if attempt == max_retries: raise # Calculate delay with exponential backoff exponential_delay = base_delay * (2 ** attempt) # Add jitter to prevent thundering herd jitter = random.uniform(0, jitter_factor * exponential_delay) delay = min(exponential_delay + jitter, max_delay) # Check for Retry-After header if hasattr(e, 'retry_after') and e.retry_after: delay = max(delay, float(e.retry_after)) print(f"Rate limited (attempt {attempt + 1}/{max_retries}). " f"Retrying in {delay:.2f}s...") time.sleep(delay) except DatabricksError as e: # Don't retry client errors (4xx except 429) if e.error_code and e.error_code.startswith("4"): raise if attempt == max_retries: raise time.sleep(base_delay * (2 ** attempt)) raise RuntimeError("Unreachable") ``` ### Step 3: Implement Request Queue for Bulk Operations ```python # src/databricks/queue.py import time from collections import deque from threading import Lock from typing import Callable, TypeVar T = TypeVar("T") class RateLimitedQueue: """Token bucket rate limiter for API calls.""" def __init__( self, requests_per_second: float = 10.0, burst_size: int = 20, ): self.rate = requests_per_second self.burst_size = burst_size self.tokens = burst_size self.last_update = time.monotonic() self.lock = Lock() def _refill_tokens(self) -> None: """Refill tokens based on elapsed time.""" now = time.monotonic() elapsed = now - self.last_update self.tokens = min( self.burst_size, self.tokens + elapsed * self.rate ) self.last_update = now def acquire(self, tokens: int = 1) -> None: """Block until tokens are available.""" with self.lock: while True: self._refill_tokens() if self.tokens >= tokens: self.tokens -= tokens return # Calculate wait time wait_time = (tokens - self.tokens) / self.rate time.sleep(wait_time) def execute(self, operation: Callable[[], T]) -> T: """Execute operation with rate limiting.""" self.acquire() return operation() # Global rate limiters by API category rate_limiters = { "workspace": RateLimitedQueue(requests_per_second=25, burst_size=30), "sql": RateLimitedQueue(requests_per_second=8, burst_size=10), "unity_catalog": RateLimitedQueue(requests_per_second=80, burst_size=100), } def rate_limited(category: str = "workspace"): """Decorator for rate-limited API calls.""" def decorator(func): def wrapper(*args, **kwargs): limiter = rate_limiters.get(category, rate_limiters["workspace"]) return limiter.execute(lambda: func(*args, **kwargs)) return wrapper return decorator # Usage @rate_limited("workspace") def list_clusters(w): return list(w.clusters.list()) ``` ### Step 4: Async Batch Processing ```python # src/databricks/batch.py import asyncio from concurrent.futures import ThreadPoolExecutor from typing import List, Callable, TypeVar from databricks.sdk import WorkspaceClient T = TypeVar("T") async def batch_api_calls( w: WorkspaceClient, items: List[any], operation: Callable[[WorkspaceClient, any], T], concurrency: int = 5, delay_between_batches: float = 1.0, ) -> List[T]: """ Execute API calls in batches with rate limiting. Args: w: WorkspaceClient instance items: Items to process operation: Function to call for each item concurrency: Max concurrent requests delay_between_batches: Delay between batches Returns: List of results """ results = [] semaphore = asyncio.Semaphore(concurrency) loop = asyncio.get_event_loop() async def process_item(item): async with semaphore: # Run in thread pool since SDK is sync with ThreadPoolExecutor() as executor: result = await loop.run_in_executor( executor, lambda: with_exponential_backoff( lambda: operation(w, item) ) ) return result # Process in batches batch_size = concurrency * 2 for i in range(0, len(items), batch_size): batch = items[i:i + batch_size] batch_results = await asyncio.gather( *[process_item(item) for item in batch], return_exceptions=True ) results.extend(batch_results) # Delay between batches if i + batch_size < len(items): await asyncio.sleep(delay_between_batches) return results # Usage async def update_all_job_tags(w: WorkspaceClient, job_ids: List[int], tags: dict): def update_job(w, job_id): return w.jobs.update(job_id=job_id, new_settings={"tags": tags}) results = await batch_api_calls( w, job_ids, update_job, concurrency=5, delay_between_batches=1.0 ) return results ``` ### Step 5: Idempotency for Job Submissions ```python # src/databricks/idempotent.py import hashlib import json from typing import Optional from databricks.sdk import WorkspaceClient from databricks.sdk.service.jobs import RunNow, BaseRun def generate_idempotency_token( job_id: int, params: dict, timestamp_bucket: int = 3600, # 1 hour buckets ) -> str: """ Generate deterministic idempotency token for job run. Uses timestamp buckets to allow retries within a time window while preventing duplicate runs across windows. """ import time bucket = int(time.time()) // timestamp_bucket data = json.dumps({ "job_id": job_id, "params": params, "bucket": bucket, }, sort_keys=True) return hashlib.sha256(data.encode()).hexdigest()[:32] def idempotent_run_now( w: WorkspaceClient, job_id: int, notebook_params: Optional[dict] = None, idempotency_token: Optional[str] = None, ) -> BaseRun: """ Submit job run with idempotency guarantee. If a run with the same idempotency token exists, returns the existing run instead of creating a new one. """ # Generate token if not provided if not idempotency_token: idempotency_token = generate_idempotency_token( job_id, notebook_params or {} ) # Check for existing run with same token recent_runs = list(w.jobs.list_runs( job_id=job_id, active_only=True, )) for run in recent_runs: # Check if run was started with same token # (Token would need to be stored in run tags in practice) pass # Submit new run run = w.jobs.run_now( job_id=job_id, notebook_params=notebook_params, ) return run ``` ## Output - Reliable API calls with automatic retry - Rate-limited request queue - Async batch processing for bulk operations - Idempotent job submissions ## Error Handling | Scenario | Behavior | Configuration | |----------|----------|---------------| | HTTP 429 | Exponential backoff | `max_retries=5` | | HTTP 503 | Retry with delay | `base_delay=1.0` | | Conflict (409) | Retry once | Check idempotency | | Timeout | Retry with increased timeout | `max_delay=60` | ## Examples ### Bulk Cluster Operations ```python from databricks.sdk import WorkspaceClient w = WorkspaceClient() # Rate-limited listing @rate_limited("workspace") def safe_list_clusters(): return list(w.clusters.list()) # Bulk updates with queue cluster_ids = [c.cluster_id for c in safe_list_clusters()] async def update_all_clusters(): results = await batch_api_calls( w, cluster_ids, lambda w, cid: w.clusters.edit( cluster_id=cid, autotermination_minutes=30 ), concurrency=3 ) return results # Run async import asyncio asyncio.run(update_all_clusters()) ``` ### Monitor Rate Limit Usage ```python class RateLimitMonitor: """Track rate limit usage across API calls.""" def __init__(self): self.calls = [] self.window_seconds = 60 def record_call(self, category: str): import time now = time.time() self.calls.append({"time": now, "category": category}) # Cleanup old entries self.calls = [c for c in self.calls if now - c["time"] < self.window_seconds] def get_rate(self, category: str) -> float: import time now = time.time() recent = [c for c in self.calls if c["category"] == category and now - c["time"] < self.window_seconds] return len(recent) / self.window_seconds def should_throttle(self, category: str, limit: float) -> bool: return self.get_rate(category) > limit * 0.8 # 80% threshold ``` ## Resources - [Databricks API Rate Limits](https://docs.databricks.com/dev-tools/api/latest/rate-limits.html) - [Best Practices for API Usage](https://docs.databricks.com/dev-tools/api/latest/best-practices.html) ## Next Steps For security configuration, see `databricks-security-basics`.

Skill file: plugins/saas-packs/databricks-pack/skills/databricks-rate-limits/SKILL.md