Incremental Synthetic Data Import Integration

Related: Flat File Ingestion | Synthetic Data Generation | Incremental Generation

This document explains how to use the enhanced flat file ingestion framework to import incremental synthetic data generated by the synthetic data generation package.

Overview

The incremental synthetic data generator creates date-partitioned files that can be imported using the flat file ingestion framework's new batch processing capabilities. This integration provides a complete end-to-end solution for generating and importing time-series synthetic data.

Data Structure

The incremental synthetic data generator creates files in the following structure:

tmp/spark/Files/synthetic_data/retail_oltp_incremental/
├── 2024/01/01/
│   ├── customers/
│   │   └── customers_20240101.csv
│   ├── products/
│   │   └── products_20240101.csv
│   ├── stores/
│   │   └── stores_20240101.csv
│   ├── orders/
│   │   └── orders_20240101.csv
│   ├── order_items/
│   │   └── order_items_20240101.csv
│   └── inventory/
│       └── inventory_20240101.csv
├── 2024/01/02/
│   └── ... (same structure)
└── ...
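
The layout above can be expressed as a small path-building helper. This is a minimal sketch, not part of the generator: the function name `partition_file` is hypothetical, and the naming rule (`<table>_YYYYMMDD.csv` under `YYYY/MM/DD/<table>/`) is inferred only from the tree shown here.

```python
from datetime import date
from pathlib import PurePosixPath

# Base folder as shown in the directory tree above.
BASE = PurePosixPath("tmp/spark/Files/synthetic_data/retail_oltp_incremental")


def partition_file(base: PurePosixPath, table: str, day: date) -> PurePosixPath:
    """Build <base>/YYYY/MM/DD/<table>/<table>_YYYYMMDD.csv (hypothetical helper)."""
    return base / day.strftime("%Y/%m/%d") / table / f"{table}_{day:%Y%m%d}.csv"


print(partition_file(BASE, "customers", date(2024, 1, 1)))
```

A helper like this can be handy for checking that an expected file exists before starting an import.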

Table Types and Import Strategies

1. Snapshot Tables

Tables that maintain current state as of a specific date:

| Table | Description | Write Mode | Merge Keys | Skip Existing |
|---|---|---|---|---|
| customers | Customer master data | merge | customer_id | Yes |
| products | Product master data | merge | product_id | Yes |
| stores | Store master data | merge | store_id | Yes |

Characteristics:

  • Updated occasionally (customers grow/churn, products change weekly)
  • Each date contains the complete current state
  • Use merge mode to maintain latest state
  • Skip existing dates to avoid unnecessary reprocessing

2. Incremental Tables

Tables containing only new records for each date:

| Table | Description | Write Mode | Merge Keys | Skip Existing |
|---|---|---|---|---|
| orders | Daily new orders | append | None | No |
| order_items | Daily new order items | append | None | No |

Characteristics:

  • New records added daily
  • Each file contains only new data for that date
  • Use append mode to accumulate historical data
  • Don't skip existing dates (may need reprocessing)

3. Daily Snapshot Tables

Tables with daily snapshots using composite keys:

| Table | Description | Write Mode | Merge Keys | Skip Existing |
|---|---|---|---|---|
| inventory | Daily inventory levels | merge | store_id,product_id,date | Yes |

Characteristics:

  • Complete snapshot for each date
  • Use composite merge keys to handle multiple dimensions
  • Skip existing dates for performance
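
The difference between the append and merge write modes used by these three table types can be illustrated in plain Python. This is only a sketch of the general semantics (append accumulates rows, merge upserts by key); the function names are hypothetical and it does not reflect how the framework implements either mode.

```python
def append_rows(target: list[dict], incoming: list[dict]) -> list[dict]:
    """Append mode: every incoming row is added, nothing is replaced."""
    return target + incoming


def merge_rows(target: list[dict], incoming: list[dict], keys: list[str]) -> list[dict]:
    """Merge mode: rows with the same key values are replaced, new keys are inserted."""
    merged = {tuple(row[k] for k in keys): row for row in target}
    for row in incoming:
        merged[tuple(row[k] for k in keys)] = row
    return list(merged.values())


# Snapshot table: a single merge key keeps one row per customer.
customers = merge_rows(
    [{"customer_id": 1, "name": "A"}, {"customer_id": 2, "name": "B"}],
    [{"customer_id": 1, "name": "A (updated)"}, {"customer_id": 3, "name": "C"}],
    keys=["customer_id"],
)

# Daily snapshot table: a composite key keeps one row per store, product and date.
inventory = merge_rows(
    [{"store_id": 1, "product_id": 10, "date": "2024-01-01", "qty": 5}],
    [{"store_id": 1, "product_id": 10, "date": "2024-01-02", "qty": 4}],
    keys=["store_id", "product_id", "date"],
)

# Incremental table: append simply accumulates the daily files.
orders = append_rows([{"order_id": 100}], [{"order_id": 101}, {"order_id": 102}])

print(len(customers), len(inventory), len(orders))
```

Note that appending the same rows twice produces duplicates in this sketch, which is worth keeping in mind when a date is reprocessed in append mode.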

Sample Configurations

The sample configurations are provided in DDL scripts:

Lakehouse Version

Location: ddl_scripts/Lakehouses/Config/002_Sample_Data_Ingestion/004_incremental_synthetic_data_sample_configs.py

Warehouse Version

Location: ddl_scripts/Warehouses/Config_WH/002_Sample_Data_Ingestion/004_incremental_synthetic_data_sample_configs.sql

Key Configuration Features

Date-Partitioned Processing

import_pattern = 'date_partitioned'
date_partition_format = 'YYYY/MM/DD'
file_discovery_pattern = '**/customers/*.csv'
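
To make these three settings concrete, the sketch below parses the partition date from a relative file path and checks the path against the discovery pattern. It is an approximation under stated assumptions: the mapping from 'YYYY/MM/DD' to a `strptime` format and both helper names are hypothetical, and `fnmatchcase` only approximates glob matching (its `*` also matches `/`), so the framework's own discovery logic may behave differently.

```python
from datetime import date, datetime
from fnmatch import fnmatchcase

# Assumed translation of the config value into a strptime format string.
STRPTIME_FORMATS = {"YYYY/MM/DD": "%Y/%m/%d"}


def partition_date(relative_path: str, date_partition_format: str) -> date:
    """Read the date from the leading folders of a path relative to source_file_path."""
    fmt = STRPTIME_FORMATS[date_partition_format]
    depth = fmt.count("/") + 1  # number of folder levels that encode the date
    prefix = "/".join(relative_path.split("/")[:depth])
    return datetime.strptime(prefix, fmt).date()


def matches_pattern(relative_path: str, file_discovery_pattern: str) -> bool:
    """Approximate glob check; '*' here also matches path separators."""
    return fnmatchcase(relative_path, file_discovery_pattern)


path = "2024/01/02/customers/customers_20240102.csv"
print(partition_date(path, "YYYY/MM/DD"), matches_pattern(path, "**/customers/*.csv"))
```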

Batch Processing Control

batch_import_enabled = TRUE
date_range_start = '2024-01-01'
date_range_end = '2024-01-30'
skip_existing_dates = TRUE  -- For snapshot tables
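
One way to picture how the date range and the skip flag interact is the following sketch. It assumes that `date_range_end` is inclusive and that "existing" means a date has already been loaded for the table; both are assumptions for illustration, and `dates_to_process` is a hypothetical name rather than a framework function.

```python
from datetime import date, timedelta


def dates_to_process(
    start: date,
    end: date,
    already_loaded: set[date],
    skip_existing_dates: bool,
) -> list[date]:
    """List the partition dates a batch run would visit (end date assumed inclusive)."""
    days = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    if skip_existing_dates:
        return [d for d in days if d not in already_loaded]
    return days


loaded = {date(2024, 1, 1), date(2024, 1, 2)}
snapshot_run = dates_to_process(date(2024, 1, 1), date(2024, 1, 30), loaded, True)
incremental_run = dates_to_process(date(2024, 1, 1), date(2024, 1, 30), loaded, False)
print(len(snapshot_run), len(incremental_run))
```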

Execution Sequencing

table_relationship_group = 'retail_oltp'
import_sequence_order = 1  -- Process order within group
execution_group = 1        -- Group for batch execution
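
The sequencing fields can be read as a two-level sort: first by `execution_group`, then by `import_sequence_order` within the group. The sketch below shows that reading with plain dictionaries; `execution_plan` is a hypothetical helper, and the config id `retail_inventory_incremental` is made up for the example.

```python
from itertools import groupby

configs = [
    {"config_id": "retail_orders_incremental", "execution_group": 2, "import_sequence_order": 4},
    {"config_id": "retail_customers_incremental", "execution_group": 1, "import_sequence_order": 1},
    # Hypothetical id, used only to show a third group.
    {"config_id": "retail_inventory_incremental", "execution_group": 3, "import_sequence_order": 6},
]


def execution_plan(configs: list[dict]) -> dict[int, list[str]]:
    """Group config ids by execution_group, ordered by import_sequence_order."""
    ordered = sorted(configs, key=lambda c: (c["execution_group"], c["import_sequence_order"]))
    return {
        group: [c["config_id"] for c in items]
        for group, items in groupby(ordered, key=lambda c: c["execution_group"])
    }


print(execution_plan(configs))
```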

Execution Strategy

Step 1: Generate Synthetic Data

# Generate incremental synthetic data
ingen_fab package synthetic-data generate-series retail_oltp_incremental \
  --start-date 2024-01-01 \
  --end-date 2024-01-30

Step 2: Deploy Configuration Tables

# Deploy flat file ingestion DDL scripts
ingen_fab ddl compile --generation-mode=lakehouse

Step 3: Load Sample Configurations

# Execute the sample configuration DDL scripts
# (Run the generated notebook from step 2)

Step 4: Import Data by Execution Groups

# Group 1: Master/Reference Data
ingen_fab run package flat-file-ingestion run --execution-group=1

# Group 2: Transactional Data  
ingen_fab run package flat-file-ingestion run --execution-group=2

# Group 3: Snapshot Data
ingen_fab run package flat-file-ingestion run --execution-group=3

Execution Groups

| Group | Tables | Purpose | Processing Order |
|---|---|---|---|
| 1 | customers, products, stores | Master data | First (referential integrity) |
| 2 | orders, order_items | Transactional data | Second (depends on master) |
| 3 | inventory | Daily snapshots | Third (independent) |

Configuration Details

Snapshot Table Example (Customers)

Row(
    config_id="retail_customers_incremental",
    source_file_path="tmp/spark/Files/synthetic_data/retail_oltp_incremental",
    write_mode="merge",
    merge_keys="customer_id",
    import_pattern="date_partitioned",
    date_partition_format="YYYY/MM/DD",
    file_discovery_pattern="**/customers/*.csv",
    skip_existing_dates=True,
    execution_group=1,
    import_sequence_order=1
)

Incremental Table Example (Orders)

Row(
    config_id="retail_orders_incremental", 
    source_file_path="tmp/spark/Files/synthetic_data/retail_oltp_incremental",
    write_mode="append",
    merge_keys=None,
    import_pattern="date_partitioned", 
    date_partition_format="YYYY/MM/DD",
    file_discovery_pattern="**/orders/*.csv",
    skip_existing_dates=False,
    execution_group=2,
    import_sequence_order=4
)
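
Before loading configurations like the two above, a quick consistency check can catch mismatched settings. The rules in this sketch (merge needs keys, append does not use them, date-partitioned imports need a format and a discovery pattern) are assumptions drawn from the examples in this document, not validation the framework is known to perform, and `validate_config` is a hypothetical helper.

```python
def validate_config(cfg: dict) -> list[str]:
    """Return human-readable problems; an empty list means the config looks consistent."""
    problems = []
    write_mode = cfg.get("write_mode")
    merge_keys = cfg.get("merge_keys")
    if write_mode == "merge" and not merge_keys:
        problems.append("write_mode='merge' needs merge_keys")
    if write_mode == "append" and merge_keys:
        problems.append("write_mode='append' does not use merge_keys")
    if cfg.get("import_pattern") == "date_partitioned":
        for field in ("date_partition_format", "file_discovery_pattern"):
            if not cfg.get(field):
                problems.append(f"date_partitioned import needs {field}")
    return problems


customers_cfg = {
    "config_id": "retail_customers_incremental",
    "write_mode": "merge",
    "merge_keys": "customer_id",
    "import_pattern": "date_partitioned",
    "date_partition_format": "YYYY/MM/DD",
    "file_discovery_pattern": "**/customers/*.csv",
}
broken_cfg = {**customers_cfg, "merge_keys": None}

print(validate_config(customers_cfg))
print(validate_config(broken_cfg))
```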

Monitoring and Validation

Check Import Status

SELECT 
    config_id,
    execution_id,
    status,
    records_processed,
    records_inserted,
    records_updated,
    execution_duration_seconds,
    error_message
FROM log_flat_file_ingestion
WHERE config_id LIKE 'retail_%_incremental%'
ORDER BY job_start_time DESC;
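
Once the rows from the query above are available in Python (for example as a list of dictionaries), the most recent status per configuration can be picked out as sketched below. The sample rows and the status values "completed" and "failed" are invented for illustration; the actual values written to the log table are not specified in this document.

```python
from datetime import datetime


def latest_status_per_config(log_rows: list[dict]) -> dict[str, str]:
    """Keep the status of the most recent run (by job_start_time) for each config_id."""
    latest: dict[str, str] = {}
    for row in sorted(log_rows, key=lambda r: r["job_start_time"], reverse=True):
        latest.setdefault(row["config_id"], row["status"])
    return latest


# Illustrative rows only; the status values are assumptions.
sample_rows = [
    {"config_id": "retail_customers_incremental", "status": "completed",
     "job_start_time": datetime(2024, 2, 1, 9, 0)},
    {"config_id": "retail_orders_incremental", "status": "completed",
     "job_start_time": datetime(2024, 2, 1, 9, 5)},
    {"config_id": "retail_orders_incremental", "status": "failed",
     "job_start_time": datetime(2024, 2, 1, 10, 0)},
]

print(latest_status_per_config(sample_rows))
```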

Verify Data Completeness

-- Check date coverage
SELECT 
    'customers' as table_name,
    DATE_TRUNC('day', _ingestion_timestamp) as ingestion_date,
    COUNT(*) as record_count
FROM bronze.customers
GROUP BY DATE_TRUNC('day', _ingestion_timestamp)

UNION ALL

SELECT 
    'orders' as table_name,
    DATE_TRUNC('day', order_date) as ingestion_date,
    COUNT(*) as record_count  
FROM bronze.orders
GROUP BY DATE_TRUNC('day', order_date)

ORDER BY table_name, ingestion_date;
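
The date-coverage query returns the dates that are present; finding the gaps is then a set difference against the expected range. A minimal sketch, assuming the generated range is inclusive on both ends and that the observed dates have already been fetched into Python (`missing_dates` is a hypothetical helper):

```python
from datetime import date, timedelta


def missing_dates(start: date, end: date, observed: list[date]) -> list[date]:
    """Return the dates in [start, end] that do not appear in the observed dates."""
    expected = {start + timedelta(days=i) for i in range((end - start).days + 1)}
    return sorted(expected - set(observed))


# Example: orders were loaded for every day of the range except 2024-01-03.
observed_order_dates = [date(2024, 1, d) for d in range(1, 31) if d != 3]
print(missing_dates(date(2024, 1, 1), date(2024, 1, 30), observed_order_dates))
```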

Check Referential Integrity

-- Verify order-customer relationships
SELECT 
    COUNT(*) as total_orders,
    COUNT(DISTINCT o.customer_id) as unique_customers,
    COUNT(c.customer_id) as valid_customer_refs
FROM bronze.orders o
LEFT JOIN bronze.customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= '2024-01-01';
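
In the query above, any gap between total_orders and valid_customer_refs points to orders whose customer is missing. The same check can be sketched in plain Python to list the offending rows; `orphaned_orders` is a hypothetical helper and the sample data is invented.

```python
def orphaned_orders(orders: list[dict], customers: list[dict]) -> list[dict]:
    """Return orders whose customer_id has no matching row in customers."""
    known_ids = {c["customer_id"] for c in customers}
    return [o for o in orders if o["customer_id"] not in known_ids]


# Invented sample data: order 3 references a customer that was never loaded.
customers = [{"customer_id": 1}, {"customer_id": 2}]
orders = [
    {"order_id": 1, "customer_id": 1},
    {"order_id": 2, "customer_id": 2},
    {"order_id": 3, "customer_id": 99},
]

print(orphaned_orders(orders, customers))
```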

Best Practices

1. Processing Order

Always process execution groups in order:

  1. Master data (Group 1) - establishes referential integrity
  2. Transactional data (Group 2) - depends on master data
  3. Snapshot data (Group 3) - independent processing
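
The ordering rule can be enforced by a small driver that runs the groups in ascending order and stops at the first failure. This is a sketch under assumptions: it reuses the command shown in Step 4, assumes the CLI signals failure with a non-zero exit code, and treats stop-on-failure as the desired policy. The helper names are hypothetical, and the example call uses a dry-run runner that only prints the commands.

```python
import subprocess
from typing import Callable, Iterable, Sequence


def group_command(group: int) -> list[str]:
    """Command line for one execution group, as shown in Step 4."""
    return ["ingen_fab", "run", "package", "flat-file-ingestion", "run",
            f"--execution-group={group}"]


def run_cli(argv: Sequence[str]) -> int:
    """Run the command and return its exit code (assumed non-zero on failure)."""
    return subprocess.run(argv).returncode


def run_groups_in_order(
    groups: Iterable[int],
    runner: Callable[[Sequence[str]], int] = run_cli,
) -> list[int]:
    """Run groups in ascending order; stop at the first failing group."""
    completed = []
    for group in sorted(groups):
        if runner(group_command(group)) != 0:
            break
        completed.append(group)
    return completed


# Dry run: print each command instead of executing it.
print(run_groups_in_order([1, 2, 3], runner=lambda argv: print(" ".join(argv)) or 0))
```

Stopping early keeps transactional data from being loaded against incomplete master data, at the cost of also holding back the independent snapshot group.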

2. Date Range Management

  • Start with small date ranges for testing
  • Use skip_existing_dates=TRUE for snapshot tables
  • Use skip_existing_dates=FALSE for incremental tables

3. Error Handling

  • Use error_handling_strategy="log" for production
  • Monitor the log table for processing issues
  • Set up alerts for failed executions

4. Performance Optimization

  • Process related tables in the same execution group
  • Use appropriate partitioning strategies
  • Monitor resource usage during large imports

Troubleshooting

Common Issues

Issue: Files not found

Solution: Verify the synthetic data generation completed successfully
Check: tmp/spark/Files/synthetic_data/retail_oltp_incremental/

Issue: Referential integrity violations

Solution: Ensure master data (Group 1) is processed before transactional data (Group 2)

Issue: Duplicate records in snapshot tables

Solution: Verify merge_keys configuration and skip_existing_dates setting

Issue: Missing dates in incremental tables

Solution: Confirm that skip_existing_dates=FALSE is set for incremental tables, so that no date in the range is skipped

Integration Benefits

  1. End-to-End Solution: Complete pipeline from data generation to import
  2. Date-Partitioned Processing: Efficient handling of time-series data
  3. Flexible Import Strategies: Different modes for different table types
  4. Referential Integrity: Ordered processing maintains data relationships
  5. Production Ready: Configurable error handling (error_handling_strategy) and execution logging in log_flat_file_ingestion
  6. Scalable: Handles large date ranges and data volumes efficiently

This integration provides a robust foundation for testing data pipelines with realistic, time-series synthetic data that maintains referential integrity and supports various import patterns.