๐Ÿ“… let's chat! explore the endless possibilities creating industries that don't exist. click here

databricks-observability

Set up comprehensive observability for Databricks with metrics, traces, and alerts. Use when implementing monitoring for Databricks jobs, setting up dashboards, or configuring alerting for pipeline health. Trigger with phrases like "databricks monitoring", "databricks metrics", "databricks observability", "monitor databricks", "databricks alerts", "databricks logging". allowed-tools: Read, Write, Edit, Bash(databricks:*) version: 1.0.0 license: MIT author: Jeremy Longshore <jeremy@intentsolutions.io>

Allowed Tools

No tools specified

Provided by Plugin

databricks-pack

Claude Code skill pack for Databricks (24 skills)

saas packs v1.0.0
View Plugin

Installation

This skill is included in the databricks-pack plugin:

/plugin install databricks-pack@claude-code-plugins-plus

Click to copy

Instructions

# Databricks Observability ## Overview Set up comprehensive observability for Databricks workloads. ## Prerequisites - Access to system tables - SQL Warehouse for dashboards - Notification destinations configured - Alert recipients defined ## Metrics Collection ### Key Metrics | Metric | Source | Description | |--------|--------|-------------| | Job success rate | system.lakeflow.job_run_timeline | % of successful job runs | | Job duration | system.lakeflow.job_run_timeline | Run time in minutes | | Cluster utilization | system.compute.cluster_events | CPU/memory usage | | Data freshness | table history | Hours since last update | | DBU consumption | system.billing.usage | Cost tracking | ## Instructions ### Step 1: System Tables Access ```sql -- Enable system tables (workspace admin) -- Check available system tables SELECT * FROM system.information_schema.tables WHERE table_schema = 'billing' OR table_schema = 'lakeflow'; -- Job run history SELECT job_id, job_name, run_id, start_time, end_time, result_state, error_message, (end_time - start_time) / 60000 as duration_minutes FROM system.lakeflow.job_run_timeline WHERE start_time > current_timestamp() - INTERVAL 24 HOURS ORDER BY start_time DESC; -- Cluster events SELECT cluster_id, timestamp, type, details FROM system.compute.cluster_events WHERE timestamp > current_timestamp() - INTERVAL 24 HOURS ORDER BY timestamp DESC; ``` ### Step 2: Create Monitoring Views ```sql -- Job health summary view CREATE OR REPLACE VIEW monitoring.job_health_summary AS SELECT job_name, COUNT(*) as total_runs, SUM(CASE WHEN result_state = 'SUCCESS' THEN 1 ELSE 0 END) as successes, SUM(CASE WHEN result_state = 'FAILED' THEN 1 ELSE 0 END) as failures, ROUND(SUM(CASE WHEN result_state = 'SUCCESS' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as success_rate, AVG((end_time - start_time) / 60000) as avg_duration_minutes, PERCENTILE((end_time - start_time) / 60000, 0.95) as p95_duration_minutes, MAX(start_time) as last_run_time, MAX(CASE WHEN result_state = 'FAILED' THEN start_time END) as last_failure_time FROM system.lakeflow.job_run_timeline WHERE start_time > current_timestamp() - INTERVAL 7 DAYS GROUP BY job_name; -- Data freshness view CREATE OR REPLACE VIEW monitoring.data_freshness AS SELECT table_catalog, table_schema, table_name, MAX(commit_timestamp) as last_update, TIMESTAMPDIFF(HOUR, MAX(commit_timestamp), current_timestamp()) as hours_since_update, CASE WHEN TIMESTAMPDIFF(HOUR, MAX(commit_timestamp), current_timestamp()) < 1 THEN 'FRESH' WHEN TIMESTAMPDIFF(HOUR, MAX(commit_timestamp), current_timestamp()) < 6 THEN 'RECENT' WHEN TIMESTAMPDIFF(HOUR, MAX(commit_timestamp), current_timestamp()) < 24 THEN 'STALE' ELSE 'VERY_STALE' END as freshness_status FROM system.information_schema.table_history GROUP BY table_catalog, table_schema, table_name; -- Cost tracking view CREATE OR REPLACE VIEW monitoring.daily_costs AS SELECT DATE(usage_date) as date, workspace_id, sku_name, usage_type, SUM(usage_quantity) as total_dbus, SUM(usage_quantity * list_price) as estimated_cost FROM system.billing.usage WHERE usage_date > current_date() - INTERVAL 30 DAYS GROUP BY DATE(usage_date), workspace_id, sku_name, usage_type ORDER BY date DESC, estimated_cost DESC; ``` ### Step 3: Configure Alerts ```sql -- Alert: Job failure CREATE ALERT job_failure_alert AS SELECT job_name, run_id, error_message, start_time FROM system.lakeflow.job_run_timeline WHERE result_state = 'FAILED' AND start_time > current_timestamp() - INTERVAL 15 MINUTES SCHEDULE CRON '*/15 * * * *' NOTIFICATIONS ( email_addresses = ['oncall@company.com'], webhook_destinations = ['slack-alerts'] ); -- Alert: Long-running jobs CREATE ALERT long_running_job_alert AS SELECT job_name, run_id, start_time, TIMESTAMPDIFF(MINUTE, start_time, current_timestamp()) as running_minutes FROM system.lakeflow.job_run_timeline WHERE end_time IS NULL AND TIMESTAMPDIFF(MINUTE, start_time, current_timestamp()) > 120 SCHEDULE CRON '*/30 * * * *' NOTIFICATIONS ( email_addresses = ['oncall@company.com'] ); -- Alert: Data freshness SLA breach CREATE ALERT data_freshness_sla AS SELECT table_name, hours_since_update FROM monitoring.data_freshness WHERE table_schema = 'gold' AND hours_since_update > 6 SCHEDULE CRON '0 * * * *' NOTIFICATIONS ( email_addresses = ['data-team@company.com'] ); -- Alert: Cost spike CREATE ALERT daily_cost_spike AS SELECT date, estimated_cost, LAG(estimated_cost) OVER (ORDER BY date) as prev_day_cost, (estimated_cost - LAG(estimated_cost) OVER (ORDER BY date)) / NULLIF(LAG(estimated_cost) OVER (ORDER BY date), 0) * 100 as percent_change FROM monitoring.daily_costs WHERE date = current_date() - 1 HAVING percent_change > 50 -- 50% increase SCHEDULE CRON '0 8 * * *' NOTIFICATIONS ( email_addresses = ['finops@company.com'] ); ``` ### Step 4: Structured Logging ```python # src/utils/logging.py import logging import json from datetime import datetime from typing import Any class StructuredLogger: """Structured logging for Databricks notebooks.""" def __init__(self, job_name: str, run_id: str = None): self.job_name = job_name self.run_id = run_id or str(datetime.now().timestamp()) self.logger = logging.getLogger(job_name) self.logger.setLevel(logging.INFO) # JSON formatter handler = logging.StreamHandler() handler.setFormatter(JsonFormatter()) self.logger.addHandler(handler) def _log(self, level: str, message: str, **context): """Log with structured context.""" log_entry = { "timestamp": datetime.utcnow().isoformat(), "job_name": self.job_name, "run_id": self.run_id, "level": level, "message": message, **context } getattr(self.logger, level.lower())(json.dumps(log_entry)) def info(self, message: str, **context): self._log("INFO", message, **context) def error(self, message: str, **context): self._log("ERROR", message, **context) def metric(self, name: str, value: Any, **tags): """Log a metric for monitoring.""" self._log("METRIC", f"{name}={value}", metric_name=name, metric_value=value, **tags) class JsonFormatter(logging.Formatter): """JSON log formatter.""" def format(self, record): return record.getMessage() # Usage in notebooks logger = StructuredLogger("etl-pipeline", dbutils.notebook.entry_point.getDbutils().notebook().getContext().runId()) logger.info("Starting bronze ingestion", source="s3://bucket/raw") logger.metric("rows_processed", 10000, table="orders") ``` ### Step 5: Custom Metrics Dashboard ```python # src/monitoring/dashboard.py from databricks.sdk import WorkspaceClient from databricks.sdk.service.sql import ( Dashboard, Widget, Query, ) def create_monitoring_dashboard(w: WorkspaceClient) -> str: """Create operational monitoring dashboard.""" # Create dashboard dashboard = w.dashboards.create( name="Data Platform Monitoring", tags=["monitoring", "operations"], ) # Job Success Rate Widget job_success_query = """ SELECT DATE(start_time) as date, job_name, ROUND(SUM(CASE WHEN result_state = 'SUCCESS' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as success_rate FROM system.lakeflow.job_run_timeline WHERE start_time > current_timestamp() - INTERVAL 7 DAYS GROUP BY DATE(start_time), job_name ORDER BY date, job_name """ # Add widgets... # (Dashboard API implementation) return dashboard.id # Generate Grafana dashboard JSON def generate_grafana_dashboard() -> dict: """Generate Grafana dashboard configuration.""" return { "dashboard": { "title": "Databricks Monitoring", "panels": [ { "title": "Job Success Rate", "type": "timeseries", "targets": [{ "rawSql": """ SELECT start_time as time, success_rate FROM monitoring.job_health_summary """ }] }, { "title": "Daily DBU Usage", "type": "bargauge", "targets": [{ "rawSql": """ SELECT date, total_dbus FROM monitoring.daily_costs WHERE date > current_date() - 7 """ }] } ] } } ``` ### Step 6: Integration with External Monitoring ```python # src/monitoring/external.py import requests from dataclasses import dataclass @dataclass class MetricPoint: name: str value: float tags: dict timestamp: int = None class DatadogExporter: """Export metrics to Datadog.""" def __init__(self, api_key: str, app_key: str): self.api_key = api_key self.app_key = app_key self.base_url = "https://api.datadoghq.com/api/v2" def send_metrics(self, metrics: list[MetricPoint]): """Send metrics to Datadog.""" series = [] for m in metrics: series.append({ "metric": f"databricks.{m.name}", "points": [[m.timestamp or int(time.time()), m.value]], "tags": [f"{k}:{v}" for k, v in m.tags.items()] }) response = requests.post( f"{self.base_url}/series", headers={ "DD-API-KEY": self.api_key, "DD-APPLICATION-KEY": self.app_key, }, json={"series": series} ) return response.status_code == 202 # Usage exporter = DatadogExporter(api_key, app_key) exporter.send_metrics([ MetricPoint("job.duration_minutes", 45.2, {"job": "etl-pipeline", "env": "prod"}), MetricPoint("job.rows_processed", 1000000, {"job": "etl-pipeline", "env": "prod"}), ]) ``` ## Output - System table queries configured - Monitoring views created - SQL alerts active - Structured logging implemented - External integrations ready ## Error Handling | Issue | Cause | Solution | |-------|-------|----------| | System tables unavailable | Feature not enabled | Contact admin to enable | | Alert not triggering | Wrong schedule | Check cron expression | | Missing metrics | Query timeout | Optimize query or increase warehouse | | High cardinality | Too many tags | Reduce label dimensions | ## Examples ### Quick Health Check Query ```sql SELECT job_name, success_rate, avg_duration_minutes, last_run_time FROM monitoring.job_health_summary WHERE success_rate < 95 ORDER BY success_rate ASC; ``` ## Resources - [Databricks System Tables](https://docs.databricks.com/administration-guide/system-tables/index.html) - [SQL Alerts](https://docs.databricks.com/sql/user/alerts/index.html) - [Dashboards](https://docs.databricks.com/sql/user/dashboards/index.html) ## Next Steps For incident response, see `databricks-incident-runbook`.

Skill file: plugins/saas-packs/databricks-pack/skills/databricks-observability/SKILL.md