databricks-rate-limits
Implement Databricks API rate limiting, backoff, and idempotency patterns. Use when handling rate limit errors, implementing retry logic, or optimizing API request throughput for Databricks. Trigger with phrases like "databricks rate limit", "databricks throttling", "databricks 429", "databricks retry", "databricks backoff".

allowed-tools: Read, Write, Edit
version: 1.0.0
license: MIT
author: Jeremy Longshore <jeremy@intentsolutions.io>
Allowed Tools
Read, Write, Edit
Provided by Plugin
databricks-pack
Claude Code skill pack for Databricks (24 skills)
Installation
This skill is included in the databricks-pack plugin:
/plugin install databricks-pack@claude-code-plugins-plus
Instructions
# Databricks Rate Limits
## Overview
Handle Databricks API rate limits gracefully with exponential backoff.
## Prerequisites
- Databricks SDK installed
- Understanding of async/await patterns
- Access to Databricks workspace
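A quick way to confirm the setup before wiring in retry logic (a minimal sketch, assuming default SDK authentication via environment variables or `~/.databrickscfg`):
```python
from databricks.sdk import WorkspaceClient

# Fails fast if the SDK is missing or authentication is misconfigured
w = WorkspaceClient()
print(w.current_user.me().user_name)
```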
## Instructions
### Step 1: Understand Rate Limit Tiers
| API Category | Rate Limit | Notes |
|--------------|------------|-------|
| Workspace APIs | 30 req/s | Clusters, jobs, notebooks |
| SQL Warehouse | 10 req/s | Statement execution |
| Unity Catalog | 100 req/s | Tables, grants, lineage |
| Jobs Runs | 1000 runs/hour | Per workspace |
| Clusters | 20 creates/hour | Per workspace |
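If you want these tiers available to code, a small lookup table mirroring the values above works; the numbers are illustrative defaults and should be tuned to your workspace tier:
```python
# Illustrative defaults mirroring the table above; adjust per workspace tier
RATE_LIMITS = {
    "workspace": 30,       # req/s
    "sql": 10,             # req/s
    "unity_catalog": 100,  # req/s
}
```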
### Step 2: Implement Exponential Backoff with Jitter
```python
# src/databricks/rate_limit.py
import time
import random
from typing import Callable, TypeVar
from databricks.sdk.errors import (
DatabricksError,
TooManyRequests,
TemporarilyUnavailable,
ResourceConflict,
)
T = TypeVar("T")
def with_exponential_backoff(
operation: Callable[[], T],
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter_factor: float = 0.5,
) -> T:
"""
Execute operation with exponential backoff on rate limits.
Args:
operation: Callable to execute
max_retries: Maximum retry attempts
base_delay: Initial delay in seconds
max_delay: Maximum delay cap
jitter_factor: Random jitter factor (0-1)
Returns:
Result of operation
Raises:
Original exception after max retries
"""
retryable_errors = (TooManyRequests, TemporarilyUnavailable, ResourceConflict)
for attempt in range(max_retries + 1):
try:
return operation()
except retryable_errors as e:
if attempt == max_retries:
raise
# Calculate delay with exponential backoff
exponential_delay = base_delay * (2 ** attempt)
# Add jitter to prevent thundering herd
jitter = random.uniform(0, jitter_factor * exponential_delay)
delay = min(exponential_delay + jitter, max_delay)
# Check for Retry-After header
if hasattr(e, 'retry_after') and e.retry_after:
delay = max(delay, float(e.retry_after))
print(f"Rate limited (attempt {attempt + 1}/{max_retries}). "
f"Retrying in {delay:.2f}s...")
time.sleep(delay)
        except DatabricksError:
            # Other SDK errors (NotFound, PermissionDenied, InvalidParameterValue,
            # and similar) are not transient and will not succeed on retry.
            # Note: error_code holds symbolic names like "RESOURCE_DOES_NOT_EXIST",
            # not HTTP status digits, so status-prefix checks do not work here.
            raise
raise RuntimeError("Unreachable")
```
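A typical call site wraps the SDK operation in a lambda. The cluster and job listings below are just examples; any synchronous call works:
```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Retries on 429/503/409 automatically; re-raises after max_retries
clusters = with_exponential_backoff(lambda: list(w.clusters.list()))
jobs = with_exponential_backoff(
    lambda: list(w.jobs.list()),
    max_retries=3,
    base_delay=2.0,
)
```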
### Step 3: Implement Request Queue for Bulk Operations
```python
# src/databricks/queue.py
import time
from functools import wraps
from threading import Lock
from typing import Callable, TypeVar
T = TypeVar("T")
class RateLimitedQueue:
"""Token bucket rate limiter for API calls."""
def __init__(
self,
requests_per_second: float = 10.0,
burst_size: int = 20,
):
self.rate = requests_per_second
self.burst_size = burst_size
self.tokens = burst_size
self.last_update = time.monotonic()
self.lock = Lock()
def _refill_tokens(self) -> None:
"""Refill tokens based on elapsed time."""
now = time.monotonic()
elapsed = now - self.last_update
self.tokens = min(
self.burst_size,
self.tokens + elapsed * self.rate
)
self.last_update = now
def acquire(self, tokens: int = 1) -> None:
"""Block until tokens are available."""
with self.lock:
while True:
self._refill_tokens()
if self.tokens >= tokens:
self.tokens -= tokens
return
# Calculate wait time
wait_time = (tokens - self.tokens) / self.rate
time.sleep(wait_time)
def execute(self, operation: Callable[[], T]) -> T:
"""Execute operation with rate limiting."""
self.acquire()
return operation()
# Global rate limiters by API category
rate_limiters = {
"workspace": RateLimitedQueue(requests_per_second=25, burst_size=30),
"sql": RateLimitedQueue(requests_per_second=8, burst_size=10),
"unity_catalog": RateLimitedQueue(requests_per_second=80, burst_size=100),
}
def rate_limited(category: str = "workspace"):
    """Decorator for rate-limited API calls."""
    def decorator(func):
        @wraps(func)  # preserve the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            limiter = rate_limiters.get(category, rate_limiters["workspace"])
            return limiter.execute(lambda: func(*args, **kwargs))
        return wrapper
    return decorator
# Usage
@rate_limited("workspace")
def list_clusters(w):
return list(w.clusters.list())
```
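To see the token bucket behave as intended without hitting a real API, time a burst of acquires; after the initial burst is spent, calls settle at roughly the configured rate:
```python
import time

limiter = RateLimitedQueue(requests_per_second=5, burst_size=5)

start = time.monotonic()
for _ in range(10):
    limiter.acquire()
elapsed = time.monotonic() - start
# Roughly 1s: the first 5 acquires consume the burst, the next 5 wait ~0.2s each
print(f"10 acquires took {elapsed:.1f}s")
```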
### Step 4: Async Batch Processing
```python
# src/databricks/batch.py
import asyncio
from typing import Any, Callable, List, TypeVar

from databricks.sdk import WorkspaceClient

# Backoff helper from Step 2; assumes this file sits beside rate_limit.py
from .rate_limit import with_exponential_backoff

T = TypeVar("T")
async def batch_api_calls(
    w: WorkspaceClient,
    items: List[Any],
    operation: Callable[[WorkspaceClient, Any], T],
    concurrency: int = 5,
    delay_between_batches: float = 1.0,
) -> List[T]:
"""
Execute API calls in batches with rate limiting.
Args:
w: WorkspaceClient instance
items: Items to process
operation: Function to call for each item
concurrency: Max concurrent requests
delay_between_batches: Delay between batches
Returns:
List of results
"""
    results = []
    semaphore = asyncio.Semaphore(concurrency)
    loop = asyncio.get_running_loop()

    async def process_item(item):
        async with semaphore:
            # The SDK is synchronous, so run each call in the default thread
            # pool; concurrency stays capped by the semaphore
            return await loop.run_in_executor(
                None,
                lambda: with_exponential_backoff(lambda: operation(w, item)),
            )
# Process in batches
batch_size = concurrency * 2
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
batch_results = await asyncio.gather(
*[process_item(item) for item in batch],
return_exceptions=True
)
results.extend(batch_results)
# Delay between batches
if i + batch_size < len(items):
await asyncio.sleep(delay_between_batches)
return results
# Usage
async def update_all_job_tags(w: WorkspaceClient, job_ids: List[int], tags: dict):
    from databricks.sdk.service.jobs import JobSettings

    def update_job(w, job_id):
        # new_settings expects a JobSettings object rather than a raw dict
        return w.jobs.update(job_id=job_id, new_settings=JobSettings(tags=tags))

    return await batch_api_calls(
        w, job_ids, update_job,
        concurrency=5,
        delay_between_batches=1.0,
    )
```
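Driving the coroutine from synchronous code is a one-liner with `asyncio.run`; the job IDs and tags here are hypothetical placeholders:
```python
import asyncio

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
results = asyncio.run(
    update_all_job_tags(w, job_ids=[101, 102, 103], tags={"team": "data-eng"})
)
# gather(return_exceptions=True) surfaces per-item failures inline
for job_id, result in zip([101, 102, 103], results):
    if isinstance(result, Exception):
        print(f"job {job_id} failed: {result}")
```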
### Step 5: Idempotency for Job Submissions
```python
# src/databricks/idempotent.py
import hashlib
import json
from typing import Optional
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import RunNow, BaseRun
def generate_idempotency_token(
job_id: int,
params: dict,
timestamp_bucket: int = 3600, # 1 hour buckets
) -> str:
"""
Generate deterministic idempotency token for job run.
Uses timestamp buckets to allow retries within a time window
while preventing duplicate runs across windows.
"""
    bucket = int(time.time()) // timestamp_bucket
data = json.dumps({
"job_id": job_id,
"params": params,
"bucket": bucket,
}, sort_keys=True)
return hashlib.sha256(data.encode()).hexdigest()[:32]
def idempotent_run_now(
    w: WorkspaceClient,
    job_id: int,
    notebook_params: Optional[dict] = None,
    idempotency_token: Optional[str] = None,
):
    """
    Submit a job run with an idempotency guarantee.

    The Jobs API launches at most one run per idempotency token: if a run
    with the same token already exists, the service returns that run
    instead of creating a new one.
    """
    # Generate a deterministic token if the caller did not supply one
    if not idempotency_token:
        idempotency_token = generate_idempotency_token(
            job_id,
            notebook_params or {},
        )
    # run_now accepts the token directly, so deduplication happens server-side
    return w.jobs.run_now(
        job_id=job_id,
        notebook_params=notebook_params,
        idempotency_token=idempotency_token,
    )
```
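Combining Steps 2 and 5 gives a submission that is safe to retry end to end: the backoff handles transient errors, and because the token is deterministic within its time bucket, a retried call cannot launch a duplicate run. A sketch with a hypothetical job ID:
```python
run = with_exponential_backoff(
    lambda: idempotent_run_now(
        w,
        job_id=123,  # hypothetical job ID
        notebook_params={"run_date": "2024-01-01"},
    )
)
```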
## Output
- Reliable API calls with automatic retry
- Rate-limited request queue
- Async batch processing for bulk operations
- Idempotent job submissions
## Error Handling
| Scenario | Behavior | Configuration |
|----------|----------|---------------|
| HTTP 429 | Exponential backoff | `max_retries=5` |
| HTTP 503 | Retry with delay | `base_delay=1.0` |
| Conflict (409) | Retry with backoff | Check idempotency |
| Timeout | Retry with increased timeout | `max_delay=60` |
## Examples
### Bulk Cluster Operations
```python
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Rate-limited listing
@rate_limited("workspace")
def safe_list_clusters():
return list(w.clusters.list())
# Bulk updates with the async batch helper; clusters.edit replaces the full
# spec, so copy the required fields from the current configuration first
cluster_ids = [c.cluster_id for c in safe_list_clusters()]

def set_autotermination(w, cid):
    # Assumes fixed-size clusters; autoscaling clusters need the autoscale
    # field instead of num_workers
    c = w.clusters.get(cluster_id=cid)
    return w.clusters.edit(
        cluster_id=cid,
        spark_version=c.spark_version,
        node_type_id=c.node_type_id,
        num_workers=c.num_workers,
        autotermination_minutes=30,
    )

async def update_all_clusters():
    return await batch_api_calls(w, cluster_ids, set_autotermination, concurrency=3)
# Run async
import asyncio
asyncio.run(update_all_clusters())
```
### Monitor Rate Limit Usage
```python
import time

class RateLimitMonitor:
    """Track rate limit usage across API calls."""

    def __init__(self):
        self.calls = []
        self.window_seconds = 60

    def record_call(self, category: str):
        now = time.time()
        self.calls.append({"time": now, "category": category})
        # Drop entries that have aged out of the window
        self.calls = [c for c in self.calls if now - c["time"] < self.window_seconds]

    def get_rate(self, category: str) -> float:
        now = time.time()
        recent = [c for c in self.calls
                  if c["category"] == category and now - c["time"] < self.window_seconds]
        return len(recent) / self.window_seconds
def should_throttle(self, category: str, limit: float) -> bool:
return self.get_rate(category) > limit * 0.8 # 80% threshold
```
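One way to wire the monitor into a decorator like the one from Step 3 (a sketch; the fixed sleep is a simple placeholder for smarter throttling):
```python
import time

monitor = RateLimitMonitor()

def monitored(category: str, limit: float):
    """Record each call and back off pre-emptively near the limit."""
    def decorator(func):
        def wrapper(*args, **kwargs):
            if monitor.should_throttle(category, limit):
                time.sleep(1.0)  # yield before the service returns a 429
            monitor.record_call(category)
            return func(*args, **kwargs)
        return wrapper
    return decorator

@monitored("workspace", limit=30.0)
def list_jobs(w):
    return list(w.jobs.list())
```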
## Resources
- [Databricks API Rate Limits](https://docs.databricks.com/dev-tools/api/latest/rate-limits.html)
- [Best Practices for API Usage](https://docs.databricks.com/dev-tools/api/latest/best-practices.html)
## Next Steps
For security configuration, see `databricks-security-basics`.