#!/usr/bin/env python3
"""
Create a Direct Lake semantic model from lakehouse tables.

Usage:
    python3 create_direct_lake_model.py "src.Workspace/LH.Lakehouse" "dest.Workspace/Model.SemanticModel" -t schema.table

Requirements:
- fab CLI installed and authenticated
"""

import argparse
import json
import subprocess
import sys
import tempfile
import uuid
from pathlib import Path


def run_fab(args: list[str]) -> str:
    """Run a fab command and return its stdout; exit if the command fails."""
    result = subprocess.run(["fab"] + args, capture_output=True, text=True)
    if result.returncode != 0:
        # Fail fast: returning stdout from a failed command would only
        # surface later as a confusing JSON/parsing error.
        print(f"fab error: {result.stderr}", file=sys.stderr)
        sys.exit(result.returncode)
    return result.stdout.strip()


def get_lakehouse_sql_endpoint(workspace: str, lakehouse: str) -> dict:
    """Get lakehouse SQL endpoint info."""
    path = f"{workspace}/{lakehouse}"
    output = run_fab(["get", path, "-q", "properties.sqlEndpointProperties"])
    return json.loads(output)


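# The returned JSON is expected to contain at least "connectionString" and "id"
# (both consumed in main() below). Illustrative shape only; the values are made up:
#   {"connectionString": "xxxx.datawarehouse.fabric.microsoft.com", "id": "<endpoint-guid>", ...}

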
def get_table_schema(workspace: str, lakehouse: str, schema: str, table: str) -> list:
    """Get table schema from lakehouse (parses text output)."""
    path = f"{workspace}/{lakehouse}/Tables/{schema}/{table}"
    output = run_fab(["table", "schema", path])

    # Parse text table format:
    #   name       type
    #   ------------------------------------------
    #   col_name   col_type
    columns = []
    in_data = False
    for line in output.split("\n"):
        line = line.strip()
        if line.startswith("---"):
            in_data = True
            continue
        if in_data and line:
            parts = line.split()
            if len(parts) >= 2:
                columns.append({"name": parts[0], "type": parts[1]})
    return columns


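# Example result for a hypothetical two-column table (type names depend on
# what the fab CLI prints for your table):
#   [{"name": "id", "type": "bigint"}, {"name": "name", "type": "varchar"}]

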
def tmdl_data_type(sql_type: str) -> str:
    """Convert SQL type to TMDL data type."""
    sql_type = sql_type.lower()
    if 'int' in sql_type:
        return 'int64'
    elif 'float' in sql_type or 'double' in sql_type or 'decimal' in sql_type:
        return 'double'
    elif 'bool' in sql_type or 'bit' in sql_type:
        return 'boolean'
    elif 'date' in sql_type or 'time' in sql_type:
        return 'dateTime'
    else:
        return 'string'


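# Illustrative mappings (the input type names are examples, not an exhaustive list):
#   "bigint"        -> "int64"
#   "decimal(18,2)" -> "double"
#   "timestamp"     -> "dateTime"
#   "varchar(100)"  -> "string"   (the fallback for anything unrecognized)

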
def create_model_tmdl(model_name: str, table_name: str) -> str:
    """Create model.tmdl content."""
    return f"""model '{model_name}'
\tculture: en-US
\tdefaultPowerBIDataSourceVersion: powerBI_V3

ref table '{table_name}'
"""


def create_expressions_tmdl(connection_string: str, endpoint_id: str) -> str:
    """Create expressions.tmdl content."""
    return f"""expression DatabaseQuery =
\t\tlet
\t\t\tdatabase = Sql.Database("{connection_string}", "{endpoint_id}")
\t\tin
\t\t\tdatabase
\tlineageTag: {uuid.uuid4()}
"""


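# Rendered with hypothetical endpoint values, the expression looks roughly like:
#   expression DatabaseQuery =
#       let
#           database = Sql.Database("xxxx.datawarehouse.fabric.microsoft.com", "<endpoint-guid>")
#       in
#           database
#     lineageTag: <uuid>

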
def create_table_tmdl(table_name: str, schema_name: str, columns: list) -> str:
    """Create table.tmdl content."""
    lines = [
        f"table '{table_name}'",
        f"\tlineageTag: {uuid.uuid4()}",
        f"\tsourceLineageTag: [{schema_name}].[{table_name}]",
        "",
    ]

    # Add columns
    for col in columns:
        col_name = col['name']
        data_type = tmdl_data_type(col['type'])
        lines.extend([
            f"\tcolumn '{col_name}'",
            f"\t\tdataType: {data_type}",
            f"\t\tlineageTag: {uuid.uuid4()}",
            f"\t\tsourceLineageTag: {col_name}",
            "\t\tsummarizeBy: none",
            f"\t\tsourceColumn: {col_name}",
            "",
            "\t\tannotation SummarizationSetBy = Automatic",
            "",
        ])

    # Add the Direct Lake partition
    lines.extend([
        f"\tpartition '{table_name}' = entity",
        "\t\tmode: directLake",
        "\t\tsource",
        f"\t\t\tentityName: {table_name}",
        f"\t\t\tschemaName: {schema_name}",
        "\t\t\texpressionSource: DatabaseQuery",
        "",
    ])

    return "\n".join(lines)


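# For a hypothetical table dbo.sales with one bigint column "id", this renders roughly as:
#   table 'sales'
#       lineageTag: <uuid>
#       sourceLineageTag: [dbo].[sales]
#
#       column 'id'
#           dataType: int64
#           ...
#
#       partition 'sales' = entity
#           mode: directLake
#           source
#               entityName: sales
#               schemaName: dbo
#               expressionSource: DatabaseQuery

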
def create_database_tmdl() -> str:
    """Create database.tmdl content."""
    return f"""database '{uuid.uuid4()}'
"""


def create_pbism() -> str:
    """Create definition.pbism content."""
    return json.dumps({
        "$schema": "https://developer.microsoft.com/json-schemas/fabric/item/semanticModel/definitionProperties/1.0.0/schema.json",
        "version": "4.0",
        "settings": {}
    }, indent=2)


def create_platform(model_name: str) -> str:
    """Create .platform content."""
    return json.dumps({
        "$schema": "https://developer.microsoft.com/json-schemas/fabric/gitIntegration/platformProperties/2.0.0/schema.json",
        "metadata": {
            "type": "SemanticModel",
            "displayName": model_name
        },
        "config": {
            "version": "2.0",
            "logicalId": str(uuid.uuid4())
        }
    }, indent=2)


def main():
    parser = argparse.ArgumentParser(description="Create Direct Lake semantic model")
    parser.add_argument("source", help="Source: Workspace.Workspace/Lakehouse.Lakehouse")
    parser.add_argument("dest", help="Destination: Workspace.Workspace/Model.SemanticModel")
    parser.add_argument("-t", "--table", required=True, help="Table: schema.table_name")
    args = parser.parse_args()

    # Parse source (normalize the .Lakehouse suffix)
    src_parts = args.source.split("/")
    if len(src_parts) != 2:
        parser.error("source must look like Workspace.Workspace/Lakehouse.Lakehouse")
    src_workspace = src_parts[0]
    src_lakehouse = src_parts[1].removesuffix(".Lakehouse") + ".Lakehouse"

    # Parse destination
    dest_parts = args.dest.split("/")
    if len(dest_parts) != 2:
        parser.error("dest must look like Workspace.Workspace/Model.SemanticModel")
    dest_workspace = dest_parts[0]
    model_name = dest_parts[1].removesuffix(".SemanticModel")

    # Parse table (split only on the first dot so table names may contain dots)
    table_parts = args.table.split(".", 1)
    if len(table_parts) != 2:
        parser.error("table must look like schema.table_name")
    schema_name, table_name = table_parts

    print(f"Source: {src_workspace}/{src_lakehouse}")
    print(f"Table: {schema_name}.{table_name}")
    print(f"Dest: {dest_workspace}/{model_name}.SemanticModel")

    # Get SQL endpoint
    print("\nGetting SQL endpoint...")
    endpoint = get_lakehouse_sql_endpoint(src_workspace, src_lakehouse)
    print(f"  Connection: {endpoint['connectionString']}")
    print(f"  ID: {endpoint['id']}")

    # Get table schema
    print(f"\nGetting table schema for {schema_name}.{table_name}...")
    columns = get_table_schema(src_workspace, src_lakehouse, schema_name, table_name)
    print(f"  Found {len(columns)} columns")

    # Create temp directory with the TMDL definition
    with tempfile.TemporaryDirectory() as tmpdir:
        model_dir = Path(tmpdir) / f"{model_name}.SemanticModel"
        def_dir = model_dir / "definition"
        tables_dir = def_dir / "tables"
        tables_dir.mkdir(parents=True)  # creates model_dir and def_dir as well

        # Write files
        print("\nCreating TMDL files...")
        (model_dir / ".platform").write_text(create_platform(model_name))
        (model_dir / "definition.pbism").write_text(create_pbism())
        (def_dir / "model.tmdl").write_text(create_model_tmdl(model_name, table_name))
        (def_dir / "database.tmdl").write_text(create_database_tmdl())
        (def_dir / "expressions.tmdl").write_text(
            create_expressions_tmdl(endpoint['connectionString'], endpoint['id'])
        )
        (tables_dir / f"{table_name}.tmdl").write_text(
            create_table_tmdl(table_name, schema_name, columns)
        )

        print(f"  Created: {model_dir}")
        for f in model_dir.rglob("*"):
            if f.is_file():
                print(f"    {f.relative_to(model_dir)}")

        # Import to Fabric (must happen while the temp directory still exists)
        print(f"\nImporting to {dest_workspace}...")
        dest_path = f"{dest_workspace}/{model_name}.SemanticModel"
        result = run_fab(["import", dest_path, "-i", str(model_dir), "-f"])
        print(result)

    print("\nDone!")


if __name__ == "__main__":
    main()
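
# Note: to sanity-check the imported model, you can query it with the fab CLI,
# mirroring the `fab get <path> -q <property>` pattern used in
# get_lakehouse_sql_endpoint above, e.g. (property path is an assumption):
#   fab get "dest.Workspace/Model.SemanticModel" -q "id"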