Extract Generation Package¶
At a Glance¶
| Item | Summary |
|---|---|
| Purpose | Generate file extracts (CSV/TSV/DAT/Parquet) from lakehouse/warehouse objects with high-performance processing. |
| Inputs | Extract job and detail configs in config_extract_generation and config_extract_generation_details. |
| Outputs | Files written to OneLake paths; performance metrics in log_extract_generation. |
| Core commands | ingen_fab package extract compile, ingen_fab package extract run. |
| When to use | You need high-performance, parallel, configurable exports with compression, partitioning, and detailed monitoring. |
Note: For CLI flags and examples, refer to User Guide → CLI Reference and Deploy Guide.
The Extract Generation package provides high-performance automated file extract capabilities from Microsoft Fabric lakehouses and warehouses. It features dynamic location resolution, parallel processing, comprehensive performance monitoring, and support for multiple output formats.
Overview¶
The Extract Generation package provides:
- High-Performance Processing: Pure PySpark implementation with 32x performance improvements over legacy approaches
- Parallel Execution: Configurable concurrent extract processing (1-10 parallel jobs)
- Multiple Formats: CSV, TSV (tab-delimited), DAT (pipe-delimited), Parquet with proper Spark format mapping
- Dynamic Location Resolution: Flexible source/target workspace and datastore configuration
- Performance Monitoring: Real-time throughput, memory usage, and execution metrics
- Advanced File Options: Compression, single-file forcing, custom delimiters
- Comprehensive Logging: Thread-safe execution tracking with detailed performance analytics
Installation & Setup¶
1. Compile the Package¶
# Compile extract generation package with sample configurations
ingen_fab package extract compile --include-samples
# Compile for specific target datastore
ingen_fab package extract compile --target-datastore lakehouse
ingen_fab package extract compile --target-datastore warehouse
Note: The package now supports both lakehouse and warehouse deployment modes. For compatibility with synthetic data tables generated by the synthetic_data_generation package, see Extract Generation - Synthetic Data Alignment.
2. Deploy DDL Scripts¶
The package creates the following configuration tables:
Lakehouse Mode:
- config_extract_generation - Extract job definitions with location resolution
- config_extract_generation_details - File format specifications with performance options
- log_extract_generation - Execution history with performance metrics
Warehouse Mode:
- config.config_extract_generation - Extract job definitions
- config.config_extract_generation_details - File format specifications
- config.log_extract_generation - Execution history and metrics
3. Configure Extracts¶
Insert extract configurations into the metadata tables or use the provided sample data that includes configurations for TSV, DAT, CSV, and Parquet formats.
Architecture & Performance¶
Pure PySpark Implementation¶
The package has been completely rewritten for optimal performance:
- No Pandas Conversions: Direct Spark DataFrame operations for 32x speed improvement
- PySpark DataFrame API: Replaced SQL queries with type-safe DataFrame operations
- Memory Efficient: Smart partitioning with repartition(1) for single files
- Location Resolution: Dynamic workspace and datastore resolution using location_resolver.py pattern
Performance Benchmarks¶
| Dataset Size | Legacy Time | Optimized Time | Speedup |
|---|---|---|---|
| 1M rows | 26.7s | 0.82s | 32.6x |
| 10M rows | ~4.5 min | ~8.2s | 33x |
| 100M rows | ~45 min | ~82s | 33x |
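The Speedup column is the legacy time divided by the optimized time. As a quick sanity check, the sketch below recomputes it from the table figures, with the approximate minute values converted to seconds. The dictionary and variable names are illustrative only.

```python
# Speedup = legacy time / optimized time, using the figures from the table.
benchmarks = {
    "1M rows": (26.7, 0.82),
    "10M rows": (4.5 * 60, 8.2),   # ~4.5 min expressed in seconds
    "100M rows": (45 * 60, 82.0),  # ~45 min expressed in seconds
}

speedups = {
    size: round(legacy_s / optimized_s, 1)
    for size, (legacy_s, optimized_s) in benchmarks.items()
}

for size, speedup in speedups.items():
    print(f"{size}: {speedup}x")
```

The two larger rows come out at about 32.9x, which the table rounds to 33x.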
Parallel Processing¶
Configure concurrent extract execution:
# Notebook parameter for parallel execution
max_parallel_extracts = 5 # Run up to 5 extracts concurrently
# Default is 1 for backward compatibility
max_parallel_extracts = 1 # Serial execution
Benefits of parallel processing:
- Thread-Safe Operations: Concurrent logging and progress tracking
- Resource Optimization: Better cluster utilization
- Faster Completion: Significant reduction in total execution time
- Configurable Concurrency: Control resource usage based on cluster capacity
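To illustrate how a cap such as max_parallel_extracts bounds concurrency, here is a minimal sketch using Python's standard thread pool. It is not the package's actual implementation: run_extract, run_all, and execution_log are hypothetical stand-ins, and the extract body is reduced to a thread-safe log write.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

max_parallel_extracts = 3  # notebook parameter described above

_log_lock = threading.Lock()
execution_log = []  # hypothetical stand-in for log_extract_generation


def run_extract(extract_name):
    """Hypothetical placeholder for the real extract logic."""
    # Real code would read the source, write the file, and collect metrics.
    with _log_lock:  # serialise log writes across worker threads
        execution_log.append((extract_name, "SUCCESS"))
    return extract_name


def run_all(extract_names, max_workers):
    """Run extracts with at most max_workers in flight at any time."""
    completed = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(run_extract, name) for name in extract_names]
        for future in as_completed(futures):
            completed.append(future.result())  # re-raises worker errors
    return completed


names = ["CUSTOMERS_SNAPSHOT", "PRODUCTS_SNAPSHOT",
         "ORDERS_INCREMENTAL", "INVENTORY_TSV"]
finished = run_all(names, max_parallel_extracts)
print(sorted(finished))
```

With max_workers=1 the same code runs serially, which matches the documented default.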
Configuration¶
Location Resolution Architecture¶
The new architecture supports flexible source and target configuration:
Source Configuration (where data comes from)¶
- source_workspace_id - Source workspace (optional, uses default if null)
- source_datastore_id - Source lakehouse/warehouse ID (optional, uses default if null)
- source_datastore_type - 'lakehouse' or 'warehouse'
- source_schema_name - Schema name for data access
Target Configuration (where files are written)¶
- target_workspace_id - Target workspace (optional, uses default if null)
- target_datastore_id - Target lakehouse/warehouse ID (optional, uses default if null)
- target_datastore_type - 'lakehouse' or 'warehouse'
- target_file_root_path - Root path like 'Files' or 'Tables'
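The "uses default if null" rule can be sketched as a small fallback function. This is an assumption about the behaviour, not the package's location_resolver.py: resolve_location, ResolvedLocation, and the two default GUID constants are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical defaults; a real deployment would read these from its environment.
DEFAULT_WORKSPACE_ID = "default-workspace-guid"
DEFAULT_DATASTORE_ID = "default-lakehouse-guid"


@dataclass(frozen=True)
class ResolvedLocation:
    workspace_id: str
    datastore_id: str
    datastore_type: str


def resolve_location(workspace_id: Optional[str],
                     datastore_id: Optional[str],
                     datastore_type: str) -> ResolvedLocation:
    """Fall back to the defaults when an ID is None (null in the config table)."""
    if datastore_type not in ("lakehouse", "warehouse"):
        raise ValueError(f"Unsupported datastore type: {datastore_type!r}")
    return ResolvedLocation(
        workspace_id=DEFAULT_WORKSPACE_ID if workspace_id is None else workspace_id,
        datastore_id=DEFAULT_DATASTORE_ID if datastore_id is None else datastore_id,
        datastore_type=datastore_type,
    )


default_source = resolve_location(None, None, "lakehouse")
explicit_target = resolve_location("workspace-b-guid", "lakehouse-y-guid", "lakehouse")
print(default_source)
print(explicit_target)
```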
Extract Generation Configuration¶
Define extract jobs in the main configuration table:
Lakehouse Configuration:
# Sample lakehouse extract configuration
from pyspark.sql import Row
Row(
extract_name="SAMPLE_CUSTOMERS_EXPORT",
is_active=True,
trigger_name=None,
extract_pipeline_name=None,
extract_table_name="customers",
extract_table_schema="default",
extract_view_name=None,
extract_view_schema=None,
is_full_load=True,
execution_group="CUSTOMER_EXTRACTS",
created_date="2024-01-15",
created_by="system"
)
Warehouse Configuration:
INSERT INTO config.config_extract_generation (
extract_name,
extract_description,
active_yn,
extract_type,
fabric_workspace_id,
fabric_lakehouse_id,
fabric_warehouse_id,
source_type,
source_schema,
source_object,
extract_sql,
incremental_yn,
incremental_column,
incremental_days_back,
execution_group
) VALUES (
'CUSTOMER_DAILY_EXPORT',
'Daily customer data export',
'Y',
'TABLE',
'workspace-guid',
NULL,
'warehouse-guid',
'TABLE',
'dbo',
'customers',
NULL,
'N',
NULL,
NULL,
'CUSTOMER_EXTRACTS'
);
Extract Details Configuration¶
Define file output specifications with enhanced format support:
Enhanced Format Support:
# TSV (Tab-Separated Values) configuration
Row(
extract_name="SAMPLE_INVENTORY_TSV",
output_format="tsv", # Maps to CSV format in Spark
file_properties_column_delimiter="\t", # Tab delimiter
file_properties_header=True,
force_single_file=True, # Performance optimization
# Location resolution
source_workspace_id=None, # Uses default workspace
source_datastore_id=None, # Uses default lakehouse
source_datastore_type="lakehouse",
target_workspace_id=None,
target_datastore_id=None,
target_datastore_type="lakehouse",
target_file_root_path="Files"
)
# DAT (Pipe-Delimited) configuration
Row(
extract_name="SAMPLE_PRODUCTS_DAT",
output_format="dat", # Maps to CSV format in Spark
file_properties_column_delimiter="|", # Pipe delimiter
file_properties_header=True,
force_single_file=True,
# Location resolution
source_datastore_type="lakehouse",
target_datastore_type="lakehouse",
target_file_root_path="Files"
)
Performance Options:
# Force single file output (recommended for small-medium datasets)
force_single_file=True
# File size limits for automatic splitting
file_properties_max_rows_per_file=1000000
# Compression options
is_compressed=True
compressed_type="SNAPPY" # Fast compression
compressed_level="NORMAL"
Supported File Formats¶
Core Formats with Spark Mapping¶
| Format | Spark Format | Delimiter | Use Case |
|---|---|---|---|
| CSV | csv | , (comma) | Standard exports, Excel compatibility |
| TSV | csv | \t (tab) | Text processing, avoiding comma conflicts |
| DAT | csv | \| (pipe) | Legacy system compatibility |
| PARQUET | parquet | N/A | Analytics, high-performance queries |
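The mapping in this table can be expressed as a lookup from output_format to a Spark writer format plus CSV options. The sketch below is illustrative: FORMAT_MAP and resolve_format are hypothetical names, and the only Spark-specific detail assumed is that the CSV writer takes its delimiter through the sep option.

```python
# (Spark writer format, default delimiter) per output_format, as in the table.
FORMAT_MAP = {
    "csv": ("csv", ","),
    "tsv": ("csv", "\t"),
    "dat": ("csv", "|"),
    "parquet": ("parquet", None),
}


def resolve_format(output_format, delimiter=None):
    """Return (spark_format, writer_options) for a configured output_format."""
    key = output_format.lower()
    if key not in FORMAT_MAP:
        raise ValueError(f"Unsupported output_format: {output_format!r}")
    spark_format, default_delimiter = FORMAT_MAP[key]
    options = {}
    if spark_format == "csv":
        # An explicit file_properties_column_delimiter overrides the default.
        options["sep"] = default_delimiter if delimiter is None else delimiter
    return spark_format, options


tsv_result = resolve_format("tsv")
dat_result = resolve_format("dat")
parquet_result = resolve_format("PARQUET")
print(tsv_result, dat_result, parquet_result)
```

The returned pair could then be passed to a DataFrame writer, for example df.write.format(spark_format).options(**options).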
Format-Specific Configuration¶
CSV Format¶
output_format="csv"
file_properties_column_delimiter=","
file_properties_quote_character='"'
file_properties_escape_character="\\"
file_properties_header=True
TSV Format¶
output_format="tsv" # Mapped to CSV in Spark
file_properties_column_delimiter="\t" # Tab separator
file_properties_quote_character='"'
file_properties_header=True
DAT Format¶
output_format="dat" # Mapped to CSV in Spark
file_properties_column_delimiter="|" # Pipe separator
file_properties_quote_character="" # Often no quoting
file_properties_header=True
Parquet Format¶
output_format="parquet"
is_compressed=True
compressed_type="SNAPPY" # Recommended for Parquet
force_single_file=True # Single file optimization
Compression Options¶
| Type | Speed | Compression | Compatibility | Use Case |
|---|---|---|---|---|
| NONE | Fastest | None | Universal | Network-fast environments |
| SNAPPY | Fast | Good | Spark native | Analytics, Parquet files |
| GZIP | Medium | Better | Universal | General purpose |
| ZIP | Medium | Good | Universal | Windows compatibility |
| LZ4 | Very Fast | Good | Modern systems | High-throughput scenarios |
Usage¶
Running Extracts¶
# Run specific extract
ingen_fab package extract run --extract-name SAMPLE_CUSTOMERS_EXPORT
# Run all active extracts (serial)
ingen_fab package extract run
# Run with parallel processing (if supported by implementation)
ingen_fab package extract run --parallel --max-workers 5
# Run specific execution group
ingen_fab package extract run --execution-group CUSTOMER_EXTRACTS
Extract Types¶
1. Table Extract (Lakehouse)¶
Direct export from a lakehouse table using read_table:
extract_table_name="customers"
extract_table_schema="default"
source_datastore_type="lakehouse"
# Uses: source_lakehouse.read_table(table_name="customers", schema_name="default")
2. View Extract (Lakehouse)¶
Export from a lakehouse view:
extract_view_name="customer_summary"
extract_view_schema="reporting"
source_datastore_type="lakehouse"
3. Table Extract (Warehouse)¶
Direct export from warehouse table:
extract_type="TABLE"
source_schema="dbo"
source_object="customers"
source_datastore_type="warehouse"
4. Stored Procedure Extract (Warehouse)¶
Execute procedure and export results:
Performance Monitoring & Metrics¶
Real-Time Performance Tracking¶
The package now includes comprehensive performance monitoring:
# Performance metrics collected automatically
throughput_rows_per_second=1219512 # Actual processing speed
memory_usage_mb_before=2048 # Memory before processing
memory_usage_mb_after=2156 # Memory after processing
duration_seconds=0.82 # Total execution time (1,000,000 rows / 1,219,512 rows per second)
rows_extracted=1000000 # Total rows processed
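These fields are related by simple arithmetic: throughput is rows divided by duration, and the memory change is the difference between the two memory readings. The sketch below shows one way to derive them, using the sample run of 1,000,000 rows in 0.82 seconds. The function name calculate_throughput_metrics and the memory_delta_mb key are hypothetical, not the package's helpers.

```python
def calculate_throughput_metrics(duration_seconds, row_count,
                                 memory_before_mb, memory_after_mb):
    """Hypothetical helper: derive throughput and memory delta for one run."""
    # Guard against a zero duration on trivially small extracts.
    throughput = int(row_count / duration_seconds) if duration_seconds > 0 else 0
    return {
        "duration_seconds": duration_seconds,
        "rows_extracted": row_count,
        "throughput_rows_per_second": throughput,
        "memory_usage_mb_before": memory_before_mb,
        "memory_usage_mb_after": memory_after_mb,
        "memory_delta_mb": memory_after_mb - memory_before_mb,
    }


metrics = calculate_throughput_metrics(0.82, 1_000_000, 2048, 2156)
print(metrics["throughput_rows_per_second"])  # 1219512
print(metrics["memory_delta_mb"])             # 108
```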
Performance Helper Functions¶
Built-in performance tracking:
def get_memory_usage_mb():
    """Get current JVM memory usage in MB"""

def calculate_performance_metrics(start_time, end_time, row_count, memory_before, memory_after):
    """Calculate comprehensive performance metrics"""

def print_performance_summary(metrics_dict):
    """Print formatted performance summary"""
Sample Performance Output¶
📊 PERFORMANCE SUMMARY 📊
╔══════════════════════════════════════╗
║ Extract: SAMPLE_CUSTOMERS_SNAPSHOT ║
║ Duration: 0.82 seconds ║
║ Rows: 1,000,000 ║
║ Throughput: 1,219,512 rows/second ║
║ Memory: 2048 MB → 2156 MB (+108 MB) ║
║ Status: ✅ SUCCESS ║
╚══════════════════════════════════════╝
Monitoring Queries¶
Query performance metrics:
Lakehouse:
# Read performance logs
from pyspark.sql import functions as F
performance_df = target_lakehouse.read_table("log_extract_generation")
# Filter recent high-performance runs
high_perf_df = performance_df.filter(
(F.col("run_status") == "SUCCESS") &
(F.col("throughput_rows_per_second") > 1000000)
).select(
"extract_name", "duration_seconds", "rows_extracted",
"throughput_rows_per_second", "memory_usage_mb_after"
).orderBy(F.desc("throughput_rows_per_second"))
Warehouse:
-- Performance trending
SELECT
extract_name,
AVG(throughput_rows_per_second) as avg_throughput,
AVG(duration_seconds) as avg_duration,
AVG(memory_usage_mb_after - memory_usage_mb_before) as avg_memory_increase,
COUNT(*) as run_count
FROM config.log_extract_generation
WHERE run_status = 'SUCCESS'
AND run_timestamp >= DATEADD(day, -7, GETDATE())
GROUP BY extract_name
ORDER BY avg_throughput DESC;
Advanced Features¶
Dynamic Location Resolution¶
Configure flexible source and target locations:
# Multi-workspace configuration
Row(
extract_name="CROSS_WORKSPACE_EXPORT",
# Source from Workspace A, Lakehouse X
source_workspace_id="workspace-a-guid",
source_datastore_id="lakehouse-x-guid",
source_datastore_type="lakehouse",
source_schema_name="sales",
# Target to Workspace B, Lakehouse Y
target_workspace_id="workspace-b-guid",
target_datastore_id="lakehouse-y-guid",
target_datastore_type="lakehouse",
target_file_root_path="Files",
# Table configuration
extract_table_name="transactions",
extract_table_schema="finance"
)
# Default workspace/datastore (most common)
Row(
extract_name="SIMPLE_EXPORT",
# Uses default workspace and datastore
source_workspace_id=None,
source_datastore_id=None,
source_datastore_type="lakehouse",
target_workspace_id=None,
target_datastore_id=None,
target_datastore_type="lakehouse"
)
Parallel Processing Configuration¶
# Notebook parameters for parallel execution
max_parallel_extracts = 3 # Run up to 3 extracts concurrently
# Example: Process 10 extracts with max_parallel_extracts = 3
# - Extracts 1-3: Running in parallel
# - Extracts 4-6: Queued, start when first batch completes
# - Extracts 7-10: Queued for subsequent batches
# Thread-safe logging shows concurrent execution:
# [Thread-1] Starting extract: CUSTOMERS_SNAPSHOT
# [Thread-2] Starting extract: PRODUCTS_SNAPSHOT
# [Thread-3] Starting extract: ORDERS_INCREMENTAL
Single File Optimization¶
Force single output files for better performance:
force_single_file=True # Uses repartition(1) for single file output
# Benefits:
# - Avoids small file problems
# - Better for downstream processing
# - Consistent file naming
# - Improved transfer efficiency
# Trade-offs:
# - All data processed by single executor
# - May be slower for very large datasets (>10M rows)
# - Consider file splitting for massive datasets
File Splitting for Large Datasets¶
file_properties_max_rows_per_file=500000 # Split at 500K rows
# Output files will be named:
# customers_20240725_142530_part0001.csv
# customers_20240725_142530_part0002.csv
# customers_20240725_142530_part0003.csv
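The number of part files follows from the row count and the per-file limit. The sketch below reproduces the naming pattern shown above. split_file_names is a hypothetical helper, the 1,200,000-row total is an invented figure for illustration, and the package's real splitting logic may differ.

```python
import math


def split_file_names(base_name, extension, total_rows, max_rows_per_file):
    """Return the part file names needed to hold total_rows."""
    if max_rows_per_file <= 0:
        raise ValueError("max_rows_per_file must be positive")
    # Always emit at least one file, even for an empty extract.
    part_count = max(1, math.ceil(total_rows / max_rows_per_file))
    return [
        f"{base_name}_part{part:04d}.{extension}"
        for part in range(1, part_count + 1)
    ]


part_names = split_file_names("customers_20240725_142530", "csv",
                              total_rows=1_200_000, max_rows_per_file=500_000)
for name in part_names:
    print(name)
```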
Trigger Files¶
Generate trigger files for downstream processes:
is_trigger_file=True
trigger_file_extension=".done"
# Creates: customers_20240725_142530.csv.done
# Signals completion to downstream systems
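A trigger file is typically an empty marker written only after the data file is complete. The sketch below shows the idea on a local temporary directory with pathlib. It is illustrative only: write_trigger_file is a hypothetical helper, and writing to OneLake would go through the Fabric file APIs, not the local filesystem.

```python
from pathlib import Path
import tempfile


def write_trigger_file(data_file: Path, trigger_file_extension: str = ".done") -> Path:
    """Create an empty marker next to data_file, e.g. customers.csv.done."""
    trigger_path = data_file.with_name(data_file.name + trigger_file_extension)
    trigger_path.touch()
    return trigger_path


with tempfile.TemporaryDirectory() as tmp:
    data_file = Path(tmp) / "customers_20240725_142530.csv"
    data_file.write_text("id,name\n1,Ada\n")   # the data file is written first
    trigger = write_trigger_file(data_file)    # the marker is written last
    trigger_name = trigger.name
    trigger_created = trigger.exists()

print(trigger_name)
```

Writing the marker last means a downstream poller that sees the .done file can assume the data file is complete.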
File Path Patterns¶
Use placeholders in path and filename patterns:
Path Placeholders¶
- {year} - Current year (YYYY)
- {month} - Current month (MM)
- {day} - Current day (DD)
- {hour} - Current hour (HH)
- {execution_group} - Execution group name
Filename Placeholders¶
- {timestamp} - Full timestamp (YYYYMMDD_HHMMSS)
- {date} - Date only (YYYYMMDD)
- {extract_name} - Extract configuration name
- {run_id} - Unique run identifier
Example patterns:
extract_container="Files/extracts"
extract_directory="{execution_group}/{extract_name}/{year}/{month}"
extract_file_name="{extract_name}_{timestamp}"
extract_file_name_extension="csv"
# Results in: Files/extracts/CUSTOMER_EXTRACTS/SAMPLE_CUSTOMERS/2024/07/SAMPLE_CUSTOMERS_20240725_142530.csv
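Placeholder substitution of this kind can be sketched with str.format and a fixed timestamp. The sketch assumes each placeholder maps to one formatted value. resolve_pattern is a hypothetical helper, not the package's function, and the timestamp is chosen to reproduce the example path above.

```python
from datetime import datetime


def resolve_pattern(pattern, now, extract_name, execution_group, run_id=""):
    """Substitute the documented placeholders into a path or filename pattern."""
    values = {
        "year": now.strftime("%Y"),
        "month": now.strftime("%m"),
        "day": now.strftime("%d"),
        "hour": now.strftime("%H"),
        "timestamp": now.strftime("%Y%m%d_%H%M%S"),
        "date": now.strftime("%Y%m%d"),
        "extract_name": extract_name,
        "execution_group": execution_group,
        "run_id": run_id,
    }
    # An unknown placeholder raises KeyError instead of being silently ignored.
    return pattern.format(**values)


now = datetime(2024, 7, 25, 14, 25, 30)
directory = resolve_pattern("{execution_group}/{extract_name}/{year}/{month}",
                            now, "SAMPLE_CUSTOMERS", "CUSTOMER_EXTRACTS")
file_name = resolve_pattern("{extract_name}_{timestamp}",
                            now, "SAMPLE_CUSTOMERS", "CUSTOMER_EXTRACTS")
full_path = f"Files/extracts/{directory}/{file_name}.csv"
print(full_path)
```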
Best Practices¶
Performance Optimization¶
- Use Pure PySpark Operations
  - Package automatically uses PySpark DataFrame API
  - No manual Pandas conversions needed
  - Leverages Spark's distributed processing
- Choose Appropriate File Formats
  - Parquet: Analytics workloads, best compression
  - CSV: Human-readable, universal compatibility
  - TSV: Text processing, comma-free data
  - DAT: Legacy system integration
- Configure Single File Output
- Use Parallel Processing
- Enable Smart Compression
Reliability¶
- Location Resolution
- Monitor Performance Metrics
  - Set up alerts for throughput degradation
  - Monitor memory usage trends
  - Track duration increases
- Use Execution Groups
Security¶
- Leverage Fabric Permissions
  - Extract jobs run under workspace identity
  - Inherit source object permissions
  - Control access to output locations
- Secure File Paths
- Comprehensive Audit Trail
  - All extractions logged with performance metrics
  - Thread-safe concurrent logging
  - Track execution across parallel jobs
Troubleshooting¶
Common Issues¶
Extract Fails with Format Error¶
Solution: TSV/DAT formats are automatically mapped to CSV with appropriate delimiters. No action needed in latest version.
Performance Degradation¶
Solutions:
- Check for Pandas conversions (eliminated in latest version)
- Verify force_single_file=True for small-medium datasets
- Monitor memory usage and cluster resources
- Use parallel processing: max_parallel_extracts=3
Location Resolution Errors¶
Solutions:
- Verify workspace/datastore IDs are correct
- Use None for default workspace/datastore
- Check datastore type: 'lakehouse' vs 'warehouse'
- Ensure target paths exist: target_file_root_path="Files"
Parallel Processing Issues¶
Solutions:
- Reduce max_parallel_extracts based on cluster size
- Monitor memory usage per thread
- Use execution groups to control priority
- Check for concurrent access to same resources
Debug Mode¶
Enable detailed logging:
# Set in notebook parameters
debug_mode=True
log_level="DEBUG"
# Outputs detailed performance metrics:
# [Thread-1] Memory before: 2048 MB
# [Thread-1] Processing 1M rows...
# [Thread-1] Memory after: 2156 MB
# [Thread-1] Throughput: 1,219,512 rows/second
Examples¶
Example 1: High-Performance Customer Export¶
Optimized daily customer export with performance monitoring:
# Configuration with performance optimization
customer_config = Row(
extract_name="CUSTOMERS_DAILY_OPTIMIZED",
is_active=True,
extract_table_name="customers",
extract_table_schema="default",
is_full_load=True,
execution_group="DAILY_EXPORTS"
)
customer_details = Row(
extract_name="CUSTOMERS_DAILY_OPTIMIZED",
output_format="parquet", # Best performance for analytics
force_single_file=True, # Single file optimization
is_compressed=True,
compressed_type="SNAPPY", # Fast compression
extract_container="Files/extracts",
extract_directory="customers/{year}/{month}",
extract_file_name="customers_{date}",
extract_file_name_extension="parquet",
# Performance defaults
source_datastore_type="lakehouse",
target_datastore_type="lakehouse",
target_file_root_path="Files"
)
# Expected performance: ~1M+ rows/second
Example 2: Multi-Format Product Export¶
Export products in multiple formats simultaneously:
# CSV Format
csv_config = Row(
extract_name="PRODUCTS_CSV",
extract_table_name="products",
output_format="csv",
file_properties_column_delimiter=",",
force_single_file=True
)
# TSV Format
tsv_config = Row(
extract_name="PRODUCTS_TSV",
extract_table_name="products",
output_format="tsv", # Maps to CSV with tab delimiter
file_properties_column_delimiter="\t",
force_single_file=True
)
# DAT Format
dat_config = Row(
extract_name="PRODUCTS_DAT",
extract_table_name="products",
output_format="dat", # Maps to CSV with pipe delimiter
file_properties_column_delimiter="|",
force_single_file=True
)
# Run in parallel with max_parallel_extracts=3
Example 3: Cross-Workspace Extract¶
Extract from one workspace to another:
cross_workspace_config = Row(
extract_name="CROSS_WORKSPACE_SALES",
# Source: Production workspace, sales lakehouse
source_workspace_id="prod-workspace-guid",
source_datastore_id="sales-lakehouse-guid",
source_datastore_type="lakehouse",
source_schema_name="fact_tables",
extract_table_name="sales_transactions",
# Target: Analytics workspace, reporting lakehouse
target_workspace_id="analytics-workspace-guid",
target_datastore_id="reporting-lakehouse-guid",
target_datastore_type="lakehouse",
target_file_root_path="Files",
# Configuration
is_full_load=True,
execution_group="CROSS_WORKSPACE_EXPORTS"
)
cross_workspace_details = Row(
extract_name="CROSS_WORKSPACE_SALES",
output_format="parquet",
is_compressed=True,
compressed_type="SNAPPY",
force_single_file=False, # Allow partitioning for large dataset
extract_container="Files/imports",
extract_directory="sales/{year}/{month}",
extract_file_name="sales_transactions_{timestamp}",
extract_file_name_extension="parquet"
)
Sample Data and Testing¶
The package includes comprehensive sample data:
Sample Tables Created¶
- customers - 5 sample customer records
- products - 10 sample products with categories and pricing
- orders - 100 sample orders with various statuses
- order_items - 250+ line items with pricing details
- inventory - 30 warehouse inventory records for TSV testing
Sample Extract Configurations¶
- SAMPLE_CUSTOMERS_SNAPSHOT - Parquet export with single file optimization
- SAMPLE_PRODUCTS_SNAPSHOT - Parquet with trigger file
- SAMPLE_ORDERS_INCREMENTAL - Compressed Parquet for incremental processing
- SAMPLE_ORDER_ITEMS_INCREMENTAL - Compressed Parquet with line item details
- SAMPLE_INVENTORY_TSV - TSV format with tab delimiters
- SAMPLE_PRODUCTS_DAT - DAT format with pipe delimiters
Testing Performance¶
Run performance tests:
# Test all sample extracts
ingen_fab package extract run
# Test specific format
ingen_fab package extract run --extract-name SAMPLE_INVENTORY_TSV
# Test parallel processing
ingen_fab package extract run --extract-name "SAMPLE_*" --parallel --max-workers 3
Expected performance benchmarks:
- Small datasets (< 1K rows): < 1 second
- Medium datasets (< 100K rows): < 5 seconds
- Large datasets (1M+ rows): < 10 seconds
- Throughput: 1M+ rows/second for optimized configurations
Advanced Integration¶
Synthetic Data Alignment¶
For using extract generation with synthetic data generated by the synthetic data package, see Extract Generation - Synthetic Data Alignment. This integration provides:
- Pre-configured extract definitions for synthetic data schemas
- Optimized extract patterns for test data workflows
- Sample configurations for common synthetic data export scenarios
Next Steps¶
- Review the sample configurations in sample_project/ddl_scripts/Lakehouses/Config/ for complete examples
- Explore performance optimization techniques
- Learn about integration with orchestration tools
- Test parallel processing with your cluster configuration
- Monitor performance metrics and optimize based on your data patterns
Migration from Legacy Versions¶
Key Changes in Latest Version¶
- Performance: 32x speed improvement with pure PySpark implementation
- Parallel Processing: Configurable concurrent extract execution
- Format Support: Native TSV/DAT support with proper Spark mapping
- Location Resolution: Dynamic workspace/datastore configuration
- Monitoring: Comprehensive performance metrics and threading information
- Architecture: PySpark DataFrame API replaces SQL for type safety
Breaking Changes¶
- Configuration schema updated with location resolution fields
- Performance metrics added to log schema
- Format mapping changed (TSV/DAT now map to CSV internally)
- Deprecated Pandas-based processing (automatic migration)
Migration Steps¶
- Update Configuration Schema: Add new location resolution fields
- Review Format Configurations: TSV/DAT formats now properly supported
- Enable Performance Monitoring: Update log table schema
- Test Parallel Processing: Start with max_parallel_extracts=1, then increase
- Validate Performance: Expect significant speed improvements
The package maintains backward compatibility while providing substantial performance and feature enhancements.