
databricks-data-handling

Implement Delta Lake data management patterns including GDPR, PII handling, and data lifecycle. Use when implementing data retention, handling GDPR requests, or managing data lifecycle in Delta Lake. Trigger with phrases like "databricks GDPR", "databricks PII", "databricks data retention", "databricks data lifecycle", "delete user data".

allowed-tools: Read, Write, Edit, Bash(databricks:*)
version: 1.0.0
license: MIT
author: Jeremy Longshore <jeremy@intentsolutions.io>

Allowed Tools

No tools specified

Provided by Plugin

databricks-pack

Claude Code skill pack for Databricks (24 skills)

saas packs v1.0.0

Installation

This skill is included in the databricks-pack plugin:

/plugin install databricks-pack@claude-code-plugins-plus


Instructions

# Databricks Data Handling

## Overview

Implement data management patterns for compliance, privacy, and lifecycle in Delta Lake.

## Prerequisites

- Unity Catalog configured
- Understanding of Delta Lake features
- Compliance requirements documented
- Data classification in place

## Instructions

### Step 1: Data Classification and Tagging

```sql
-- Tag tables with data classification
ALTER TABLE catalog.schema.customers
SET TAGS ('data_classification' = 'PII', 'retention_days' = '365');

ALTER TABLE catalog.schema.orders
SET TAGS ('data_classification' = 'CONFIDENTIAL', 'retention_days' = '2555');

ALTER TABLE catalog.schema.analytics_events
SET TAGS ('data_classification' = 'INTERNAL', 'retention_days' = '90');

-- Tag columns with sensitivity
ALTER TABLE catalog.schema.customers
ALTER COLUMN email SET TAGS ('pii' = 'true', 'pii_type' = 'email');

ALTER TABLE catalog.schema.customers
ALTER COLUMN phone SET TAGS ('pii' = 'true', 'pii_type' = 'phone');

-- Query classified data
-- (table_tags exposes catalog_name and schema_name, not table_catalog/table_schema)
SELECT catalog_name, schema_name, table_name, tag_name, tag_value
FROM system.information_schema.table_tags
WHERE tag_name = 'data_classification';
```

### Step 2: GDPR Right to Deletion (RTBF)

```python
# src/compliance/gdpr.py
import logging
from datetime import datetime, timezone
from typing import Optional

from pyspark.sql import SparkSession

logger = logging.getLogger(__name__)


class GDPRHandler:
    """Handle GDPR data subject requests."""

    def __init__(self, spark: SparkSession, catalog: str):
        self.spark = spark
        self.catalog = catalog

    def process_deletion_request(
        self,
        user_id: str,
        request_id: str,
        dry_run: bool = True,
    ) -> dict:
        """
        Process GDPR deletion request for a user.

        Args:
            user_id: User identifier to delete
            request_id: GDPR request tracking ID
            dry_run: If True, only report what would be deleted

        Returns:
            Deletion report
        """
        # NOTE: user_id and request_id are interpolated into SQL text below.
        # Validate them against an allow-list before calling this method.
        report = {
            "request_id": request_id,
            "user_id": user_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "dry_run": dry_run,
            "tables_processed": [],
            "total_rows_deleted": 0,
        }

        # Find all tables with PII tag
        pii_tables = self._get_pii_tables()

        for table_info in pii_tables:
            table_name = f"{table_info['catalog']}.{table_info['schema']}.{table_info['table']}"
            user_column = self._get_user_column(table_name)

            if not user_column:
                # No recognized user column; the table must be mapped manually
                logger.warning("No user column found in %s; skipping", table_name)
                continue

            # Count rows to be deleted
            count_query = f"SELECT COUNT(*) FROM {table_name} WHERE {user_column} = '{user_id}'"
            row_count = self.spark.sql(count_query).first()[0]

            if row_count > 0:
                table_report = {
                    "table": table_name,
                    "rows_to_delete": row_count,
                    "deleted": False,
                }

                if not dry_run:
                    # Delete using Delta Lake DELETE. The rows stay in older
                    # data files until VACUUM removes them (see Step 3).
                    self.spark.sql(f"""
                        DELETE FROM {table_name}
                        WHERE {user_column} = '{user_id}'
                    """)
                    table_report["deleted"] = True

                    # Log deletion for audit
                    self._log_deletion(request_id, table_name, user_id, row_count)

                report["tables_processed"].append(table_report)
                # In a dry run this is the number of rows that would be deleted
                report["total_rows_deleted"] += row_count

        return report

    def _get_pii_tables(self) -> list[dict]:
        """Get all tables tagged as containing PII."""
        query = f"""
            SELECT DISTINCT
                catalog_name AS `catalog`,
                schema_name AS `schema`,
                table_name AS `table`
            FROM {self.catalog}.information_schema.table_tags
            WHERE tag_name = 'data_classification'
              AND tag_value = 'PII'
        """
        return [row.asDict() for row in self.spark.sql(query).collect()]

    def _get_user_column(self, table_name: str) -> Optional[str]:
        """Determine user identifier column for table, or None if absent."""
        # Check for common user ID columns
        columns = [c.name for c in self.spark.table(table_name).schema]
        user_columns = ['user_id', 'customer_id', 'account_id', 'member_id']

        for uc in user_columns:
            if uc in columns:
                return uc
        return None

    def _log_deletion(
        self,
        request_id: str,
        table_name: str,
        user_id: str,
        row_count: int
    ):
        """Log deletion for compliance audit."""
        self.spark.sql(f"""
            INSERT INTO {self.catalog}.compliance.deletion_log
            VALUES (
                '{request_id}',
                '{table_name}',
                '{user_id}',
                {row_count},
                current_timestamp()
            )
        """)


# Usage
gdpr = GDPRHandler(spark, "prod_catalog")
report = gdpr.process_deletion_request(
    user_id="user-12345",
    request_id="GDPR-2024-001",
    dry_run=True  # Set to False to actually delete
)
print(report)
```

### Step 3: Data Retention Policies

```python
# src/compliance/retention.py
from datetime import datetime, timedelta
from typing import Optional

from pyspark.sql import SparkSession


class DataRetentionManager:
    """Manage data retention and cleanup."""

    def __init__(self, spark: SparkSession, catalog: str):
        self.spark = spark
        self.catalog = catalog

    def apply_retention_policies(self, dry_run: bool = True) -> list[dict]:
        """
        Apply retention policies based on table tags.

        Returns:
            List of tables processed with row counts
        """
        results = []

        # Get tables with retention tags
        # (table_tags exposes catalog_name and schema_name)
        tables = self.spark.sql(f"""
            SELECT
                catalog_name,
                schema_name,
                table_name,
                CAST(tag_value AS INT) AS retention_days
            FROM {self.catalog}.information_schema.table_tags
            WHERE tag_name = 'retention_days'
        """).collect()

        for table in tables:
            full_name = f"{table.catalog_name}.{table.schema_name}.{table.table_name}"
            cutoff_date = datetime.now() - timedelta(days=table.retention_days)
            cutoff = cutoff_date.strftime('%Y-%m-%d')

            # Find date column
            date_col = self._get_date_column(full_name)
            if not date_col:
                continue

            # Count rows to delete
            count = self.spark.sql(f"""
                SELECT COUNT(*) FROM {full_name}
                WHERE {date_col} < '{cutoff}'
            """).first()[0]

            result = {
                "table": full_name,
                "retention_days": table.retention_days,
                "cutoff_date": cutoff,
                "rows_to_delete": count,
                "deleted": False,
            }

            if not dry_run and count > 0:
                self.spark.sql(f"""
                    DELETE FROM {full_name}
                    WHERE {date_col} < '{cutoff}'
                """)
                result["deleted"] = True

            results.append(result)

        return results

    def vacuum_tables(self, retention_hours: int = 168) -> list[dict]:
        """
        Vacuum Delta tables to remove old files.

        Args:
            retention_hours: Hours of history to retain (default 7 days)
        """
        results = []

        # Get all managed tables in the catalog
        tables = self.spark.sql(f"""
            SELECT table_catalog, table_schema, table_name
            FROM {self.catalog}.information_schema.tables
            WHERE table_type = 'MANAGED'
        """).collect()

        for table in tables:
            full_name = f"{table.table_catalog}.{table.table_schema}.{table.table_name}"
            try:
                self.spark.sql(f"VACUUM {full_name} RETAIN {retention_hours} HOURS")
                results.append({"table": full_name, "status": "vacuumed"})
            except Exception as e:
                results.append({"table": full_name, "status": "error", "error": str(e)})

        return results

    def _get_date_column(self, table_name: str) -> Optional[str]:
        """Find appropriate date column for retention, or None if absent."""
        columns = [c.name for c in self.spark.table(table_name).schema]
        date_columns = ['created_at', 'event_date', 'timestamp', 'date', 'updated_at']

        for dc in date_columns:
            if dc in columns:
                return dc
        return None


# Scheduled job for retention
def run_daily_retention(spark):
    """Run as scheduled job."""
    manager = DataRetentionManager(spark, "prod_catalog")

    # Apply retention policies
    retention_results = manager.apply_retention_policies(dry_run=False)
    print(f"Retention applied: {len(retention_results)} tables processed")

    # Vacuum tables
    vacuum_results = manager.vacuum_tables()
    print(f"Vacuum completed: {len(vacuum_results)} tables")
```

### Step 4: PII Masking and Anonymization

```python
# src/compliance/masking.py
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col, sha2, concat, lit, regexp_replace, substring
)


class PIIMasker:
    """Mask PII data for analytics and testing."""

    @staticmethod
    def mask_email(df: DataFrame, column: str) -> DataFrame:
        """Mask email addresses: john.doe@company.com -> j***@***.com"""
        return df.withColumn(
            column,
            concat(
                substring(col(column), 1, 1),  # keep the first character
                lit("***@***."),
                regexp_replace(col(column), r".*\.(\w+)$", "$1")  # keep the TLD
            )
        )

    @staticmethod
    def mask_phone(df: DataFrame, column: str) -> DataFrame:
        """Mask phone numbers: +1-555-123-4567 -> +1-555-***-****"""
        return df.withColumn(
            column,
            regexp_replace(col(column), r"(\d{3})-(\d{4})$", "***-****")
        )

    @staticmethod
    def hash_identifier(df: DataFrame, column: str, salt: str = "") -> DataFrame:
        """Hash identifiers for pseudonymization."""
        return df.withColumn(
            column,
            sha2(concat(col(column), lit(salt)), 256)
        )

    @staticmethod
    def mask_name(df: DataFrame, column: str) -> DataFrame:
        """Mask names: John Smith -> J*** S***"""
        return df.withColumn(
            column,
            regexp_replace(col(column), r"(\w)\w+", "$1***")
        )

    @staticmethod
    def create_masked_view(
        spark,
        source_table: str,
        view_name: str,
        masking_rules: dict[str, str],
    ) -> None:
        """
        Create a view with masked PII columns.

        Args:
            spark: SparkSession
            source_table: Source table name
            view_name: Name for masked view
            masking_rules: Dict of {column: masking_type}
                Types: email, phone, hash, name, redact
        """
        df = spark.table(source_table)

        for column, mask_type in masking_rules.items():
            if mask_type == "email":
                df = PIIMasker.mask_email(df, column)
            elif mask_type == "phone":
                df = PIIMasker.mask_phone(df, column)
            elif mask_type == "hash":
                df = PIIMasker.hash_identifier(df, column)
            elif mask_type == "name":
                df = PIIMasker.mask_name(df, column)
            elif mask_type == "redact":
                df = df.withColumn(column, lit("[REDACTED]"))

        # Temp views are visible only within the current Spark session
        df.createOrReplaceTempView(view_name)


# Usage
PIIMasker.create_masked_view(
    spark,
    "prod_catalog.customers.users",
    "masked_users",
    {
        "email": "email",
        "phone": "phone",
        "full_name": "name",
        "ssn": "redact",
    }
)
```

### Step 5: Row-Level Security

```sql
-- Create row filter function
CREATE OR REPLACE FUNCTION catalog.security.region_filter(region STRING)
RETURNS BOOLEAN
RETURN (
    -- Allow access if user is admin
    IS_ACCOUNT_GROUP_MEMBER('data-admins')
    OR
    -- Or if region matches user's assigned region.
    -- NOTE: verify that current_user_attribute exists in your workspace; if it
    -- does not, look the region up in your own mapping table keyed on current_user().
    region = current_user_attribute('region')
);

-- Apply row filter to table
ALTER TABLE catalog.schema.sales
SET ROW FILTER catalog.security.region_filter ON (region);

-- Column masking function
CREATE OR REPLACE FUNCTION catalog.security.mask_salary(salary DECIMAL)
RETURNS DECIMAL
RETURN CASE
    WHEN IS_ACCOUNT_GROUP_MEMBER('hr-team') THEN salary
    ELSE NULL
END;

-- Apply column mask
ALTER TABLE catalog.schema.employees
ALTER COLUMN salary SET MASK catalog.security.mask_salary;
```

## Output

- Data classification tags applied
- GDPR deletion process implemented
- Retention policies enforced
- PII masking configured
- Row-level security enabled

## Error Handling

| Issue | Cause | Solution |
|-------|-------|----------|
| Vacuum fails | Retention shorter than the safety threshold | Retain at least 168 hours (7 days) |
| Delete timeout | Large table | Split the delete into smaller batches run over time |
| Missing user column | Non-standard schema | Map user columns manually |
| Mask function error | Invalid regex | Test masking functions on sample data first |

## Examples

### GDPR Subject Access Request

```python
def generate_sar_report(spark, user_id: str) -> dict:
    """Generate Subject Access Request report."""
    # get_pii_tables and get_user_column are assumed to be standalone helpers
    # equivalent to GDPRHandler._get_pii_tables and _get_user_column in Step 2,
    # with get_pii_tables returning fully qualified table names.
    pii_tables = get_pii_tables(spark)
    report = {"user_id": user_id, "data": {}}

    for table in pii_tables:
        user_col = get_user_column(table)
        if user_col:
            # NOTE: validate user_id before interpolating it into SQL text
            data = spark.sql(f"""
                SELECT * FROM {table}
                WHERE {user_col} = '{user_id}'
            """).toPandas().to_dict('records')
            report["data"][table] = data

    return report
```

## Resources

- [Delta Lake Privacy](https://docs.databricks.com/delta/privacy.html)
- [Unity Catalog Security](https://docs.databricks.com/data-governance/unity-catalog/row-level-security.html)
- [GDPR Compliance](https://docs.databricks.com/security/privacy/gdpr.html)

## Next Steps

For enterprise RBAC, see `databricks-enterprise-rbac`.
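
The deletion handler in Step 2 and the Subject Access Request example both build SQL by interpolating `user_id`, the table name, and the user column into the statement text. The sketch below shows one way to check those values before they reach `spark.sql`. The helper names (`validate_user_id`, `quote_table_name`, `build_delete_statement`) and the allowed-character policy are assumptions for illustration and are not part of the skill; adjust the patterns to match your real identifier formats.

```python
import re

# Hypothetical allow-lists (assumptions, not from the original skill):
# user IDs may contain letters, digits, underscores, and hyphens;
# identifier parts must look like plain SQL identifiers.
_USER_ID_PATTERN = re.compile(r"[A-Za-z0-9_\-]{1,128}")
_IDENTIFIER_PART = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def validate_user_id(user_id: str) -> str:
    """Return user_id unchanged if it matches the allow-list, else raise."""
    if not _USER_ID_PATTERN.fullmatch(user_id):
        raise ValueError(f"Rejected user_id: {user_id!r}")
    return user_id


def quote_table_name(full_name: str) -> str:
    """Backtick-quote each part of a three-level catalog.schema.table name."""
    parts = full_name.split(".")
    if len(parts) != 3 or not all(_IDENTIFIER_PART.fullmatch(p) for p in parts):
        raise ValueError(f"Rejected table name: {full_name!r}")
    return ".".join(f"`{p}`" for p in parts)


def build_delete_statement(table_name: str, user_column: str, user_id: str) -> str:
    """Build the DELETE text only from values that passed validation."""
    if not _IDENTIFIER_PART.fullmatch(user_column):
        raise ValueError(f"Rejected column name: {user_column!r}")
    return (
        f"DELETE FROM {quote_table_name(table_name)} "
        f"WHERE `{user_column}` = '{validate_user_id(user_id)}'"
    )


if __name__ == "__main__":
    print(build_delete_statement(
        "prod_catalog.customers.users", "user_id", "user-12345"
    ))
```

The returned string can be passed to `spark.sql` in place of the inline f-string. Rejecting unexpected input outright, rather than trying to escape it, keeps the rule easy to audit for a compliance workflow.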

Skill file: plugins/saas-packs/databricks-pack/skills/databricks-data-handling/SKILL.md