Extract Generation Package

At a Glance

| Item | Summary |
| --- | --- |
| Purpose | Generate file extracts (CSV/TSV/DAT/Parquet) from lakehouse/warehouse objects with high-performance processing. |
| Inputs | Extract job and detail configs in config_extract_generation and config_extract_generation_details. |
| Outputs | Files written to OneLake paths; performance metrics in log_extract_generation. |
| Core commands | ingen_fab package extract compile, ingen_fab package extract run. |
| When to use | You need high-performance, parallel, configurable exports with compression, partitioning, and detailed monitoring. |

Note: For CLI flags and examples, refer to User Guide → CLI Reference and Deploy Guide.

The Extract Generation package provides high-performance automated file extract capabilities from Microsoft Fabric lakehouses and warehouses. It features dynamic location resolution, parallel processing, comprehensive performance monitoring, and support for multiple output formats.

Overview

The Extract Generation package provides:

  • High-Performance Processing: Pure PySpark implementation with 32x performance improvements over legacy approaches
  • Parallel Execution: Configurable concurrent extract processing (1-10 parallel jobs)
  • Multiple Formats: CSV, TSV (tab-delimited), DAT (pipe-delimited), and Parquet, with proper Spark format mapping
  • Dynamic Location Resolution: Flexible source/target workspace and datastore configuration
  • Performance Monitoring: Real-time throughput, memory usage, and execution metrics
  • Advanced File Options: Compression, single-file forcing, custom delimiters
  • Comprehensive Logging: Thread-safe execution tracking with detailed performance analytics

Installation & Setup

1. Compile the Package

# Compile extract generation package with sample configurations
ingen_fab package extract compile --include-samples

# Compile for specific target datastore
ingen_fab package extract compile --target-datastore lakehouse
ingen_fab package extract compile --target-datastore warehouse

Note: The package now supports both lakehouse and warehouse deployment modes. For compatibility with synthetic data tables generated by the synthetic_data_generation package, see Extract Generation - Synthetic Data Alignment.

2. Deploy DDL Scripts

The package creates the following configuration tables:

Lakehouse Mode:

  • config_extract_generation - Extract job definitions with location resolution
  • config_extract_generation_details - File format specifications with performance options
  • log_extract_generation - Execution history with performance metrics

Warehouse Mode:

  • config.config_extract_generation - Extract job definitions
  • config.config_extract_generation_details - File format specifications
  • config.log_extract_generation - Execution history and metrics

3. Configure Extracts

Insert extract configurations into the metadata tables or use the provided sample data that includes configurations for TSV, DAT, CSV, and Parquet formats.

Architecture & Performance

Pure PySpark Implementation

The package has been completely rewritten for optimal performance:

  • No Pandas Conversions: Direct Spark DataFrame operations for 32x speed improvement
  • PySpark DataFrame API: Replaced SQL queries with type-safe DataFrame operations
  • Memory Efficient: Smart partitioning with repartition(1) for single files
  • Location Resolution: Dynamic workspace and datastore resolution using location_resolver.py pattern

Performance Benchmarks

| Dataset Size | Legacy Time | Optimized Time | Speedup |
| --- | --- | --- | --- |
| 1M rows | 26.7s | 0.82s | 32.6x |
| 10M rows | ~4.5 min | ~8.2s | 33x |
| 100M rows | ~45 min | ~82s | 33x |
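
The Speedup column is the legacy time divided by the optimized time. The short check below recomputes it from the figures in the table, converting the approximate minute values to seconds; the helper name `speedup` is only illustrative.

```python
# Recompute the Speedup column from the benchmark figures above.
# All times are in seconds; the "~" rows use the table's rounded values.
benchmarks = {
    "1M rows": (26.7, 0.82),
    "10M rows": (4.5 * 60, 8.2),
    "100M rows": (45 * 60, 82.0),
}


def speedup(legacy_seconds: float, optimized_seconds: float) -> float:
    """Return how many times faster the optimized run is."""
    return legacy_seconds / optimized_seconds


for label, (legacy, optimized) in benchmarks.items():
    print(f"{label}: {speedup(legacy, optimized):.1f}x")
```

The two larger rows come out just under 33, which matches the rounded 33x shown in the table.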

Parallel Processing

Configure concurrent extract execution:

# Notebook parameter for parallel execution
max_parallel_extracts = 5  # Run up to 5 extracts concurrently

# Default is 1 for backward compatibility
max_parallel_extracts = 1  # Serial execution

Benefits of parallel processing:

  • Thread-Safe Operations: Concurrent logging and progress tracking
  • Resource Optimization: Better cluster utilization
  • Faster Completion: Significant reduction in total execution time
  • Configurable Concurrency: Control resource usage based on cluster capacity

Configuration

Location Resolution Architecture

The new architecture supports flexible source and target configuration:

Source Configuration (where data comes from)

  • source_workspace_id - Source workspace (optional, uses default if null)
  • source_datastore_id - Source lakehouse/warehouse ID (optional, uses default if null)
  • source_datastore_type - 'lakehouse' or 'warehouse'
  • source_schema_name - Schema name for data access

Target Configuration (where files are written)

  • target_workspace_id - Target workspace (optional, uses default if null)
  • target_datastore_id - Target lakehouse/warehouse ID (optional, uses default if null)
  • target_datastore_type - 'lakehouse' or 'warehouse'
  • target_file_root_path - Root path like 'Files' or 'Tables'
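
The fallback rule described above (a null ID means "use the default") can be sketched in plain Python. This is a minimal illustration, not the package's location_resolver code: `resolve_location`, `DEFAULT_WORKSPACE_ID`, and `DEFAULT_DATASTORE_ID` are hypothetical names introduced here.

```python
from typing import Optional

# Hypothetical defaults; in practice these would come from the notebook environment.
DEFAULT_WORKSPACE_ID = "default-workspace-guid"
DEFAULT_DATASTORE_ID = "default-lakehouse-guid"

VALID_DATASTORE_TYPES = {"lakehouse", "warehouse"}


def resolve_location(
    workspace_id: Optional[str],
    datastore_id: Optional[str],
    datastore_type: str,
) -> dict:
    """Fill in defaults for null IDs and validate the datastore type."""
    normalized_type = datastore_type.lower()
    if normalized_type not in VALID_DATASTORE_TYPES:
        raise ValueError(f"Unsupported datastore type: {datastore_type!r}")
    return {
        "workspace_id": workspace_id or DEFAULT_WORKSPACE_ID,
        "datastore_id": datastore_id or DEFAULT_DATASTORE_ID,
        "datastore_type": normalized_type,
    }


# Null IDs fall back to the defaults; explicit IDs are passed through unchanged.
source = resolve_location(None, None, "lakehouse")
target = resolve_location("workspace-b-guid", "lakehouse-y-guid", "lakehouse")
print(source)
print(target)
```

Validating the datastore type up front means a typo such as 'lakehous' fails at configuration time rather than partway through an extract.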

Extract Generation Configuration

Define extract jobs in the main configuration table:

Lakehouse Configuration:

# Sample lakehouse extract configuration
Row(
    extract_name="SAMPLE_CUSTOMERS_EXPORT",
    is_active=True,
    trigger_name=None,
    extract_pipeline_name=None,
    extract_table_name="customers",
    extract_table_schema="default", 
    extract_view_name=None,
    extract_view_schema=None,
    is_full_load=True,
    execution_group="CUSTOMER_EXTRACTS",
    created_date="2024-01-15",
    created_by="system"
)

Warehouse Configuration:

INSERT INTO config.config_extract_generation (
    extract_name,
    extract_description, 
    active_yn,
    extract_type,
    fabric_workspace_id,
    fabric_lakehouse_id,
    fabric_warehouse_id,
    source_type,
    source_schema,
    source_object,
    extract_sql,
    incremental_yn,
    incremental_column,
    incremental_days_back,
    execution_group
) VALUES (
    'CUSTOMER_DAILY_EXPORT',
    'Daily customer data export',
    'Y',
    'TABLE', 
    'workspace-guid',
    NULL,
    'warehouse-guid',
    'TABLE',
    'dbo',
    'customers',
    NULL,
    'N',
    NULL,
    NULL,
    'CUSTOMER_EXTRACTS'
);

Extract Details Configuration

Define file output specifications with enhanced format support:

Enhanced Format Support:

# TSV (Tab-Separated Values) configuration
Row(
    extract_name="SAMPLE_INVENTORY_TSV",
    output_format="tsv",                    # Maps to CSV format in Spark
    file_properties_column_delimiter="\t",  # Tab delimiter
    file_properties_header=True,
    force_single_file=True,                 # Performance optimization
    # Location resolution
    source_workspace_id=None,               # Uses default workspace
    source_datastore_id=None,               # Uses default lakehouse
    source_datastore_type="lakehouse",
    target_workspace_id=None,
    target_datastore_id=None,
    target_datastore_type="lakehouse",
    target_file_root_path="Files"
)

# DAT (Pipe-Delimited) configuration  
Row(
    extract_name="SAMPLE_PRODUCTS_DAT",
    output_format="dat",                    # Maps to CSV format in Spark
    file_properties_column_delimiter="|",   # Pipe delimiter
    file_properties_header=True,
    force_single_file=True,
    # Location resolution
    source_datastore_type="lakehouse",
    target_datastore_type="lakehouse",
    target_file_root_path="Files"
)

Performance Options:

# Force single file output (recommended for small-medium datasets)
force_single_file=True

# File size limits for automatic splitting
file_properties_max_rows_per_file=1000000

# Compression options
is_compressed=True
compressed_type="SNAPPY"           # Fast compression
compressed_level="NORMAL"

Supported File Formats

Core Formats with Spark Mapping

| Format | Spark Format | Delimiter | Use Case |
| --- | --- | --- | --- |
| CSV | csv | , (comma) | Standard exports, Excel compatibility |
| TSV | csv | \t (tab) | Text processing, avoiding comma conflicts |
| DAT | csv | \| (pipe) | Legacy system compatibility |
| PARQUET | parquet | N/A | Analytics, high-performance queries |
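
To make the mapping concrete, here is a small sketch of how a configured output_format could be translated into a Spark writer format plus options. The function name `to_spark_writer_settings` and the default-delimiter table are assumptions for illustration; only the format-to-Spark mapping itself comes from the table above. `sep` and `header` are standard options of Spark's CSV writer.

```python
# Assumed mapping, taken from the table above: every delimited format is
# written with Spark's "csv" writer and differs only in its separator.
DEFAULT_DELIMITERS = {"csv": ",", "tsv": "\t", "dat": "|"}


def to_spark_writer_settings(output_format, delimiter=None, header=True):
    """Return (spark_format, options) for a configured output_format."""
    fmt = output_format.lower()
    if fmt == "parquet":
        return "parquet", {}
    if fmt not in DEFAULT_DELIMITERS:
        raise ValueError(f"Unsupported output_format: {output_format!r}")
    options = {
        # An explicit delimiter from the config wins over the format default.
        "sep": delimiter if delimiter is not None else DEFAULT_DELIMITERS[fmt],
        "header": str(header).lower(),
    }
    return "csv", options


print(to_spark_writer_settings("tsv"))
print(to_spark_writer_settings("dat"))
print(to_spark_writer_settings("parquet"))
```

In a Spark session the result could then be applied with `df.write.format(spark_format).options(**options)`, which is why "tsv" and "dat" never need to exist as Spark data sources.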

Format-Specific Configuration

CSV Format

output_format="csv"
file_properties_column_delimiter=","
file_properties_quote_character='"'
file_properties_escape_character="\\"
file_properties_header=True

TSV Format

output_format="tsv"                          # Mapped to CSV in Spark
file_properties_column_delimiter="\t"        # Tab separator
file_properties_quote_character='"'
file_properties_header=True

DAT Format

output_format="dat"                          # Mapped to CSV in Spark
file_properties_column_delimiter="|"         # Pipe separator
file_properties_quote_character=""           # Often no quoting
file_properties_header=True

Parquet Format

output_format="parquet"
is_compressed=True
compressed_type="SNAPPY"                     # Recommended for Parquet
force_single_file=True                       # Single file optimization

Compression Options

| Type | Speed | Compression | Compatibility | Use Case |
| --- | --- | --- | --- | --- |
| NONE | Fastest | None | Universal | Network-fast environments |
| SNAPPY | Fast | Good | Spark native | Analytics, Parquet files |
| GZIP | Medium | Better | Universal | General purpose |
| ZIP | Medium | Good | Universal | Windows compatibility |
| LZ4 | Very Fast | Good | Modern systems | High-throughput scenarios |

Usage

Running Extracts

# Run specific extract
ingen_fab package extract run --extract-name SAMPLE_CUSTOMERS_EXPORT

# Run all active extracts (serial)
ingen_fab package extract run 

# Run with parallel processing (if supported by implementation)
ingen_fab package extract run --parallel --max-workers 5

# Run specific execution group
ingen_fab package extract run --execution-group CUSTOMER_EXTRACTS
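
The selection rules implied by these commands (all active extracts, one named extract, or one execution group) can be sketched as a simple filter. This is an assumption about the behavior, not the CLI's actual code: `ExtractConfig` and `select_extracts` are hypothetical, the sample rows other than SAMPLE_CUSTOMERS_EXPORT are made up, and the sketch assumes inactive extracts are skipped even when named explicitly.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ExtractConfig:
    """Subset of the config_extract_generation columns used for selection."""
    extract_name: str
    is_active: bool
    execution_group: str


def select_extracts(
    configs: List[ExtractConfig],
    extract_name: Optional[str] = None,
    execution_group: Optional[str] = None,
) -> List[str]:
    """Return names of active extracts matching the optional filters."""
    selected = []
    for cfg in configs:
        if not cfg.is_active:
            continue  # assumption: inactive extracts never run
        if extract_name is not None and cfg.extract_name != extract_name:
            continue
        if execution_group is not None and cfg.execution_group != execution_group:
            continue
        selected.append(cfg.extract_name)
    return selected


configs = [
    ExtractConfig("SAMPLE_CUSTOMERS_EXPORT", True, "CUSTOMER_EXTRACTS"),
    ExtractConfig("SAMPLE_PRODUCTS_DAT", True, "PRODUCT_EXTRACTS"),
    ExtractConfig("RETIRED_EXPORT", False, "CUSTOMER_EXTRACTS"),
]
print(select_extracts(configs))
print(select_extracts(configs, execution_group="CUSTOMER_EXTRACTS"))
```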

Extract Types

1. Table Extract (Lakehouse)

Direct export from a lakehouse table using read_table:

extract_table_name="customers"
extract_table_schema="default"
source_datastore_type="lakehouse"

# Uses: source_lakehouse.read_table(table_name="customers", schema_name="default")

2. View Extract (Lakehouse)

Export from a lakehouse view:

extract_view_name="customer_summary"  
extract_view_schema="reporting"
source_datastore_type="lakehouse"

3. Table Extract (Warehouse)

Direct export from warehouse table:

extract_type="TABLE"
source_schema="dbo"
source_object="customers"
source_datastore_type="warehouse"

4. Stored Procedure Extract (Warehouse)

Execute procedure and export results:

extract_type="PROCEDURE"
source_schema="reporting"
source_object="sp_generate_customer_report"

Performance Monitoring & Metrics

Real-Time Performance Tracking

The package now includes comprehensive performance monitoring:

# Performance metrics collected automatically
throughput_rows_per_second=1219512     # Actual processing speed
memory_usage_mb_before=2048            # Memory before processing
memory_usage_mb_after=2156             # Memory after processing
duration_seconds=0.82                  # Total execution time
rows_extracted=1000000                 # Total rows processed
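
These values are related by simple arithmetic: throughput is rows divided by duration, and the memory change is the after value minus the before value. The sketch below derives them for one run; `derive_metrics` and the `memory_delta_mb` key are hypothetical names used for illustration and are not the package's helper functions.

```python
def derive_metrics(start_time, end_time, row_count, memory_before, memory_after):
    """Derive duration, throughput, and memory delta for one extract run.

    Times are in seconds (for example from time.time()); memory values are in MB.
    """
    duration = end_time - start_time
    # Guard against a zero-length interval on very small extracts.
    throughput = int(row_count / duration) if duration > 0 else 0
    return {
        "duration_seconds": round(duration, 2),
        "rows_extracted": row_count,
        "throughput_rows_per_second": throughput,
        "memory_usage_mb_before": memory_before,
        "memory_usage_mb_after": memory_after,
        "memory_delta_mb": memory_after - memory_before,  # hypothetical extra field
    }


metrics = derive_metrics(
    start_time=0.0,
    end_time=0.82,
    row_count=1_000_000,
    memory_before=2048,
    memory_after=2156,
)
print(metrics)
```

With 1,000,000 rows in 0.82 seconds this gives the 1,219,512 rows/second figure used throughout this page.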

Performance Helper Functions

Built-in performance tracking:

def get_memory_usage_mb():
    """Get current JVM memory usage in MB"""

def calculate_performance_metrics(start_time, end_time, row_count, memory_before, memory_after):
    """Calculate comprehensive performance metrics"""

def print_performance_summary(metrics_dict):
    """Print formatted performance summary"""

Sample Performance Output

📊 PERFORMANCE SUMMARY 📊
╔══════════════════════════════════════╗
║ Extract: SAMPLE_CUSTOMERS_SNAPSHOT    ║
║ Duration: 0.82 seconds               ║
║ Rows: 1,000,000                      ║
║ Throughput: 1,219,512 rows/second    ║
║ Memory: 2048 MB → 2156 MB (+108 MB)  ║
║ Status: ✅ SUCCESS                    ║  
╚══════════════════════════════════════╝

Monitoring Queries

Query performance metrics:

Lakehouse:

# Read performance logs
performance_df = target_lakehouse.read_table("log_extract_generation")

# Filter recent high-performance runs
high_perf_df = performance_df.filter(
    (F.col("run_status") == "SUCCESS") & 
    (F.col("throughput_rows_per_second") > 1000000)
).select(
    "extract_name", "duration_seconds", "rows_extracted", 
    "throughput_rows_per_second", "memory_usage_mb_after"
).orderBy(F.desc("throughput_rows_per_second"))

Warehouse:

-- Performance trending
SELECT 
    extract_name,
    AVG(throughput_rows_per_second) as avg_throughput,
    AVG(duration_seconds) as avg_duration,
    AVG(memory_usage_mb_after - memory_usage_mb_before) as avg_memory_increase,
    COUNT(*) as run_count
FROM config.log_extract_generation
WHERE run_status = 'SUCCESS'
    AND run_timestamp >= DATEADD(day, -7, GETDATE())
GROUP BY extract_name
ORDER BY avg_throughput DESC;

Advanced Features

Dynamic Location Resolution

Configure flexible source and target locations:

# Multi-workspace configuration
Row(
    extract_name="CROSS_WORKSPACE_EXPORT",
    # Source from Workspace A, Lakehouse X
    source_workspace_id="workspace-a-guid",
    source_datastore_id="lakehouse-x-guid", 
    source_datastore_type="lakehouse",
    source_schema_name="sales",

    # Target to Workspace B, Lakehouse Y
    target_workspace_id="workspace-b-guid",
    target_datastore_id="lakehouse-y-guid",
    target_datastore_type="lakehouse", 
    target_file_root_path="Files",

    # Table configuration
    extract_table_name="transactions",
    extract_table_schema="finance"
)

# Default workspace/datastore (most common)
Row(
    extract_name="SIMPLE_EXPORT",
    # Uses default workspace and datastore
    source_workspace_id=None,
    source_datastore_id=None,
    source_datastore_type="lakehouse",
    target_workspace_id=None,
    target_datastore_id=None,
    target_datastore_type="lakehouse"
)

Parallel Processing Configuration

# Notebook parameters for parallel execution
max_parallel_extracts = 3  # Run up to 3 extracts concurrently

# Example: Process 10 extracts with 3 parallel workers
# - Extracts 1-3: Running in parallel
# - Extracts 4-6: Queued, start when first batch completes
# - Extracts 7-10: Queued for subsequent batches

# Thread-safe logging shows concurrent execution:
# [Thread-1] Starting extract: CUSTOMERS_SNAPSHOT
# [Thread-2] Starting extract: PRODUCTS_SNAPSHOT  
# [Thread-3] Starting extract: ORDERS_INCREMENTAL
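
A bounded worker pool with lock-protected logging can be sketched with Python's standard library. This is only an illustration of the pattern, assuming a thread pool is used; `run_extract` is a placeholder and the extract names are made up. Note that with a pool like this, a queued extract starts as soon as any worker is free rather than waiting for a whole batch to finish.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

max_parallel_extracts = 3
extract_names = [f"EXTRACT_{i:02d}" for i in range(1, 11)]  # hypothetical names

log_lock = threading.Lock()
log_entries = []


def run_extract(name):
    """Placeholder for the real extract; records which thread handled it."""
    with log_lock:  # serialize writes to the shared log
        log_entries.append((threading.current_thread().name, name))
    return name


# The pool never runs more than max_parallel_extracts tasks at once.
with ThreadPoolExecutor(
    max_workers=max_parallel_extracts, thread_name_prefix="Thread"
) as pool:
    futures = [pool.submit(run_extract, name) for name in extract_names]
    completed = [future.result() for future in as_completed(futures)]

worker_threads = {thread for thread, _ in log_entries}
print(f"{len(completed)} extracts completed on {len(worker_threads)} worker thread(s)")
```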

Single File Optimization

Force single output files for better performance:

force_single_file=True  # Uses repartition(1) for single file output

# Benefits:
# - Avoids small file problems
# - Better for downstream processing
# - Consistent file naming
# - Improved transfer efficiency

# Trade-offs:
# - All data processed by single executor
# - May be slower for very large datasets (>10M rows)
# - Consider file splitting for massive datasets

File Splitting for Large Datasets

file_properties_max_rows_per_file=500000  # Split at 500K rows

# Output files will be named:
# customers_20240725_142530_part0001.csv
# customers_20240725_142530_part0002.csv  
# customers_20240725_142530_part0003.csv
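
The number of part files follows from a ceiling division of the row count by the per-file limit. The sketch below reproduces the naming shown above; `split_file_names` is a hypothetical helper, and producing one file for an empty extract is an assumption.

```python
import math


def split_file_names(base_name, total_rows, max_rows_per_file, extension="csv"):
    """Return the part file names needed to hold total_rows."""
    if max_rows_per_file <= 0:
        raise ValueError("max_rows_per_file must be positive")
    # Assumption: always produce at least one file, even for an empty extract.
    part_count = max(1, math.ceil(total_rows / max_rows_per_file))
    return [
        f"{base_name}_part{part:04d}.{extension}"
        for part in range(1, part_count + 1)
    ]


# 1.2M rows with a 500K limit needs three files.
names = split_file_names("customers_20240725_142530", 1_200_000, 500_000)
print(names)
```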

Trigger Files

Generate trigger files for downstream processes:

is_trigger_file=True
trigger_file_extension=".done"

# Creates: customers_20240725_142530.csv.done
# Signals completion to downstream systems
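
The key point of a trigger file is ordering: the marker is created only after the data file is fully written, so a downstream poller never sees a partial extract. The sketch below shows this on the local filesystem with the standard library; it does not write to OneLake, and `write_trigger_file` is a hypothetical helper.

```python
import tempfile
from pathlib import Path


def write_trigger_file(data_file: Path, trigger_file_extension: str = ".done") -> Path:
    """Create an empty marker next to data_file and return its path."""
    trigger_path = data_file.with_name(data_file.name + trigger_file_extension)
    trigger_path.touch()
    return trigger_path


with tempfile.TemporaryDirectory() as tmp:
    data_file = Path(tmp) / "customers_20240725_142530.csv"
    data_file.write_text("id,name\n1,Alice\n")  # write the extract first
    trigger = write_trigger_file(data_file)     # then signal completion
    trigger_name = trigger.name
    trigger_exists = trigger.exists()

print(trigger_name, trigger_exists)
```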

File Path Patterns

Use placeholders in path and filename patterns:

Path Placeholders

  • {year} - Current year (YYYY)
  • {month} - Current month (MM)
  • {day} - Current day (DD)
  • {hour} - Current hour (HH)
  • {execution_group} - Execution group name

Filename Placeholders

  • {timestamp} - Full timestamp (YYYYMMDD_HHMMSS)
  • {date} - Date only (YYYYMMDD)
  • {extract_name} - Extract configuration name
  • {run_id} - Unique run identifier

Example patterns:

extract_container="Files/extracts"
extract_directory="{execution_group}/{extract_name}/{year}/{month}"
extract_file_name="{extract_name}_{timestamp}"
extract_file_name_extension="csv"

# Results in: Files/extracts/CUSTOMER_EXTRACTS/SAMPLE_CUSTOMERS/2024/07/SAMPLE_CUSTOMERS_20240725_142530.csv
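
Placeholder substitution of this kind maps directly onto Python's `str.format`. The sketch below rebuilds the example path for a fixed run time; `render_pattern` is a hypothetical helper, and for simplicity it makes every placeholder available to both path and filename patterns, which may differ from the package's behavior.

```python
from datetime import datetime


def render_pattern(pattern, run_time, extract_name, execution_group, run_id=""):
    """Substitute the documented placeholders into a path or filename pattern."""
    values = {
        "year": run_time.strftime("%Y"),
        "month": run_time.strftime("%m"),
        "day": run_time.strftime("%d"),
        "hour": run_time.strftime("%H"),
        "timestamp": run_time.strftime("%Y%m%d_%H%M%S"),
        "date": run_time.strftime("%Y%m%d"),
        "extract_name": extract_name,
        "execution_group": execution_group,
        "run_id": run_id,
    }
    return pattern.format(**values)


run_time = datetime(2024, 7, 25, 14, 25, 30)
context = dict(
    run_time=run_time,
    extract_name="SAMPLE_CUSTOMERS",
    execution_group="CUSTOMER_EXTRACTS",
)

directory = render_pattern("{execution_group}/{extract_name}/{year}/{month}", **context)
file_name = render_pattern("{extract_name}_{timestamp}", **context)
full_path = f"Files/extracts/{directory}/{file_name}.csv"
print(full_path)
```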

Best Practices

Performance Optimization

  1. Use Pure PySpark Operations

    • Package automatically uses PySpark DataFrame API
    • No manual Pandas conversions needed
    • Leverages Spark's distributed processing

  2. Choose Appropriate File Formats

    • Parquet: Analytics workloads, best compression
    • CSV: Human-readable, universal compatibility
    • TSV: Text processing, comma-free data
    • DAT: Legacy system integration

  3. Configure Single File Output

    force_single_file=True  # For datasets < 10M rows

  4. Use Parallel Processing

    max_parallel_extracts=3  # Based on cluster capacity

  5. Enable Smart Compression

    is_compressed=True
    compressed_type="SNAPPY"  # Fast compression for Parquet
    compressed_type="GZIP"    # Universal compatibility for CSV
    

Reliability

  1. Location Resolution

    # Explicit configuration prevents runtime failures
    source_datastore_type="lakehouse"
    target_datastore_type="lakehouse"
    target_file_root_path="Files"

  2. Monitor Performance Metrics

    • Set up alerts for throughput degradation
    • Monitor memory usage trends
    • Track duration increases

  3. Use Execution Groups

    execution_group="CRITICAL_EXPORTS"  # Priority grouping
    execution_group="BATCH_EXPORTS"     # Background processing
    

Security

  1. Leverage Fabric Permissions

    • Extract jobs run under workspace identity
    • Inherit source object permissions
    • Control access to output locations

  2. Secure File Paths

    target_file_root_path="Files/secure"    # Dedicated directories
    extract_directory="exports/sensitive"   # Access-controlled paths

  3. Comprehensive Audit Trail

    • All extractions logged with performance metrics
    • Thread-safe concurrent logging
    • Track execution across parallel jobs

Troubleshooting

Common Issues

Extract Fails with Format Error

SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: tsv
Solution: In the latest version, TSV and DAT formats are automatically mapped to Spark's CSV format with the appropriate delimiter, so no action is needed.

Performance Degradation

Throughput dropped from 1.2M rows/sec to 50K rows/sec
Solutions:

  • Check for Pandas conversions (eliminated in latest version)
  • Verify force_single_file=True for small-medium datasets
  • Monitor memory usage and cluster resources
  • Use parallel processing: max_parallel_extracts=3

Location Resolution Errors

Cannot resolve source lakehouse/workspace
Solutions:

  • Verify workspace/datastore IDs are correct
  • Use None for default workspace/datastore
  • Check datastore type: 'lakehouse' vs 'warehouse'
  • Ensure target paths exist: target_file_root_path="Files"

Parallel Processing Issues

Thread conflicts or resource exhaustion
Solutions:

  • Reduce max_parallel_extracts based on cluster size
  • Monitor memory usage per thread
  • Use execution groups to control priority
  • Check for concurrent access to same resources

Debug Mode

Enable detailed logging:

# Set in notebook parameters
debug_mode=True
log_level="DEBUG"

# Outputs detailed performance metrics:
# [Thread-1] Memory before: 2048 MB
# [Thread-1] Processing 1M rows...
# [Thread-1] Memory after: 2156 MB  
# [Thread-1] Throughput: 1,219,512 rows/second

Examples

Example 1: High-Performance Customer Export

Optimized daily customer export with performance monitoring:

# Configuration with performance optimization
customer_config = Row(
    extract_name="CUSTOMERS_DAILY_OPTIMIZED",
    is_active=True,
    extract_table_name="customers", 
    extract_table_schema="default",
    is_full_load=True,
    execution_group="DAILY_EXPORTS"
)

customer_details = Row(
    extract_name="CUSTOMERS_DAILY_OPTIMIZED",
    output_format="parquet",                 # Best performance for analytics
    force_single_file=True,                  # Single file optimization
    is_compressed=True,
    compressed_type="SNAPPY",                # Fast compression
    extract_container="Files/extracts",
    extract_directory="customers/{year}/{month}",
    extract_file_name="customers_{date}",
    extract_file_name_extension="parquet",
    # Performance defaults
    source_datastore_type="lakehouse",
    target_datastore_type="lakehouse", 
    target_file_root_path="Files"
)

# Expected performance: ~1M+ rows/second

Example 2: Multi-Format Product Export

Export products in multiple formats simultaneously:

# CSV Format
csv_config = Row(
    extract_name="PRODUCTS_CSV",
    extract_table_name="products",
    output_format="csv",
    file_properties_column_delimiter=",",
    force_single_file=True
)

# TSV Format  
tsv_config = Row(
    extract_name="PRODUCTS_TSV", 
    extract_table_name="products",
    output_format="tsv",                     # Maps to CSV with tab delimiter
    file_properties_column_delimiter="\t",
    force_single_file=True
)

# DAT Format
dat_config = Row(
    extract_name="PRODUCTS_DAT",
    extract_table_name="products", 
    output_format="dat",                     # Maps to CSV with pipe delimiter
    file_properties_column_delimiter="|",
    force_single_file=True
)

# Run in parallel with max_parallel_extracts=3

Example 3: Cross-Workspace Extract

Extract from one workspace to another:

cross_workspace_config = Row(
    extract_name="CROSS_WORKSPACE_SALES",
    # Source: Production workspace, sales lakehouse
    source_workspace_id="prod-workspace-guid",
    source_datastore_id="sales-lakehouse-guid",
    source_datastore_type="lakehouse",
    source_schema_name="fact_tables",
    extract_table_name="sales_transactions",

    # Target: Analytics workspace, reporting lakehouse  
    target_workspace_id="analytics-workspace-guid",
    target_datastore_id="reporting-lakehouse-guid",
    target_datastore_type="lakehouse",
    target_file_root_path="Files",

    # Configuration
    is_full_load=True,
    execution_group="CROSS_WORKSPACE_EXPORTS"
)

cross_workspace_details = Row(
    extract_name="CROSS_WORKSPACE_SALES",
    output_format="parquet",
    is_compressed=True,
    compressed_type="SNAPPY", 
    force_single_file=False,                 # Allow partitioning for large dataset
    extract_container="Files/imports",
    extract_directory="sales/{year}/{month}",
    extract_file_name="sales_transactions_{timestamp}",
    extract_file_name_extension="parquet"
)

Sample Data and Testing

The package includes comprehensive sample data:

Sample Tables Created

  • customers - 5 sample customer records
  • products - 10 sample products with categories and pricing
  • orders - 100 sample orders with various statuses
  • order_items - 250+ line items with pricing details
  • inventory - 30 warehouse inventory records for TSV testing

Sample Extract Configurations

  • SAMPLE_CUSTOMERS_SNAPSHOT - Parquet export with single file optimization
  • SAMPLE_PRODUCTS_SNAPSHOT - Parquet with trigger file
  • SAMPLE_ORDERS_INCREMENTAL - Compressed Parquet for incremental processing
  • SAMPLE_ORDER_ITEMS_INCREMENTAL - Compressed Parquet with line item details
  • SAMPLE_INVENTORY_TSV - TSV format with tab delimiters
  • SAMPLE_PRODUCTS_DAT - DAT format with pipe delimiters

Testing Performance

Run performance tests:

# Test all sample extracts
ingen_fab package extract run

# Test specific format
ingen_fab package extract run --extract-name SAMPLE_INVENTORY_TSV

# Test parallel processing  
ingen_fab package extract run --extract-name "SAMPLE_*" --parallel --max-workers 3

Expected performance benchmarks:

  • Small datasets (< 1K rows): < 1 second
  • Medium datasets (< 100K rows): < 5 seconds
  • Large datasets (1M+ rows): < 10 seconds
  • Throughput: 1M+ rows/second for optimized configurations

Advanced Integration

Synthetic Data Alignment

For using extract generation with synthetic data generated by the synthetic data package, see Extract Generation - Synthetic Data Alignment. This integration provides:

  • Pre-configured extract definitions for synthetic data schemas
  • Optimized extract patterns for test data workflows
  • Sample configurations for common synthetic data export scenarios

Next Steps

Migration from Legacy Versions

Key Changes in Latest Version

  1. Performance: 32x speed improvement with pure PySpark implementation
  2. Parallel Processing: Configurable concurrent extract execution
  3. Format Support: Native TSV/DAT support with proper Spark mapping
  4. Location Resolution: Dynamic workspace/datastore configuration
  5. Monitoring: Comprehensive performance metrics and threading information
  6. Architecture: PySpark DataFrame API replaces SQL for type safety

Breaking Changes

  • Configuration schema updated with location resolution fields
  • Performance metrics added to log schema
  • Format mapping changed (TSV/DAT now map to CSV internally)
  • Deprecated Pandas-based processing (automatic migration)

Migration Steps

  1. Update Configuration Schema: Add new location resolution fields
  2. Review Format Configurations: TSV/DAT formats now properly supported
  3. Enable Performance Monitoring: Update log table schema
  4. Test Parallel Processing: Start with max_parallel_extracts=1, then increase
  5. Validate Performance: Expect significant speed improvements

Aside from the breaking changes listed above, the package maintains backward compatibility (for example, max_parallel_extracts defaults to 1) while providing substantial performance and feature enhancements.