databricks-performance-tuning
Optimize Databricks cluster and query performance. Use when jobs are running slowly, optimizing Spark configurations, or improving Delta Lake query performance. Trigger with phrases like "databricks performance", "spark tuning", "databricks slow", "optimize databricks", "cluster performance".
allowed-tools: Read, Write, Edit, Bash(databricks:*), Grep
version: 1.0.0
license: MIT
author: Jeremy Longshore <jeremy@intentsolutions.io>
Allowed Tools
Read, Write, Edit, Bash(databricks:*), Grep
Provided by Plugin
databricks-pack
Claude Code skill pack for Databricks (24 skills)
Installation
This skill is included in the databricks-pack plugin:
/plugin install databricks-pack@claude-code-plugins-plus
Instructions
# Databricks Performance Tuning
## Overview
Tune Databricks performance end to end: cluster sizing, Spark configuration, Delta Lake layout (OPTIMIZE, Z-ordering, Liquid Clustering), query analysis, caching, and join strategy.
## Prerequisites
- Access to cluster configuration
- Understanding of workload characteristics
- Query history access
## Instructions
### Step 1: Cluster Sizing
```python
# Cluster sizing calculator
def recommend_cluster_size(
    data_size_gb: float,
    complexity: str = "medium",  # low, medium, high
    parallelism_need: str = "standard",  # standard, high
) -> dict:
    """
    Recommend cluster configuration based on workload.

    Args:
        data_size_gb: Estimated data size to process
        complexity: Query/transform complexity
        parallelism_need: Required parallelism level

    Returns:
        Recommended cluster configuration
    """
    # Memory per worker node (Standard_DS3_v2 = 14 GB)
    memory_per_worker = 14

    # Base calculation: assume each worker can process ~2x its memory
    workers_by_data = max(1, int(data_size_gb / memory_per_worker / 2))

    # Adjust for complexity
    complexity_multiplier = {"low": 1, "medium": 1.5, "high": 2.5}
    workers = int(workers_by_data * complexity_multiplier.get(complexity, 1.5))

    # Adjust for parallelism
    if parallelism_need == "high":
        workers = max(workers, 8)

    return {
        "node_type_id": "Standard_DS3_v2",
        "num_workers": workers,
        "autoscale": {
            "min_workers": max(1, workers // 2),
            "max_workers": workers * 2,
        },
        "spark_conf": {
            "spark.sql.shuffle.partitions": str(workers * 4),
            "spark.default.parallelism": str(workers * 4),
        },
    }
```
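For example, sizing a high-complexity job over roughly 500 GB of input with the helper above (the numbers in the comments follow directly from the defaults and are illustrative only):

```python
config = recommend_cluster_size(data_size_gb=500, complexity="high")
print(config["num_workers"])   # 42 with the defaults above
print(config["autoscale"])     # {'min_workers': 21, 'max_workers': 84}
print(config["spark_conf"])    # shuffle partitions / parallelism = '168'
```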
### Step 2: Spark Configuration Optimization
```python
# Optimized Spark configurations by workload type
spark_configs = {
    "etl_batch": {
        # Memory and parallelism
        "spark.sql.shuffle.partitions": "200",
        "spark.default.parallelism": "200",
        "spark.sql.files.maxPartitionBytes": "134217728",  # 128 MB
        # Delta Lake optimizations
        "spark.databricks.delta.optimizeWrite.enabled": "true",
        "spark.databricks.delta.autoCompact.enabled": "true",
        "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",
        # Adaptive query execution
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.adaptive.skewJoin.enabled": "true",
    },
    "ml_training": {
        # Memory for ML workloads
        "spark.driver.memory": "16g",
        "spark.executor.memory": "16g",
        "spark.memory.fraction": "0.8",
        "spark.memory.storageFraction": "0.3",
        # Serialization for ML
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.kryoserializer.buffer.max": "1024m",
    },
    "streaming": {
        # Streaming configurations
        "spark.sql.streaming.schemaInference": "true",
        "spark.sql.streaming.checkpointLocation": "/mnt/checkpoints",
        "spark.databricks.delta.autoCompact.minNumFiles": "10",
    },
    "interactive": {
        # Fast startup (single-node cluster)
        "spark.databricks.cluster.profile": "singleNode",
        "spark.master": "local[*]",
        # Caching
        "spark.sql.inMemoryColumnarStorage.compressed": "true",
        "spark.sql.inMemoryColumnarStorage.batchSize": "10000",
    },
}
```
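Session-level settings from these profiles can be applied at runtime; JVM-level settings (driver/executor memory, the serializer, `spark.master`) must go in the cluster's Spark config before startup. A minimal sketch, assuming a live `spark` session and the `spark_configs` dict above:

```python
def apply_spark_profile(spark, profile: str) -> None:
    """Apply the session-level settings of a workload profile.

    JVM-level keys (spark.driver.memory, spark.executor.memory,
    spark.serializer, spark.master) belong in the cluster config;
    Spark may refuse to modify them on a running session.
    """
    for key, value in spark_configs[profile].items():
        try:
            spark.conf.set(key, value)
        except Exception as exc:  # e.g. "Cannot modify the value of a Spark config"
            print(f"Skipping {key}: {exc}")

apply_spark_profile(spark, "etl_batch")
```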
### Step 3: Delta Lake Optimization
```python
# delta_optimization.py
from typing import Optional

from pyspark.sql import SparkSession


def optimize_delta_table(
    spark: SparkSession,
    table_name: str,
    z_order_columns: Optional[list[str]] = None,
    vacuum_hours: int = 168,  # 7 days
) -> dict:
    """
    Optimize Delta table for query performance.

    Args:
        spark: SparkSession
        table_name: Fully qualified table name
        z_order_columns: Columns for Z-ordering (max 4)
        vacuum_hours: Retention period for vacuum

    Returns:
        Optimization results
    """
    results = {}

    # 1. Run OPTIMIZE, with Z-ordering if columns were given
    if z_order_columns:
        z_order_clause = ", ".join(z_order_columns[:4])  # Max 4 columns
        spark.sql(f"OPTIMIZE {table_name} ZORDER BY ({z_order_clause})")
        results["z_order"] = z_order_columns
    else:
        spark.sql(f"OPTIMIZE {table_name}")
    results["optimized"] = True

    # 2. Analyze table statistics
    spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")
    results["statistics_computed"] = True

    # 3. Vacuum old files
    spark.sql(f"VACUUM {table_name} RETAIN {vacuum_hours} HOURS")
    results["vacuumed"] = True

    # 4. Get table metrics
    detail = spark.sql(f"DESCRIBE DETAIL {table_name}").first()
    results["metrics"] = {
        "num_files": detail.numFiles,
        "size_bytes": detail.sizeInBytes,
        "partitions": detail.partitionColumns,
    }
    return results


def enable_liquid_clustering(
    spark: SparkSession,
    table_name: str,
    cluster_columns: list[str],
) -> None:
    """
    Enable Liquid Clustering for automatic data layout optimization.

    Liquid Clustering replaces traditional partitioning and Z-ordering
    with automatic, incremental clustering.
    """
    columns = ", ".join(cluster_columns)
    spark.sql(f"""
        ALTER TABLE {table_name}
        CLUSTER BY ({columns})
    """)


def enable_predictive_optimization(
    spark: SparkSession,
    table_name: str,
) -> None:
    """Set Delta table properties commonly tuned alongside Predictive
    Optimization: deletion vectors and a ~100 MB target file size."""
    spark.sql(f"""
        ALTER TABLE {table_name}
        SET TBLPROPERTIES (
            'delta.enableDeletionVectors' = 'true',
            'delta.targetFileSize' = '104857600'
        )
    """)
```
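A typical call against a silver table (the table and column names below are placeholders):

```python
# Hypothetical table and columns, for illustration only
results = optimize_delta_table(
    spark,
    table_name="main.silver.orders",
    z_order_columns=["order_date", "region"],
    vacuum_hours=168,
)
print(results["metrics"]["num_files"], results["metrics"]["size_bytes"])
```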
### Step 4: Query Performance Analysis
```sql
-- Find slow queries from query history
SELECT
query_id,
query_text,
duration / 1000 as seconds,
rows_produced,
bytes_read,
start_time
FROM system.query.history
WHERE duration > 60000 -- > 60 seconds
AND start_time > current_timestamp() - INTERVAL 24 HOURS
ORDER BY duration DESC
LIMIT 20;
-- Analyze query plan
EXPLAIN FORMATTED
SELECT * FROM main.silver.orders
WHERE order_date > '2024-01-01'
AND region = 'US';
-- Check table scan statistics
SELECT
table_name,
SUM(bytes_read) / 1024 / 1024 / 1024 as gb_read,
SUM(rows_produced) as total_rows,
COUNT(*) as query_count
FROM system.query.history
WHERE start_time > current_timestamp() - INTERVAL 7 DAYS
GROUP BY table_name
ORDER BY gb_read DESC;
```
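The same analysis can be scripted from a notebook. A sketch, assuming the `system.query.history` table is enabled in your workspace (column names may differ by release):

```python
# Pull recent slow queries into a DataFrame for further inspection
slow = spark.sql("""
    SELECT query_id, query_text, duration / 1000 AS seconds
    FROM system.query.history
    WHERE duration > 60000
      AND start_time > current_timestamp() - INTERVAL 24 HOURS
    ORDER BY duration DESC
    LIMIT 20
""")
slow.show(truncate=False)

# Print a formatted plan for a candidate query (Spark 3.0+)
(
    spark.table("main.silver.orders")
    .filter("order_date > '2024-01-01' AND region = 'US'")
    .explain(mode="formatted")
)
```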
### Step 5: Caching Strategy
```python
# Intelligent caching for repeated queries
from pyspark import StorageLevel
from pyspark.sql import DataFrame, SparkSession


class CacheManager:
    """Manage Spark DataFrame caching."""

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self._cache_registry = {}

    def cache_table(
        self,
        table_name: str,
        cache_level: str = "MEMORY_AND_DISK",
    ) -> DataFrame:
        """Cache table with specified storage level."""
        if table_name in self._cache_registry:
            return self._cache_registry[table_name]

        df = self.spark.table(table_name)
        if cache_level == "MEMORY_ONLY":
            df.persist(StorageLevel.MEMORY_ONLY)
        elif cache_level == "MEMORY_AND_DISK":
            df.persist(StorageLevel.MEMORY_AND_DISK)
        elif cache_level == "DISK_ONLY":
            df.persist(StorageLevel.DISK_ONLY)

        # Trigger caching with an action
        df.count()
        self._cache_registry[table_name] = df
        return df

    def uncache_all(self):
        """Clear all cached DataFrames."""
        for df in self._cache_registry.values():
            df.unpersist()
        self._cache_registry.clear()
        self.spark.catalog.clearCache()
# Delta Cache (automatic)
# Enable in cluster config:
# "spark.databricks.io.cache.enabled": "true"
# "spark.databricks.io.cache.maxDiskUsage": "50g"
```
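Usage, with a placeholder dimension-table name:

```python
# Hypothetical usage: keep a frequently joined dimension table cached
cache = CacheManager(spark)
dim = cache.cache_table("main.silver.customers", cache_level="MEMORY_AND_DISK")

# ... run repeated queries that join against dim ...

cache.uncache_all()  # release memory when finished
```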
### Step 6: Join Optimization
```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import broadcast


def optimize_join(
    df_large: DataFrame,
    df_small: DataFrame,
    join_key: str,
    small_table_threshold_mb: int = 100,
) -> DataFrame:
    """
    Optimize join based on table sizes.

    Uses a broadcast join when the small table fits in memory,
    otherwise falls back to the default sort-merge join.
    """
    # Rough size estimate: assume ~100 bytes per row
    small_size_mb = df_small.count() * 100 / 1024 / 1024
    if small_size_mb < small_table_threshold_mb:
        # Broadcast join (small table fits in executor memory)
        return df_large.join(broadcast(df_small), join_key)
    else:
        # Sort-merge join (default for large-to-large joins)
        return df_large.join(df_small, join_key, "inner")


# Bucketed tables for frequent joins
def create_bucketed_table(
    spark: SparkSession,
    df: DataFrame,
    table_name: str,
    bucket_columns: list[str],
    num_buckets: int = 100,
):
    """Create bucketed table for join optimization."""
    (
        df.write
        .bucketBy(num_buckets, *bucket_columns)
        .sortBy(*bucket_columns)
        .saveAsTable(table_name)
    )
```
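For example, joining a large fact table to a small dimension table (table names are illustrative):

```python
# Hypothetical tables, for illustration
orders = spark.table("main.silver.orders")    # large fact table
regions = spark.table("main.silver.regions")  # small dimension table

joined = optimize_join(orders, regions, join_key="region")
joined.explain()  # expect a BroadcastHashJoin when regions is below the threshold
```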
## Output
- Optimized cluster configuration
- Tuned Spark settings
- Optimized Delta tables
- Improved query performance
## Error Handling
| Issue | Cause | Solution |
|-------|-------|----------|
| OOM errors | Insufficient memory | Increase executor memory or reduce partition size |
| Skewed data | Uneven distribution | Use salting (sketch below) or AQE skew handling |
| Slow joins | Large shuffle | Use broadcast for small tables |
| Too many files | Small files problem | Run OPTIMIZE regularly |
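For the skew case, AQE skew-join handling (`spark.sql.adaptive.skewJoin.enabled`, already set in the `etl_batch` profile above) is usually the first fix; manual key salting is a fallback. A minimal sketch with illustrative DataFrame and column names:

```python
from pyspark.sql import functions as F

SALT_BUCKETS = 16  # increase for heavier skew

# Add a random salt to the skewed (large) side
large_salted = df_large.withColumn(
    "salt", (F.rand() * SALT_BUCKETS).cast("int")
)

# Replicate the small side once per salt value so every salt matches
salts = spark.range(SALT_BUCKETS).withColumnRenamed("id", "salt")
small_salted = df_small.crossJoin(salts)

# Join on the original key plus the salt, then drop the helper column
joined = large_salted.join(small_salted, ["join_key", "salt"]).drop("salt")
```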
## Examples
### Performance Benchmark
```python
import time


def benchmark_query(spark, query: str, runs: int = 3) -> dict:
    """Benchmark query execution time."""
    times = []
    for _ in range(runs):
        spark.catalog.clearCache()
        start = time.time()
        spark.sql(query).collect()
        times.append(time.time() - start)
    return {
        "min": min(times),
        "max": max(times),
        "avg": sum(times) / len(times),
        "runs": runs,
    }
```
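A typical workflow is to benchmark, apply the Step 3 optimizations, and benchmark again (query and table are illustrative):

```python
# Illustrative before/after comparison around a Z-order OPTIMIZE
query = """
    SELECT region, COUNT(*) AS orders
    FROM main.silver.orders
    WHERE order_date > '2024-01-01'
    GROUP BY region
"""

before = benchmark_query(spark, query)
optimize_delta_table(spark, "main.silver.orders", z_order_columns=["order_date", "region"])
after = benchmark_query(spark, query)

print(f"avg before: {before['avg']:.1f}s, avg after: {after['avg']:.1f}s")
```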
## Resources
- [Databricks Performance Guide](https://docs.databricks.com/optimizations/index.html)
- [Delta Lake Optimization](https://docs.databricks.com/delta/optimizations/index.html)
- [Spark Tuning Guide](https://spark.apache.org/docs/latest/tuning.html)
## Next Steps
For cost optimization, see `databricks-cost-tuning`.