
databricks-migration-deep-dive

Execute comprehensive platform migrations to Databricks from legacy systems. Use when migrating from on-premises Hadoop, other cloud platforms, or legacy data warehouses to Databricks. Trigger with phrases like "migrate to databricks", "hadoop migration", "snowflake to databricks", "legacy migration", "data warehouse migration".

allowed-tools: Read, Write, Edit, Bash(databricks:*), Grep
version: 1.0.0
license: MIT
author: Jeremy Longshore <jeremy@intentsolutions.io>

Allowed Tools

No tools specified

Provided by Plugin

databricks-pack

Claude Code skill pack for Databricks (24 skills)

saas packs v1.0.0

Installation

This skill is included in the databricks-pack plugin:

/plugin install databricks-pack@claude-code-plugins-plus


Instructions

# Databricks Migration Deep Dive

## Overview

Comprehensive migration strategies for moving to Databricks from legacy systems.

## Prerequisites

- Access to source and target systems
- Understanding of current data architecture
- Migration timeline and requirements
- Stakeholder alignment

## Migration Patterns

| Source System | Migration Pattern | Complexity | Timeline |
|--------------|-------------------|------------|----------|
| On-prem Hadoop | Lift-and-shift + modernize | High | 6-12 months |
| Snowflake | Parallel run + cutover | Medium | 3-6 months |
| AWS Redshift | ETL rewrite + data copy | Medium | 3-6 months |
| Azure Synapse | Delta Lake conversion | Low | 1-3 months |
| Legacy DW (Oracle/Teradata) | Full rebuild | High | 12-18 months |

## Instructions

### Step 1: Discovery and Assessment

```python
# scripts/migration_assessment.py
from dataclasses import dataclass
from typing import List, Dict
import pandas as pd

@dataclass
class SourceTableInfo:
    """Information about source table for migration planning."""
    database: str
    schema: str
    table: str
    row_count: int
    size_gb: float
    column_count: int
    partition_columns: List[str]
    dependencies: List[str]
    access_frequency: str  # high, medium, low
    data_classification: str  # pii, confidential, public

def assess_hadoop_cluster(spark, hive_metastore: str) -> List[SourceTableInfo]:
    """
    Assess Hadoop/Hive cluster for migration planning.

    Returns inventory of all tables with metadata.
    """
    tables = []

    # Get all databases
    databases = spark.sql("SHOW DATABASES").collect()

    for db_row in databases:
        # Read the name by position: the column is called `databaseName`
        # or `namespace` depending on the Spark version
        db = db_row[0]
        if db in ['default', 'sys']:  # Skip system DBs
            continue

        # Get tables in database
        spark.sql(f"USE {db}")
        table_list = spark.sql("SHOW TABLES").collect()

        for table_row in table_list:
            table_name = table_row.tableName

            try:
                # Get table details
                desc = spark.sql(f"DESCRIBE EXTENDED {db}.{table_name}")
                detail_df = desc.toPandas()

                # Extract partition info
                partition_cols = []
                in_partition_section = False
                for _, row in detail_df.iterrows():
                    col_name = row['col_name'] or ''
                    if col_name == '# Partition Information':
                        in_partition_section = True
                    elif in_partition_section:
                        if col_name.startswith('# Detailed'):
                            break  # Table details follow the partition list
                        if col_name and not col_name.startswith('#'):
                            partition_cols.append(col_name)

                # Row count and size are placeholders here; when table
                # statistics exist, parse them from the DESCRIBE output above

                tables.append(SourceTableInfo(
                    database=db,
                    schema=db,
                    table=table_name,
                    row_count=0,  # Would query actual count
                    size_gb=0,  # From statistics
                    # Count real columns only, not the metadata rows of DESCRIBE
                    column_count=len(spark.table(f"{db}.{table_name}").columns),
                    partition_columns=partition_cols,
                    dependencies=[],  # Would analyze queries
                    access_frequency='medium',
                    data_classification='internal',
                ))

            except Exception as e:
                print(f"Error processing {db}.{table_name}: {e}")

    return tables

def generate_migration_plan(tables: List[SourceTableInfo]) -> pd.DataFrame:
    """Generate migration plan with prioritization."""
    plan_data = []

    for table in tables:
        # Calculate complexity score
        complexity = 0
        complexity += 1 if table.size_gb > 100 else 0
        complexity += 1 if len(table.partition_columns) > 2 else 0
        complexity += 1 if len(table.dependencies) > 5 else 0
        complexity += 2 if table.data_classification == 'pii' else 0

        # Calculate priority
        priority = 0
        priority += 3 if table.access_frequency == 'high' else 1
        priority += 2 if table.data_classification == 'pii' else 0

        plan_data.append({
            'source_table': f"{table.database}.{table.table}",
            'target_table': f"migrated.{table.schema}.{table.table}",
            'size_gb': table.size_gb,
            'complexity_score': complexity,
            'priority_score': priority,
            'estimated_hours': max(1, table.size_gb / 10 + complexity * 2),
            'migration_wave': 1 if priority > 3 else (2 if priority > 1 else 3),
        })

    return pd.DataFrame(plan_data).sort_values(
        ['migration_wave', 'priority_score'],
        ascending=[True, False]
    )
```

### Step 2: Schema Migration

```python
# scripts/schema_migration.py
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    DecimalType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

def convert_hive_to_delta_schema(spark: SparkSession, hive_table: str) -> StructType:
    """
    Convert Hive table schema to Delta Lake compatible schema.
    Handles type conversions and incompatibilities.
    """
    hive_schema = spark.table(hive_table).schema

    # Type mappings for problematic types, keyed by the prefix of
    # DataType.simpleString() (e.g. 'decimal(38,0)', 'varchar(20)')
    type_conversions = {
        # Add scale; note this leaves only 28 digits for the integer part
        'decimal(38,0)': DecimalType(38, 10),
        'char': StringType(),
        'varchar': StringType(),
        'tinyint': IntegerType(),  # Widen tinyint to int
    }

    new_fields = []
    for field in hive_schema.fields:
        new_type = field.dataType
        # str(dataType) yields names such as 'ByteType()', which never match
        # the SQL-style keys above, so compare against simpleString()
        type_str = field.dataType.simpleString().lower()

        # Prefix match only, so nested types (array<...>, struct<...>)
        # pass through unchanged
        for pattern, replacement in type_conversions.items():
            if type_str.startswith(pattern):
                new_type = replacement
                break

        new_fields.append(StructField(
            field.name,
            new_type,
            field.nullable,
            field.metadata
        ))

    return StructType(new_fields)

def migrate_table_schema(
    spark: SparkSession,
    source_table: str,
    target_table: str,
    catalog: str = "migrated",
) -> dict:
    """
    Migrate table schema from Hive to Delta Lake.

    Returns migration result with any schema changes.
    """
    # Get source schema
    source_df = spark.table(source_table)
    source_schema = source_df.schema

    # Convert schema
    target_schema = convert_hive_to_delta_schema(spark, source_table)

    # Create target table
    schema_ddl = ", ".join([
        f"`{f.name}` {f.dataType.simpleString()}"
        for f in target_schema.fields
    ])

    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {catalog}.{target_table} (
            {schema_ddl}
        )
        USING DELTA
        TBLPROPERTIES (
            'delta.autoOptimize.optimizeWrite' = 'true',
            'delta.autoOptimize.autoCompact' = 'true'
        )
    """)

    # Track schema changes
    changes = []
    for src, tgt in zip(source_schema.fields, target_schema.fields):
        if str(src.dataType) != str(tgt.dataType):
            changes.append({
                'column': src.name,
                'source_type': str(src.dataType),
                'target_type': str(tgt.dataType),
            })

    return {
        'source_table': source_table,
        'target_table': f"{catalog}.{target_table}",
        'column_count': len(target_schema.fields),
        'schema_changes': changes,
    }
```

### Step 3: Data Migration

```python
# scripts/data_migration.py
from pyspark.sql import SparkSession, DataFrame
from datetime import datetime
import time

class DataMigrator:
    """Handle data migration from legacy systems to Delta Lake."""

    def __init__(self, spark: SparkSession, target_catalog: str):
        self.spark = spark
        self.target_catalog = target_catalog

    def migrate_table(
        self,
        source_table: str,
        target_table: str,
        batch_size: int = 1000000,
        partition_columns: list[str] = None,
        incremental_column: str = None,
    ) -> dict:
        """
        Migrate table data with batching and checkpointing.

        Args:
            source_table: Source table (Hive, JDBC, etc.)
            target_table: Target Delta table
            batch_size: Rows per batch
            partition_columns: Columns for partitioning
            incremental_column: Column for incremental loads

        Returns:
            Migration statistics
        """
        start_time = time.time()
        stats = {
            'source_table': source_table,
            'target_table': f"{self.target_catalog}.{target_table}",
            'batches': 0,
            'total_rows': 0,
            'errors': [],
        }

        # Get source data
        source_df = self.spark.table(source_table)

        # For large tables, process in batches by partition
        if partition_columns and len(partition_columns) > 0:
            # Get distinct partition values
            partitions = source_df.select(partition_columns).distinct().collect()

            for partition_row in partitions:
                # Build filter condition; a NULL partition value needs
                # IS NULL, otherwise its rows would be skipped silently
                conditions = [
                    f"{col} IS NULL" if partition_row[col] is None
                    else f"{col} = '{partition_row[col]}'"
                    for col in partition_columns
                ]
                filter_expr = " AND ".join(conditions)

                batch_df = source_df.filter(filter_expr)
                self._write_batch(batch_df, target_table, partition_columns)

                stats['batches'] += 1
                stats['total_rows'] += batch_df.count()

        else:
            # Single batch for small tables
            self._write_batch(source_df, target_table, partition_columns)
            stats['batches'] = 1
            stats['total_rows'] = source_df.count()

        stats['duration_seconds'] = time.time() - start_time
        stats['rows_per_second'] = stats['total_rows'] / stats['duration_seconds']

        return stats

    def _write_batch(
        self,
        df: DataFrame,
        target_table: str,
        partition_columns: list[str] = None,
    ):
        """Write a batch to Delta table."""
        writer = df.write.format("delta").mode("append")

        if partition_columns:
            writer = writer.partitionBy(*partition_columns)

        writer.saveAsTable(f"{self.target_catalog}.{target_table}")

    def validate_migration(
        self,
        source_table: str,
        target_table: str,
    ) -> dict:
        """Validate migrated data matches source."""
        source_df = self.spark.table(source_table)
        target_df = self.spark.table(f"{self.target_catalog}.{target_table}")

        validation = {
            'source_count': source_df.count(),
            'target_count': target_df.count(),
            'count_match': False,
            'schema_match': False,
            'sample_match': False,
        }

        # Count validation
        validation['count_match'] = (
            validation['source_count'] == validation['target_count']
        )

        # Schema validation (column names)
        source_cols = set(source_df.columns)
        target_cols = set(target_df.columns)
        validation['schema_match'] = source_cols == target_cols

        # Sample data validation
        source_sample = source_df.limit(100).toPandas()
        target_sample = target_df.limit(100).toPandas()

        # Compare samples (simplified)
        validation['sample_match'] = len(source_sample) == len(target_sample)

        return validation
```

### Step 4: ETL/Pipeline Migration

```python
# scripts/pipeline_migration.py
from datetime import datetime  # Needed for the conversion-date header

def convert_spark_job_to_databricks(
    source_code: str,
    source_type: str = "spark-submit",
) -> str:
    """
    Convert legacy Spark job to Databricks job.

    Handles common patterns from spark-submit, Oozie, Airflow.
    """
    # Common replacements
    replacements = {
        # SparkSession changes
        'SparkSession.builder.master("yarn")': 'SparkSession.builder',
        '.master("local[*]")': '',

        # Path changes
        'hdfs://namenode:8020/': '/mnt/data/',
        's3a://': 's3://',

        # Hive changes
        '.enableHiveSupport()': '',  # Unity Catalog handles this
        'hive_metastore.': '',  # Direct table access

        # Config changes
        '.config("spark.sql.warehouse.dir"': '# Removed: .config("spark.sql.warehouse.dir"',
    }

    converted = source_code
    for old, new in replacements.items():
        converted = converted.replace(old, new)

    # Add Databricks-specific imports
    header = '''
# Converted for Databricks
# Original source: {source_type}
# Conversion date: {date}

from pyspark.sql import SparkSession

# SparkSession is pre-configured in Databricks
spark = SparkSession.builder.getOrCreate()

'''.format(source_type=source_type, date=datetime.now().isoformat())

    return header + converted

# Convert Oozie workflow to Databricks job
def convert_oozie_to_databricks_job(oozie_xml: str) -> dict:
    """Convert Oozie workflow XML to Databricks job definition."""
    import xml.etree.ElementTree as ET

    root = ET.fromstring(oozie_xml)

    tasks = []
    # Oozie workflows declare XML namespaces, so match tags with the
    # '{*}' wildcard (Python 3.8+); it also matches un-namespaced tags
    for action in root.findall('.//{*}action'):
        action_name = action.get('name')

        # Handle different action types
        spark_action = action.find('{*}spark')
        if spark_action is not None:
            jar = spark_action.find('{*}jar').text
            main_class = spark_action.find('{*}class').text

            tasks.append({
                'task_key': action_name,
                'spark_jar_task': {
                    'main_class_name': main_class,
                    'parameters': [],
                },
                'libraries': [{'jar': jar}],
            })

        shell_action = action.find('{*}shell')
        if shell_action is not None:
            # Convert to notebook task or skip
            pass

    # Build job definition
    job_definition = {
        'name': f"migrated-{root.get('name')}",
        'tasks': tasks,
        'job_clusters': [{
            'job_cluster_key': 'migration_cluster',
            'new_cluster': {
                'spark_version': '14.3.x-scala2.12',
                'node_type_id': 'Standard_DS3_v2',
                'num_workers': 2,
            }
        }],
    }

    return job_definition
```

### Step 5: Cutover Planning

```python
# scripts/cutover_plan.py
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional

@dataclass
class CutoverStep:
    """Individual step in cutover plan."""
    order: int
    name: str
    duration_minutes: int
    owner: str
    rollback_procedure: str
    verification: str
    # Filled in when the plan is scheduled
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

def generate_cutover_plan(
    migration_wave: int,
    tables: List[str],
    cutover_date: datetime,
) -> List[CutoverStep]:
    """Generate detailed cutover plan."""

    steps = [
        CutoverStep(
            order=1,
            name="Pre-cutover validation",
            duration_minutes=60,
            owner="Data Engineer",
            rollback_procedure="N/A - no changes made",
            verification="Run validation queries on all tables",
        ),
        CutoverStep(
            order=2,
            name="Disable source data pipelines",
            duration_minutes=15,
            owner="Platform Admin",
            rollback_procedure="Re-enable pipelines in source system",
            verification="Verify no new data in source",
        ),
        CutoverStep(
            order=3,
            name="Final incremental sync",
            duration_minutes=120,
            owner="Data Engineer",
            rollback_procedure="N/A",
            verification="Row counts match source",
        ),
        CutoverStep(
            order=4,
            name="Enable Databricks pipelines",
            duration_minutes=30,
            owner="Data Engineer",
            rollback_procedure="Disable Databricks pipelines, re-enable source",
            verification="Jobs running successfully",
        ),
        CutoverStep(
            order=5,
            name="Update downstream applications",
            duration_minutes=60,
            owner="Application Team",
            rollback_procedure="Revert connection strings",
            verification="Applications reading from Databricks",
        ),
        CutoverStep(
            order=6,
            name="Monitor and validate",
            duration_minutes=240,
            owner="Data Engineer",
            rollback_procedure="Execute full rollback if issues",
            verification="All metrics within acceptable range",
        ),
    ]

    # Calculate timings
    current_time = cutover_date
    for step in steps:
        step.start_time = current_time
        step.end_time = current_time + timedelta(minutes=step.duration_minutes)
        current_time = step.end_time

    return steps
```

## Output

- Migration assessment complete
- Schema migration automated
- Data migration pipeline ready
- ETL conversion scripts
- Cutover plan documented

## Error Handling

| Issue | Cause | Solution |
|-------|-------|----------|
| Schema incompatibility | Unsupported types | Use type conversion mappings |
| Data loss | Truncation | Validate counts at each step |
| Performance issues | Large tables | Use partitioned migration |
| Dependency conflicts | Wrong migration order | Analyze dependencies first |

## Examples

### Quick Migration Validation

```sql
-- Compare source and target counts
SELECT
    'source' as system,
    COUNT(*) as row_count
FROM hive_metastore.db.table
UNION ALL
SELECT
    'target' as system,
    COUNT(*) as row_count
FROM migrated.db.table;
```

## Resources

- [Databricks Migration Guide](https://docs.databricks.com/migration/index.html)
- [Delta Lake Migration](https://docs.databricks.com/delta/migration.html)
- [Unity Catalog Migration](https://docs.databricks.com/data-governance/unity-catalog/migrate.html)

## Completion

This skill pack provides comprehensive coverage for Databricks platform operations.
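
The Error Handling table advises analyzing dependencies before choosing a migration order, but the assessment script leaves `dependencies` empty. One possible way to turn a dependency map into a safe order is a topological sort, sketched below with the standard-library `graphlib` module (Python 3.9+). The function name `order_tables_by_dependency` and the sample table names are hypothetical and not part of the skill.

```python
from graphlib import CycleError, TopologicalSorter


def order_tables_by_dependency(dependencies: dict[str, list[str]]) -> list[str]:
    """Return table names so that each table comes after the tables it reads from.

    `dependencies` maps a table to its upstream tables (hypothetical input,
    e.g. collected by parsing the queries that populate each table).
    """
    sorter = TopologicalSorter()
    for table, upstream in dependencies.items():
        sorter.add(table, *upstream)
    try:
        return list(sorter.static_order())
    except CycleError as exc:
        # exc.args[1] holds the list of nodes that form the cycle
        raise ValueError(f"Circular dependency between tables: {exc.args[1]}") from exc


# Hypothetical inventory: two base tables and two derived tables
deps = {
    "sales.orders_enriched": ["sales.orders", "crm.customers"],
    "sales.daily_revenue": ["sales.orders_enriched"],
    "sales.orders": [],
    "crm.customers": [],
}
migration_order = order_tables_by_dependency(deps)
print(migration_order)
```

Base tables come out before the tables derived from them, so the resulting list could be cut into waves from front to back. A circular dependency raises an error instead of producing a misleading order.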
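
Row counts alone, as in `validate_migration` and the quick validation query, cannot detect a truncated or altered value, because the count stays the same. A common complement is an order-independent content checksum: hash every row and sum the hashes. The sketch below shows the idea on plain Python tuples rather than Spark DataFrames; `row_digest`, `table_fingerprint`, and the sample rows are illustrative assumptions, and a Spark version would need an equivalent per-row hash aggregated across the table.

```python
import hashlib


def row_digest(row: tuple) -> int:
    """Hash one row to a 64-bit integer; None is encoded differently from 'None'."""
    encoded = "\x1f".join("\x00" if value is None else repr(value) for value in row)
    return int.from_bytes(hashlib.sha256(encoded.encode("utf-8")).digest()[:8], "big")


def table_fingerprint(rows) -> tuple[int, int]:
    """Return (row_count, checksum).

    The checksum is the sum of row digests modulo 2**64, so it does not
    depend on row order, and duplicate rows do not cancel each other out.
    """
    count, checksum = 0, 0
    for row in rows:
        count += 1
        checksum = (checksum + row_digest(row)) % (1 << 64)
    return count, checksum


source_rows = [(1, "alice", 10.5), (2, "bob", None), (3, "carol", 7.0)]
target_rows = [(3, "carol", 7.0), (1, "alice", 10.5), (2, "bob", None)]  # same rows, new order
truncated_rows = [(1, "alice", 10.5), (2, "bo", None), (3, "carol", 7.0)]  # one value cut short

print(table_fingerprint(source_rows) == table_fingerprint(target_rows))
print(table_fingerprint(source_rows) == table_fingerprint(truncated_rows))
```

Summation is used instead of XOR so that a row duplicated during migration changes the checksum. Type changes made during schema conversion (for example a widened integer) would need to be normalized on both sides before hashing.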

Skill file: plugins/saas-packs/databricks-pack/skills/databricks-migration-deep-dive/SKILL.md