
databricks-performance-tuning

Optimize Databricks cluster and query performance. Use when jobs are running slowly, when tuning Spark configurations, or when improving Delta Lake query performance. Trigger with phrases like "databricks performance", "spark tuning", "databricks slow", "optimize databricks", "cluster performance".

allowed-tools: Read, Write, Edit, Bash(databricks:*), Grep
version: 1.0.0
license: MIT
author: Jeremy Longshore <jeremy@intentsolutions.io>

Allowed Tools

No tools specified

Provided by Plugin

databricks-pack

Claude Code skill pack for Databricks (24 skills)

saas packs v1.0.0

Installation

This skill is included in the databricks-pack plugin:

/plugin install databricks-pack@claude-code-plugins-plus


Instructions

# Databricks Performance Tuning

## Overview

Optimize Databricks cluster, Spark, and Delta Lake performance.

## Prerequisites

- Access to cluster configuration
- Understanding of workload characteristics
- Query history access

## Instructions

### Step 1: Cluster Sizing

```python
# Cluster sizing calculator
def recommend_cluster_size(
    data_size_gb: float,
    complexity: str = "medium",  # low, medium, high
    parallelism_need: str = "standard",  # standard, high
) -> dict:
    """
    Recommend cluster configuration based on workload.

    Args:
        data_size_gb: Estimated data size to process
        complexity: Query/transform complexity
        parallelism_need: Required parallelism level

    Returns:
        Recommended cluster configuration
    """
    # Memory per worker node (Standard_DS3_v2 = 14 GB)
    memory_per_worker = 14

    # Base calculation: one worker per 28 GB of data, at least one worker
    workers_by_data = max(1, int(data_size_gb / memory_per_worker / 2))

    # Adjust for complexity (unknown values fall back to the "medium" multiplier)
    complexity_multiplier = {"low": 1, "medium": 1.5, "high": 2.5}
    workers = int(workers_by_data * complexity_multiplier.get(complexity, 1.5))

    # Adjust for parallelism
    if parallelism_need == "high":
        workers = max(workers, 8)

    return {
        "node_type_id": "Standard_DS3_v2",
        # A cluster spec takes either num_workers (fixed size) or autoscale, not both
        "num_workers": workers,
        "autoscale": {
            "min_workers": max(1, workers // 2),
            "max_workers": workers * 2,
        },
        # 4 partitions per worker matches the 4 cores of a Standard_DS3_v2 node
        "spark_conf": {
            "spark.sql.shuffle.partitions": str(workers * 4),
            "spark.default.parallelism": str(workers * 4),
        },
    }
```

### Step 2: Spark Configuration Optimization

```python
# Optimized Spark configurations by workload type
spark_configs = {
    "etl_batch": {
        # Memory and parallelism
        "spark.sql.shuffle.partitions": "200",
        "spark.default.parallelism": "200",
        "spark.sql.files.maxPartitionBytes": "134217728",  # 128MB
        # Delta Lake optimizations
        "spark.databricks.delta.optimizeWrite.enabled": "true",
        "spark.databricks.delta.autoCompact.enabled": "true",
        "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",
        # Adaptive query execution
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.adaptive.skewJoin.enabled": "true",
    },
    "ml_training": {
        # Memory for ML workloads (needs a node type with more than 16 GB of RAM;
        # Standard_DS3_v2 from Step 1 has 14 GB)
        "spark.driver.memory": "16g",
        "spark.executor.memory": "16g",
        "spark.memory.fraction": "0.8",
        "spark.memory.storageFraction": "0.3",
        # Serialization for ML
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.kryoserializer.buffer.max": "1024m",
    },
    "streaming": {
        # Streaming configurations
        "spark.sql.streaming.schemaInference": "true",
        "spark.sql.streaming.checkpointLocation": "/mnt/checkpoints",
        "spark.databricks.delta.autoCompact.minNumFiles": "10",
        # Micro-batch tuning: foreachBatch is a DataStreamWriter method called in
        # code, not a Spark conf, so there is no key to enable here
    },
    "interactive": {
        # Fast startup (single-node cluster)
        "spark.databricks.cluster.profile": "singleNode",
        "spark.master": "local[*]",
        # Caching
        "spark.sql.inMemoryColumnarStorage.compressed": "true",
        "spark.sql.inMemoryColumnarStorage.batchSize": "10000",
    },
}
```

### Step 3: Delta Lake Optimization

```python
# delta_optimization.py
from typing import Optional

from pyspark.sql import SparkSession


def optimize_delta_table(
    spark: SparkSession,
    table_name: str,
    z_order_columns: Optional[list[str]] = None,
    vacuum_hours: int = 168,  # 7 days
) -> dict:
    """
    Optimize Delta table for query performance.

    Args:
        spark: SparkSession
        table_name: Fully qualified table name
        z_order_columns: Columns for Z-ordering (only the first 4 are used)
        vacuum_hours: Retention period for vacuum

    Returns:
        Optimization results
    """
    results = {}

    # 1. Run OPTIMIZE, with Z-ordering when columns are given
    if z_order_columns:
        # Cap at 4 columns: Z-ordering gets less effective with each extra column
        used_columns = z_order_columns[:4]
        z_order_clause = ", ".join(used_columns)
        spark.sql(f"OPTIMIZE {table_name} ZORDER BY ({z_order_clause})")
        results["z_order"] = used_columns
    else:
        spark.sql(f"OPTIMIZE {table_name}")
    results["optimized"] = True

    # 2. Analyze table statistics
    spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")
    results["statistics_computed"] = True

    # 3. Vacuum old files
    spark.sql(f"VACUUM {table_name} RETAIN {vacuum_hours} HOURS")
    results["vacuumed"] = True

    # 4. Get table metrics
    detail = spark.sql(f"DESCRIBE DETAIL {table_name}").first()
    results["metrics"] = {
        "num_files": detail.numFiles,
        "size_bytes": detail.sizeInBytes,
        "partitions": detail.partitionColumns,
    }

    return results


def enable_liquid_clustering(
    spark: SparkSession,
    table_name: str,
    cluster_columns: list[str],
) -> None:
    """
    Enable Liquid Clustering for automatic data layout optimization.

    Liquid Clustering replaces traditional partitioning and Z-ordering
    with automatic, incremental clustering.
    """
    columns = ", ".join(cluster_columns)
    spark.sql(f"""
        ALTER TABLE {table_name}
        CLUSTER BY ({columns})
    """)


def enable_predictive_optimization(
    spark: SparkSession,
    table_name: str,
) -> None:
    """
    Set table properties that support automated file maintenance.

    Enables deletion vectors and a 100 MB target file size. These properties
    do not switch on Predictive Optimization itself, which is enabled
    separately (for example, ALTER SCHEMA ... ENABLE PREDICTIVE OPTIMIZATION).
    """
    spark.sql(f"""
        ALTER TABLE {table_name}
        SET TBLPROPERTIES (
            'delta.enableDeletionVectors' = 'true',
            'delta.targetFileSize' = '104857600'
        )
    """)
```

### Step 4: Query Performance Analysis

```sql
-- Find slow queries from query history
-- (column names follow the system.query.history system table schema)
SELECT
    statement_id,
    statement_text,
    total_duration_ms / 1000 AS seconds,
    produced_rows,
    read_bytes,
    start_time
FROM system.query.history
WHERE total_duration_ms > 60000  -- > 60 seconds
  AND start_time > current_timestamp() - INTERVAL 24 HOURS
ORDER BY total_duration_ms DESC
LIMIT 20;

-- Analyze query plan
EXPLAIN FORMATTED
SELECT * FROM main.silver.orders
WHERE order_date > '2024-01-01' AND region = 'US';

-- Check read volume per user
-- (query history records statements, not tables, so there is no table_name column to group by)
SELECT
    executed_by,
    SUM(read_bytes) / 1024 / 1024 / 1024 AS gb_read,
    SUM(produced_rows) AS total_rows,
    COUNT(*) AS query_count
FROM system.query.history
WHERE start_time > current_timestamp() - INTERVAL 7 DAYS
GROUP BY executed_by
ORDER BY gb_read DESC;
```

### Step 5: Caching Strategy

```python
# Explicit caching for repeated queries
from pyspark import StorageLevel
from pyspark.sql import DataFrame, SparkSession

# Supported cache levels mapped to Spark storage levels
_STORAGE_LEVELS = {
    "MEMORY_ONLY": StorageLevel.MEMORY_ONLY,
    "MEMORY_AND_DISK": StorageLevel.MEMORY_AND_DISK,
    "DISK_ONLY": StorageLevel.DISK_ONLY,
}


class CacheManager:
    """Manage Spark DataFrame caching."""

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self._cache_registry = {}

    def cache_table(
        self,
        table_name: str,
        cache_level: str = "MEMORY_AND_DISK",
    ) -> DataFrame:
        """Cache table with specified storage level."""
        if table_name in self._cache_registry:
            return self._cache_registry[table_name]
        if cache_level not in _STORAGE_LEVELS:
            raise ValueError(f"Unsupported cache level: {cache_level}")

        # Use persist() with an explicit level: df.cache() applies the default
        # level (memory and disk), not MEMORY_ONLY
        df = self.spark.table(table_name).persist(_STORAGE_LEVELS[cache_level])

        # Trigger caching
        df.count()
        self._cache_registry[table_name] = df
        return df

    def uncache_all(self):
        """Clear all cached DataFrames."""
        for df in self._cache_registry.values():
            df.unpersist()
        self._cache_registry.clear()
        self.spark.catalog.clearCache()


# Disk cache (formerly Delta cache, automatic)
# Enable in cluster config:
# "spark.databricks.io.cache.enabled": "true"
# "spark.databricks.io.cache.maxDiskUsage": "50g"
```

### Step 6: Join Optimization

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import broadcast


def optimize_join(
    df_large: DataFrame,
    df_small: DataFrame,
    join_key: str,
    small_table_threshold_mb: int = 100,
) -> DataFrame:
    """
    Optimize join based on table sizes.

    Uses a broadcast join for small tables and leaves the strategy to Spark
    (typically sort-merge) for large tables.
    """
    # Rough estimate assuming ~100 bytes per row; count() scans the small table
    small_size_mb = df_small.count() * 100 / 1024 / 1024

    if small_size_mb < small_table_threshold_mb:
        # Broadcast join (small table fits in memory)
        return df_large.join(broadcast(df_small), join_key)
    else:
        # No hint applied: Spark chooses the strategy, usually sort-merge
        return df_large.join(df_small, join_key, "inner")


# Bucketed tables for frequent joins
def create_bucketed_table(
    spark: SparkSession,
    df: DataFrame,
    table_name: str,
    bucket_columns: list[str],
    num_buckets: int = 100,
):
    """Create bucketed table for join optimization."""
    (
        df.write
        # bucketBy is not supported for Delta tables, so write Parquet explicitly
        .format("parquet")
        .bucketBy(num_buckets, *bucket_columns)
        .sortBy(*bucket_columns)
        .saveAsTable(table_name)
    )
```

## Output

- Optimized cluster configuration
- Tuned Spark settings
- Optimized Delta tables
- Improved query performance

## Error Handling

| Issue | Cause | Solution |
|-------|-------|----------|
| OOM errors | Insufficient memory | Increase executor memory or reduce partition size |
| Skewed data | Uneven distribution | Use salting or AQE skew handling |
| Slow joins | Large shuffle | Use broadcast for small tables |
| Too many files | Small files problem | Run OPTIMIZE regularly |

## Examples

### Performance Benchmark

```python
import time


def benchmark_query(spark, query: str, runs: int = 3) -> dict:
    """Benchmark query execution time."""
    times = []
    for _ in range(runs):
        # Clears the Spark cache only; the disk cache on workers is unaffected
        spark.catalog.clearCache()
        start = time.perf_counter()
        # The noop sink runs the full query without collecting rows to the driver
        spark.sql(query).write.format("noop").mode("overwrite").save()
        times.append(time.perf_counter() - start)

    return {
        "min": min(times),
        "max": max(times),
        "avg": sum(times) / len(times),
        "runs": runs,
    }
```

## Resources

- [Databricks Performance Guide](https://docs.databricks.com/optimizations/index.html)
- [Delta Lake Optimization](https://docs.databricks.com/delta/optimizations/index.html)
- [Spark Tuning Guide](https://spark.apache.org/docs/latest/tuning.html)

## Next Steps

For cost optimization, see `databricks-cost-tuning`.
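
Steps 1 and 2 set `spark.sql.shuffle.partitions` to a fixed value (`workers * 4`, or `200`). As a complement, the sketch below sizes it from the shuffled data volume instead. It is not part of this skill: `estimate_shuffle_partitions` is a hypothetical helper, and the 128 MB target per partition is an assumed rule of thumb (chosen to match `maxPartitionBytes` in Step 2) that should be checked against the shuffle sizes shown in the Spark UI.

```python
import math


def estimate_shuffle_partitions(
    shuffle_size_gb: float,
    total_cores: int,
    target_partition_mb: int = 128,  # assumed target size per shuffle partition
) -> int:
    """Estimate spark.sql.shuffle.partitions from data volume (hypothetical helper)."""
    if shuffle_size_gb < 0 or total_cores < 1 or target_partition_mb < 1:
        raise ValueError("sizes must be non-negative and core count positive")

    # Partitions needed so that each one holds roughly target_partition_mb
    by_size = math.ceil(shuffle_size_gb * 1024 / target_partition_mb)

    # Round up to a whole number of task waves so every wave uses all cores
    waves = max(1, math.ceil(by_size / total_cores))
    return waves * total_cores


# Example: 50 GB shuffled on 8 workers with 4 cores each (32 cores in total)
partitions = estimate_shuffle_partitions(50, total_cores=32)
spark_conf = {"spark.sql.shuffle.partitions": str(partitions)}
```

With adaptive query execution enabled, as in the `etl_batch` profile, Spark can coalesce partitions at runtime, so an estimate like this serves as a starting value and not a precise setting.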

Skill file: plugins/saas-packs/databricks-pack/skills/databricks-performance-tuning/SKILL.md