Incremental Synthetic Data Import Integration
Related: Flat File Ingestion | Synthetic Data Generation | Incremental Generation
This document explains how to use the enhanced flat file ingestion framework to import incremental synthetic data generated by the synthetic data generation package.
Overview
The incremental synthetic data generator creates date-partitioned files that can be imported using the flat file ingestion framework's new batch processing capabilities. This integration provides a complete end-to-end solution for generating and importing time-series synthetic data.
Data Structure
The incremental synthetic data generator creates files in the following structure:
tmp/spark/Files/synthetic_data/retail_oltp_incremental/
├── 2024/01/01/
│   ├── customers/
│   │   └── customers_20240101.csv
│   ├── products/
│   │   └── products_20240101.csv
│   ├── stores/
│   │   └── stores_20240101.csv
│   ├── orders/
│   │   └── orders_20240101.csv
│   ├── order_items/
│   │   └── order_items_20240101.csv
│   └── inventory/
│       └── inventory_20240101.csv
├── 2024/01/02/
│   └── ... (same structure)
└── ...
Table Types and Import Strategies
1. Snapshot Tables
Tables that maintain current state as of a specific date:
| Table | Description | Write Mode | Merge Keys | Skip Existing |
|---|---|---|---|---|
| customers | Customer master data | merge | customer_id | Yes |
| products | Product master data | merge | product_id | Yes |
| stores | Store master data | merge | store_id | Yes |
Characteristics:
- Updated occasionally (customers grow or churn, products change weekly)
- Each date contains the complete current state
- Use merge mode to maintain the latest state
- Skip existing dates to avoid unnecessary reprocessing
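Merge mode behaves like an upsert on the merge key. The sketch below models it with a plain Python dictionary instead of a lakehouse table. `merge_snapshot` is a hypothetical helper, and it assumes a merge that updates and inserts but never deletes, which may differ from the framework's actual merge behaviour.

```python
from typing import Dict, Iterable


def merge_snapshot(target: Dict[object, dict], snapshot: Iterable[dict], key: str) -> Dict[object, dict]:
    """Upsert each row of a daily snapshot into `target`, keyed on `key`."""
    for row in snapshot:
        # An existing key is overwritten (update); a new key is added (insert).
        # Keys absent from this snapshot are left untouched (assumed: no delete).
        target[row[key]] = row
    return target


# Usage: two illustrative daily customer snapshots merged on customer_id.
customers: Dict[object, dict] = {}
merge_snapshot(customers, [{"customer_id": 1, "name": "Ann"}, {"customer_id": 2, "name": "Bo"}], "customer_id")
merge_snapshot(customers, [{"customer_id": 1, "name": "Ann Lee"}, {"customer_id": 3, "name": "Cy"}], "customer_id")
```

Re-running the same date leaves the result unchanged, which is why skipping dates that are already loaded is safe for these tables.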
2. Incremental Tables
Tables containing only new records for each date:
| Table | Description | Write Mode | Merge Keys | Skip Existing |
|---|---|---|---|---|
| orders | Daily new orders | append | None | No |
| order_items | Daily new order items | append | None | No |
Characteristics:
- New records added daily
- Each file contains only the new data for that date
- Use append mode to accumulate historical data
- Do not skip existing dates (a date may need reprocessing)
3. Daily Snapshot Tables
Tables with daily snapshots using composite keys:
| Table | Description | Write Mode | Merge Keys | Skip Existing |
|---|---|---|---|---|
| inventory | Daily inventory levels | merge | store_id,product_id,date | Yes |
Characteristics:
- Complete snapshot for each date
- Use composite merge keys to handle multiple dimensions
- Skip existing dates for performance
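Including date in the composite key means each day's inventory levels become new rows rather than overwriting the previous day, while re-importing the same date updates that day's rows in place. A dictionary-based sketch of that behaviour; `merge_inventory` and the `qty` column are hypothetical and only illustrate the key logic.

```python
from typing import Dict, Iterable, Tuple

# Composite merge keys for inventory, taken from the table above.
MERGE_KEYS = ("store_id", "product_id", "date")


def merge_inventory(target: Dict[Tuple, dict], snapshot: Iterable[dict]) -> Dict[Tuple, dict]:
    """Upsert inventory rows keyed on the (store_id, product_id, date) tuple."""
    for row in snapshot:
        key = tuple(row[k] for k in MERGE_KEYS)
        target[key] = row
    return target


inventory: Dict[Tuple, dict] = {}
merge_inventory(inventory, [{"store_id": 1, "product_id": 10, "date": "2024-01-01", "qty": 5}])
merge_inventory(inventory, [{"store_id": 1, "product_id": 10, "date": "2024-01-02", "qty": 3}])
# Re-importing 2024-01-02 updates that day's row instead of adding another one.
merge_inventory(inventory, [{"store_id": 1, "product_id": 10, "date": "2024-01-02", "qty": 4}])
```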
Sample Configurations
The sample configurations are provided in DDL scripts:
Lakehouse Version
Location: ddl_scripts/Lakehouses/Config/002_Sample_Data_Ingestion/004_incremental_synthetic_data_sample_configs.py
Warehouse Version
Location: ddl_scripts/Warehouses/Config_WH/002_Sample_Data_Ingestion/004_incremental_synthetic_data_sample_configs.sql
Key Configuration Features
Date-Partitioned Processing
import_pattern = 'date_partitioned'
date_partition_format = 'YYYY/MM/DD'
file_discovery_pattern = '**/customers/*.csv'
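How the framework resolves these settings internally is not described here. As an illustration only, assuming the date is read from the leading folders of each file's path relative to source_file_path and the discovery pattern is matched against that same relative path, the logic could look like this (all names are hypothetical):

```python
from datetime import date, datetime
from fnmatch import fnmatchcase
from typing import Optional

# Hypothetical translation of the documented tokens into strptime directives.
TOKENS = {"YYYY": "%Y", "MM": "%m", "DD": "%d"}


def to_strptime(partition_format: str) -> str:
    """Translate a format such as 'YYYY/MM/DD' into '%Y/%m/%d'."""
    for token, directive in TOKENS.items():
        partition_format = partition_format.replace(token, directive)
    return partition_format


def partition_date(relative_path: str, partition_format: str, pattern: str) -> Optional[date]:
    """Return the partition date of a file matching `pattern`, else None."""
    # In fnmatch, '*' also matches '/', so this is looser than glob-style '**'.
    if not fnmatchcase(relative_path, pattern):
        return None
    # The date occupies the leading path segments, one per '/'-separated token.
    depth = partition_format.count("/") + 1
    date_part = "/".join(relative_path.split("/")[:depth])
    return datetime.strptime(date_part, to_strptime(partition_format)).date()
```

For example, `partition_date("2024/01/02/customers/customers_20240102.csv", "YYYY/MM/DD", "**/customers/*.csv")` yields 2 January 2024, while an order_items file does not match `**/orders/*.csv`.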
Batch Processing Control
batch_import_enabled = TRUE
date_range_start = '2024-01-01'
date_range_end = '2024-01-30'
skip_existing_dates = TRUE -- For snapshot tables
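One way to read these settings: a batch covers every date from date_range_start to date_range_end, and skip_existing_dates filters out dates that were already loaded. A sketch under those assumptions; the end date is treated as inclusive and `dates_to_import` is a hypothetical helper.

```python
from datetime import date, timedelta
from typing import List, Set


def dates_to_import(start: date, end: date, imported: Set[date], skip_existing: bool) -> List[date]:
    """Return the dates from start to end (inclusive) that a batch run would process."""
    selected = []
    day = start
    while day <= end:  # assumes date_range_end is inclusive
        # With skip_existing=True (snapshot tables), already-loaded dates are skipped.
        if not (skip_existing and day in imported):
            selected.append(day)
        day += timedelta(days=1)
    return selected
```

With the range above and nothing loaded yet, this yields 30 dates; after 1 and 2 January are loaded, a snapshot table would process 28 and an incremental table all 30.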
Execution Sequencing
table_relationship_group = 'retail_oltp'
import_sequence_order = 1 -- Process order within group
execution_group = 1 -- Group for batch execution
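The two ordering fields combine into a simple plan: groups run in ascending execution_group, and within a group, configurations run in ascending import_sequence_order. A sketch of that ordering; the sequence numbers for customers (1) and orders (4) come from the configuration examples in this document, while the values for the other tables are assumed for illustration.

```python
from itertools import groupby
from operator import itemgetter
from typing import Dict, List

# customers and orders match the documented examples; other sequence numbers are assumed.
CONFIGS = [
    {"table": "inventory", "execution_group": 3, "import_sequence_order": 6},
    {"table": "orders", "execution_group": 2, "import_sequence_order": 4},
    {"table": "customers", "execution_group": 1, "import_sequence_order": 1},
    {"table": "order_items", "execution_group": 2, "import_sequence_order": 5},
    {"table": "stores", "execution_group": 1, "import_sequence_order": 3},
    {"table": "products", "execution_group": 1, "import_sequence_order": 2},
]


def execution_plan(configs: List[dict]) -> Dict[int, List[str]]:
    """Group tables by execution_group, each group ordered by import_sequence_order."""
    ordered = sorted(configs, key=itemgetter("execution_group", "import_sequence_order"))
    return {
        group: [c["table"] for c in members]
        for group, members in groupby(ordered, key=itemgetter("execution_group"))
    }
```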
Execution Strategy
Step 1: Generate Synthetic Data
# Generate incremental synthetic data
ingen_fab package synthetic-data generate-series retail_oltp_incremental \
--start-date 2024-01-01 \
--end-date 2024-01-30
Step 2: Deploy Configuration Tables
Step 3: Load Sample Configurations
Step 4: Import Data by Execution Groups
# Group 1: Master/Reference Data
ingen_fab run package flat-file-ingestion run --execution-group=1
# Group 2: Transactional Data
ingen_fab run package flat-file-ingestion run --execution-group=2
# Group 3: Snapshot Data
ingen_fab run package flat-file-ingestion run --execution-group=3
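To make sure a later group never starts after an earlier one fails, the three commands can be wrapped in a small driver. A sketch that reuses the command exactly as shown above and assumes the CLI exits with a non-zero code on failure; `run_groups` and its `dry_run` flag are hypothetical.

```python
import subprocess
from typing import List, Sequence


def group_command(group: int) -> List[str]:
    """Return the documented flat-file-ingestion command for one execution group."""
    return ["ingen_fab", "run", "package", "flat-file-ingestion", "run", f"--execution-group={group}"]


def run_groups(groups: Sequence[int] = (1, 2, 3), dry_run: bool = False) -> List[List[str]]:
    """Run the groups strictly in order, stopping at the first failure."""
    commands = [group_command(g) for g in groups]
    for cmd in commands:
        if dry_run:
            print(" ".join(cmd))
            continue
        # check=True raises CalledProcessError on a non-zero exit code,
        # so later groups never start after an earlier one fails.
        subprocess.run(cmd, check=True)
    return commands
```

Calling `run_groups(dry_run=True)` prints the three commands without executing them, which is a quick way to confirm the order before a real run.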
Execution Groups
| Group | Tables | Purpose | Processing Order |
|---|---|---|---|
| 1 | customers, products, stores | Master data | First (referential integrity) |
| 2 | orders, order_items | Transactional data | Second (depends on master) |
| 3 | inventory | Daily snapshots | Third (independent) |
Configuration Details
Snapshot Table Example (Customers)
from pyspark.sql import Row

Row(
config_id="retail_customers_incremental",
source_file_path="tmp/spark/Files/synthetic_data/retail_oltp_incremental",
write_mode="merge",
merge_keys="customer_id",
import_pattern="date_partitioned",
date_partition_format="YYYY/MM/DD",
file_discovery_pattern="**/customers/*.csv",
skip_existing_dates=True,
execution_group=1,
import_sequence_order=1
)
Incremental Table Example (Orders)
Row(
config_id="retail_orders_incremental",
source_file_path="tmp/spark/Files/synthetic_data/retail_oltp_incremental",
write_mode="append",
merge_keys=None,
import_pattern="date_partitioned",
date_partition_format="YYYY/MM/DD",
file_discovery_pattern="**/orders/*.csv",
skip_existing_dates=False,
execution_group=2,
import_sequence_order=4
)
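The two examples differ only in the settings that follow from the table type. A hypothetical pre-load check (not part of the framework) that encodes the guidance from the table-type sections could catch a misconfigured row before it runs:

```python
from typing import List, Optional


def check_write_settings(write_mode: str, merge_keys: Optional[str], skip_existing_dates: bool) -> List[str]:
    """Return warnings for settings that contradict the table-type guidance."""
    warnings = []
    if write_mode == "merge" and not merge_keys:
        warnings.append("merge mode needs merge_keys")
    if write_mode == "append" and merge_keys:
        warnings.append("append tables are configured with merge_keys=None")
    if write_mode == "append" and skip_existing_dates:
        warnings.append("incremental (append) tables should use skip_existing_dates=False")
    if write_mode == "merge" and not skip_existing_dates:
        warnings.append("snapshot (merge) tables should use skip_existing_dates=True")
    return warnings
```

Because a PySpark Row supports attribute access, a configuration row can be checked with `check_write_settings(row.write_mode, row.merge_keys, row.skip_existing_dates)`; both examples above return no warnings.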
Monitoring and Validation
Check Import Status
SELECT
config_id,
execution_id,
status,
records_processed,
records_inserted,
records_updated,
execution_duration_seconds,
error_message
FROM log_flat_file_ingestion
WHERE config_id LIKE 'retail_%_incremental%'
ORDER BY job_start_time DESC;
Verify Data Completeness
-- Check date coverage: customers by ingestion timestamp, orders by order date
SELECT
'customers' as table_name,
DATE_TRUNC('day', _ingestion_timestamp) as ingestion_date,
COUNT(*) as record_count
FROM bronze.customers
GROUP BY DATE_TRUNC('day', _ingestion_timestamp)
UNION ALL
SELECT
'orders' as table_name,
DATE_TRUNC('day', order_date) as ingestion_date,
COUNT(*) as record_count
FROM bronze.orders
GROUP BY DATE_TRUNC('day', order_date)
ORDER BY table_name, ingestion_date;
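The coverage query lists the dates that are present; gaps are easier to spot by comparing that list against the configured date range. A sketch, assuming the ingestion_date values for one table have been collected and converted to `datetime.date` objects (timestamps need `.date()`, because a `datetime` never compares equal to a `date`); `missing_dates` is a hypothetical helper.

```python
from datetime import date, timedelta
from typing import Iterable, List


def missing_dates(observed: Iterable[date], start: date, end: date) -> List[date]:
    """Return each date from start to end (inclusive) that has no rows."""
    seen = set(observed)
    span = (end - start).days + 1
    candidates = (start + timedelta(days=i) for i in range(span))
    return [d for d in candidates if d not in seen]
```

For orders, an empty result over 2024-01-01 to 2024-01-30 means every generated day was imported.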
Check Referential Integrity
-- Verify order-customer relationships
SELECT
COUNT(*) as total_orders,
COUNT(DISTINCT o.customer_id) as unique_customers,
COUNT(c.customer_id) as valid_customer_refs,
COUNT(*) - COUNT(c.customer_id) as orphaned_orders
FROM bronze.orders o
LEFT JOIN bronze.customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= '2024-01-01';
Best Practices
1. Processing Order
Always process execution groups in order:
1. Master data (Group 1) establishes referential integrity
2. Transactional data (Group 2) depends on master data
3. Snapshot data (Group 3) is processed independently
2. Date Range Management
- Start with small date ranges for testing
- Use skip_existing_dates=TRUE for snapshot tables
- Use skip_existing_dates=FALSE for incremental tables
3. Error Handling
- Use error_handling_strategy="log" for production
- Monitor the log table for processing issues
- Set up alerts for failed executions
4. Performance Optimization
- Process related tables in the same execution group
- Use appropriate partitioning strategies
- Monitor resource usage during large imports
Troubleshooting
Common Issues
Issue: Files not found
Solution: Verify the synthetic data generation completed successfully
Check: tmp/spark/Files/synthetic_data/retail_oltp_incremental/
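Because every date folder is expected to hold one CSV per table, the check can be automated. A sketch, assuming the files are reachable through the local file system (as with the tmp/spark path used here) and that the naming convention in the directory tree holds for every date; `missing_files` is a hypothetical helper.

```python
from datetime import date, timedelta
from pathlib import Path
from typing import List

# Tables the generator writes for every date, per the directory tree.
TABLES = ["customers", "products", "stores", "orders", "order_items", "inventory"]


def missing_files(root: str, start: date, end: date) -> List[Path]:
    """List expected CSV files under root that do not exist, from start to end inclusive."""
    missing = []
    day = start
    while day <= end:
        for table in TABLES:
            # e.g. <root>/2024/01/01/customers/customers_20240101.csv
            path = Path(root) / f"{day:%Y/%m/%d}" / table / f"{table}_{day:%Y%m%d}.csv"
            if not path.is_file():
                missing.append(path)
        day += timedelta(days=1)
    return missing


if __name__ == "__main__":
    root = "tmp/spark/Files/synthetic_data/retail_oltp_incremental"
    gaps = missing_files(root, date(2024, 1, 1), date(2024, 1, 30))
    print(f"{len(gaps)} expected files missing")
```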
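Issue: Referential integrity violations
Solution: Run execution group 1 (master data) to completion before starting group 2 (transactional data)
Issue: Duplicate records in snapshot tables
Solution: Confirm the configuration uses write_mode="merge" with the correct merge_keys (for example, customer_id for customers)
Issue: Missing dates in incremental tables
Solution: Confirm skip_existing_dates=FALSE is set, that date_range_start and date_range_end cover the missing dates, and that files were generated for those dates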
Integration Benefits
- End-to-End Solution: Complete pipeline from data generation to import
- Date-Partitioned Processing: Efficient handling of time-series data
- Flexible Import Strategies: Different modes for different table types
- Referential Integrity: Ordered processing maintains data relationships
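- Error Handling and Monitoring: Configurable error handling (error_handling_strategy) and per-execution logging in log_flat_file_ingestion
- Scalable: Large date ranges can be split into batches with date_range_start and date_range_end, and snapshot tables skip dates that are already loaded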
This integration provides a robust foundation for testing data pipelines with realistic, time-series synthetic data that maintains referential integrity and supports various import patterns.