| name | description |
|---|---|
| project-architecture | Detailed architecture, data flow, pipeline execution, dependencies, and system design for the Unify data migration project. Use when you need deep understanding of how components interact. |
Project Architecture
Comprehensive architecture documentation for the Unify data migration project.
Medallion Architecture Deep Dive
Bronze Layer
Purpose: Raw data ingestion from parquet files
Location: python_files/pipeline_operations/bronze_layer_deployment.py
Process:
- Lists parquet files from Azure ADLS Gen2 or local storage
- Creates bronze databases: `bronze_cms`, `bronze_fvms`, `bronze_nicherms`
- Reads parquet files and applies basic transformations
- Adds versioning, row hashes, and data source columns (see the sketch after this list)
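A minimal PySpark sketch of that ingestion step is shown below. The helper function and the column names (`data_source`, `version_timestamp`, `row_hash`) are illustrative assumptions; the real implementation lives in `bronze_layer_deployment.py` and uses the project's utilities.

```python
from pyspark.sql import SparkSession, functions as F

# Minimal sketch of a bronze ingestion step (assumed helper, not project code).
spark = SparkSession.builder.getOrCreate()

def ingest_bronze(parquet_path: str, table_name: str, source_name: str) -> None:
    df = spark.read.parquet(parquet_path)

    # Lineage and change-detection columns; column names are assumptions.
    df = (
        df.withColumn("data_source", F.lit(source_name))
          .withColumn("version_timestamp", F.current_timestamp())
          .withColumn("row_hash", F.sha2(F.concat_ws("||", *df.columns), 256))
    )

    # Write into the per-source bronze database, e.g. bronze_cms.incident.
    df.write.mode("overwrite").saveAsTable(f"bronze_{source_name}.{table_name}")

# Example invocation (path is illustrative).
ingest_bronze("/workspaces/data/cms/incident", "incident", "cms")
```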
Silver Layer
Purpose: Validated, standardized data organized by source
Location: python_files/silver/ (cms, fvms, nicherms subdirectories)
Process:
- Drops and recreates silver databases
- Recursively finds all Python files in `python_files/silver/`
- Executes each silver transformation file in sorted order (see the sketch after this list)
- Uses threading for parallel execution (currently commented out)
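Below is a minimal sketch of that discover-and-execute loop, assuming a plain `exec()`-based runner; the real orchestrator may invoke each script differently (importlib, subprocess, or a Synapse notebook run, optionally in parallel threads per the note above).

```python
from pathlib import Path

# Sketch of the silver orchestration loop: find every transformation script
# under python_files/silver/ and run it in deterministic (sorted) order.
def run_silver_layer(silver_root: str = "python_files/silver") -> None:
    for script in sorted(Path(silver_root).rglob("*.py")):
        print(f"Executing silver transformation: {script}")
        # exec() stands in for however the project actually invokes each file.
        code = compile(script.read_text(), str(script), "exec")
        exec(code, {"__name__": "__main__"})
```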
Gold Layer
Purpose: Business-ready, aggregated analytical datasets
Location: python_files/gold/
Process:
- Creates business-ready analytical tables in the `gold_data_model` database
- Executes transformations from `python_files/gold/`
- Aggregates and joins data across multiple silver tables (see the sketch after this list)
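The sketch below illustrates the shape of a gold transformation: joining silver tables and persisting an aggregate into `gold_data_model`. The silver database, table, and column names here are assumptions, not the project's actual schema.

```python
from pyspark.sql import SparkSession

# Illustrative gold-layer aggregation: join two silver tables and persist the
# result into gold_data_model. Source names below are assumptions.
spark = SparkSession.builder.getOrCreate()

gold_df = spark.sql("""
    SELECT p.person_id,
           COUNT(i.incident_id) AS incident_count
    FROM   silver_fvms.person      AS p
    LEFT JOIN silver_fvms.incident AS i
           ON i.person_id = p.person_id
    GROUP BY p.person_id
""")

gold_df.write.mode("overwrite").saveAsTable("gold_data_model.person_incident_summary")
```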
Data Sources
FVMS (Family Violence Management System)
- Tables: 32 tables
- Key tables: incident, person, address, risk_assessment
- Purpose: Family violence incident tracking and management
CMS (Case Management System)
- Tables: 19 tables
- Key tables: offence_report, case_file, person, victim
- Purpose: Criminal offence investigation and case management
NicheRMS (Records Management System)
- Tables: 39 TBL_* tables
- Purpose: Legacy records management system
Azure Integration
Storage (ADLS Gen2)
- Containers: `bronze-layer`, `code-layer`, `legacy_ingestion`
- Authentication: Managed Identity (`AZURE_MANAGED_IDENTITY_CLIENT_ID`)
- Path Pattern: `abfss://container@account.dfs.core.windows.net/path` (see the helper sketched after this list)
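A small helper like the following can build paths matching that pattern. The function itself is an assumption; the account name is taken from the environment-detection example later in this document, and the relative path is illustrative.

```python
# Hypothetical helper for building ADLS Gen2 paths that match the pattern above.
def abfss_path(container: str, account: str, relative_path: str) -> str:
    return f"abfss://{container}@{account}.dfs.core.windows.net/{relative_path}"

# Example: the bronze-layer container on the account used elsewhere in this doc.
bronze_root = abfss_path("bronze-layer", "auedatamigdevlake", "fvms/incident")
# -> "abfss://bronze-layer@auedatamigdevlake.dfs.core.windows.net/fvms/incident"
```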
Key Services
- Key Vault: `AuE-DataMig-Dev-KV` for secret management
- Synapse Workspace: `auedatamigdevsynws`
- Spark Pool: `dm8c64gb`
Environment Detection Pattern
All processing scripts auto-detect their runtime environment:
if "/home/trusted-service-user" == env_vars["HOME"]:
# Azure Synapse Analytics production environment
import notebookutils.mssparkutils as mssparkutils
spark = SparkOptimiser.get_optimised_spark_session()
DATA_PATH_STRING = "abfss://code-layer@auedatamigdevlake.dfs.core.windows.net"
else:
# Local development environment using Docker Spark container
from python_files.utilities.local_spark_connection import sparkConnector
config = UtilityFunctions.get_settings_from_yaml("configuration.yaml")
connector = sparkConnector(...)
DATA_PATH_STRING = config["DATA_PATH_STRING"]
Core Utilities Architecture
SparkOptimiser
- Configured Spark session with optimized settings
- Handles driver memory, encryption, authentication
- Centralized session management
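A rough sketch of what such a centralized builder can look like is shown below; the specific configuration keys and values are assumptions, not the actual SparkOptimiser settings.

```python
from pyspark.sql import SparkSession

# Rough sketch of a centralized, optimised session builder; the real
# SparkOptimiser settings (memory, encryption, authentication) live in the project.
class SparkOptimiserSketch:
    @staticmethod
    def get_optimised_spark_session() -> SparkSession:
        return (
            SparkSession.builder
            .appName("unify-data-migration")
            .config("spark.driver.memory", "8g")              # driver sizing (assumed value)
            .config("spark.sql.shuffle.partitions", "64")      # shuffle width (assumed value)
            .config("spark.network.crypto.enabled", "true")    # wire encryption
            .getOrCreate()
        )
```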
NotebookLogger
- Rich console logging with fallback to standard print
- Structured logging (info, warning, error, success)
- Graceful degradation when Rich library unavailable
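The graceful-degradation pattern can be sketched as below, assuming a try/except import of Rich; the method names mirror the documented levels, but the real NotebookLogger may differ in naming and formatting.

```python
# Rich-with-fallback logging sketch (assumed structure, not the project class).
try:
    from rich.console import Console
    _console = Console()

    def _emit(message: str, style: str) -> None:
        _console.print(message, style=style)
except ImportError:
    def _emit(message: str, style: str) -> None:
        print(message)  # graceful degradation when Rich is unavailable


class NotebookLoggerSketch:
    def info(self, message: str) -> None:
        _emit(f"INFO: {message}", "cyan")

    def warning(self, message: str) -> None:
        _emit(f"WARNING: {message}", "yellow")

    def error(self, message: str) -> None:
        _emit(f"ERROR: {message}", "bold red")

    def success(self, message: str) -> None:
        _emit(f"SUCCESS: {message}", "green")
```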
TableUtilities
- DataFrame operations (deduplication, hashing, timestamp conversion)
- `add_row_hash()`: Change detection
- `save_as_table()`: Standard table save with timestamp conversion
- `clean_date_time_columns()`: Intelligent timestamp parsing
- `drop_duplicates_simple/advanced()`: Deduplication strategies (see the sketch after this list)
- `filter_and_drop_column()`: Remove duplicate flags
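As an example of the deduplication helpers, here is a minimal sketch of an "advanced" strategy that keeps the newest row per business key; the parameter names and window logic are assumptions about, not a copy of, the TableUtilities implementation.

```python
from pyspark.sql import DataFrame, Window, functions as F

# Sketch of an "advanced" dedup strategy: keep the newest row per business key.
def drop_duplicates_advanced(df: DataFrame, key_cols: list[str], order_col: str) -> DataFrame:
    window = Window.partitionBy(*key_cols).orderBy(F.col(order_col).desc())
    return (
        df.withColumn("_row_number", F.row_number().over(window))
          .filter(F.col("_row_number") == 1)
          .drop("_row_number")
    )
```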
DAGMonitor
- Pipeline execution tracking and reporting
- Performance metrics and logging
Configuration Management
configuration.yaml
Central YAML configuration includes:
- Data Sources: FVMS, CMS, NicheRMS table lists (`*_IN_SCOPE` variables)
- Azure Settings: Storage accounts, Key Vault, Synapse workspace, subscription IDs
- Spark Settings: Driver, encryption, authentication scheme
- Data Paths: Local (`/workspaces/data`) vs Azure (`abfss://`)
- Logging: LOG_LEVEL, LOG_ROTATION, LOG_RETENTION
- Nulls Handling: STRING_NULL_REPLACEMENT, NUMERIC_NULL_REPLACEMENT, TIMESTAMP_NULL_REPLACEMENT
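Loading this configuration is straightforward; the sketch below assumes PyYAML and uses key names mentioned in this document (`DATA_PATH_STRING`, `LOG_LEVEL`), while `FVMS_IN_SCOPE` is an assumed instance of the `*_IN_SCOPE` pattern. The project wraps this in `UtilityFunctions.get_settings_from_yaml`.

```python
import yaml

# Sketch of loading configuration.yaml directly (the project uses a utility wrapper).
with open("configuration.yaml") as fh:
    config = yaml.safe_load(fh)

data_path = config["DATA_PATH_STRING"]          # local path or abfss:// URI
fvms_tables = config.get("FVMS_IN_SCOPE", [])   # assumed *_IN_SCOPE key name
log_level = config.get("LOG_LEVEL", "INFO")
```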
Error Handling Strategy
- Decorator-Based: `@synapse_error_print_handler` for consistent error handling (sketched after this list)
- Loguru Integration: Structured logging with proper levels
- Graceful Degradation: Handle missing dependencies (Rich library fallback)
- Context Information: Include table/database names in all log messages
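A minimal sketch of such a decorator, assuming it logs with Loguru and re-raises, is shown below; the real `@synapse_error_print_handler` may behave differently (for example, printing instead of re-raising).

```python
import functools
from loguru import logger

# Sketch of a context-aware error-handling decorator (assumed behaviour).
def synapse_error_print_handler(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Include call context (function name and arguments) in the log entry.
            logger.exception(f"Error in {func.__name__} (args={args!r})")
            raise
    return wrapper

@synapse_error_print_handler
def load_table(database: str, table: str):
    ...
```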
Local Data Filtering
`TableUtilities.save_as_table()` automatically filters data to the last N years when a `date_created` column exists, controlled by the `NUMBER_OF_YEARS` global variable in `session_optimiser.py`. This prevents full-dataset processing in local development; a sketch of the filter is shown below.
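The sketch assumes the filter compares `date_created` against a cutoff derived from `NUMBER_OF_YEARS`; the exact logic in `save_as_table()` may differ.

```python
from pyspark.sql import DataFrame, functions as F

NUMBER_OF_YEARS = 3  # illustrative value; the real global lives in session_optimiser.py

# Keep only recent rows when the DataFrame has a date_created column.
def filter_recent_years(df: DataFrame) -> DataFrame:
    if "date_created" not in df.columns:
        return df
    cutoff = F.add_months(F.current_date(), -12 * NUMBER_OF_YEARS)
    return df.filter(F.col("date_created") >= cutoff)
```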
Testing Architecture
Test Structure
- `python_files/testing/`: Unit and integration tests
- `medallion_testing.py`: Full pipeline validation
- `bronze_layer_validation.py`: Bronze layer tests
- `ingestion_layer_validation.py`: Ingestion tests
Testing Strategy
- pytest integration with PySpark environments
- Quality gates: syntax validation and linting before completion
- Integration tests for full medallion flow
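A typical shape for such tests is a session-scoped local Spark fixture; the sketch below is an assumption about the test setup, not a copy of the project's conftest.

```python
import pytest
from pyspark.sql import SparkSession

# Session-scoped local Spark session for pytest-based PySpark tests (assumed setup).
@pytest.fixture(scope="session")
def spark():
    session = (
        SparkSession.builder
        .master("local[2]")
        .appName("unify-tests")
        .getOrCreate()
    )
    yield session
    session.stop()

def test_row_count(spark):
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
    assert df.count() == 2
```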
DuckDB Integration
After running pipelines, build local DuckDB database for fast SQL analysis:
- File: `/workspaces/data/warehouse.duckdb`
- Command: `make build_duckdb`
- Purpose: Fast local queries without an Azure connection (example after this list)
- Contains: All bronze, silver, gold layer tables
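Once built, the file can be queried directly from Python; the gold table name in the second query is an assumption about what the pipeline produces.

```python
import duckdb

# Query the locally built warehouse without any Azure connection.
con = duckdb.connect("/workspaces/data/warehouse.duckdb", read_only=True)

# List every table that was loaded from the bronze/silver/gold layers.
tables = con.execute(
    "SELECT table_schema, table_name FROM information_schema.tables"
).fetchall()
print(tables)

# Example analytical query against an assumed gold table.
print(con.execute("SELECT COUNT(*) FROM gold_data_model.person_incident_summary").fetchone())

con.close()
```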
Recent Architectural Changes
Path Migration
- Standardized all paths to use `unify_2_1_dm_synapse_env_d10`
- Improved portability and environment consistency
- 12 files updated across utilities, notebooks, configurations
Code Cleanup
- Removed unused utilities: `file_executor.py`, `file_finder.py`
- Reduced codebase complexity
- Regular cleanup pattern for maintainability