12 KiB
12 KiB
Subengines Development Guide
Complete guide for developing operators with subengines in SAP Data Intelligence.
Table of Contents
- Overview
- Subengine Architecture
- Python Subengine
- Node.js Subengine
- C++ Subengine
- FlowAgent Subengine
- Performance Optimization
- Best Practices
Overview
Subengines enable operators to run on different runtimes within SAP Data Intelligence.
Supported Subengines:
- Python 3.9: Data science and ML workflows
- Node.js: JavaScript-based processing
- C++: High-performance native operators
- ABAP: ABAP Pipeline Engine (source systems)
- FlowAgent: Database connectivity
Key Benefits:
- Language flexibility
- Performance optimization
- Specialized libraries
- Same-engine process sharing
Subengine Architecture
Execution Model
Main Engine (Coordinator)
├── Python Subengine Process
│ ├── Python Operator 1
│ └── Python Operator 2 (same process)
│
├── Node.js Subengine Process
│ └── JavaScript Operator
│
└── Native Engine Process
└── Native Operator
Communication
Same Engine Communication:
- In-memory data transfer
- No serialization overhead
- Optimal performance
Cross-Engine Communication:
- Serialization required
- Inter-process communication
- Higher latency
Engine Selection
The optimizer selects engines to minimize communication:
Graph: [Python Op A] -> [Python Op B] -> [JS Op C]
Execution:
- Ops A and B: Same Python process
- Op C: Separate Node.js process
- Data serialized between B and C
Python Subengine
The most commonly used subengine for data processing and ML.
Python 3.9 Operator (Gen2)
Creating Python Operator:
# Operator script
def on_input(msg_id, header, body):
"""Process incoming message."""
import pandas as pd
# Process data
df = pd.DataFrame(body)
result = df.groupby('category').sum()
# Send output
api.send("output", api.Message(result.to_dict()))
# Register callback
api.set_port_callback("input", on_input)
API Reference
Message Handling:
# Set port callback
api.set_port_callback("port_name", callback_function)
# Send message
api.send("port_name", api.Message(body, attributes={}))
# Create message with attributes
msg = api.Message(
body={"data": values},
attributes={"source": "python"}
)
Configuration Access:
# Get configuration parameter
value = api.config.param_name
# Get with default
value = getattr(api.config, 'param_name', default_value)
Logging:
api.logger.info("Processing started")
api.logger.warning("Potential issue detected")
api.logger.error("Error occurred")
State Management
# Initialize state
state = {"counter": 0, "cache": {}}
def on_input(msg_id, header, body):
global state
state["counter"] += 1
# Process with state
if body["id"] in state["cache"]:
result = state["cache"][body["id"]]
else:
result = process(body)
state["cache"][body["id"]] = result
api.send("output", api.Message(result))
api.set_port_callback("input", on_input)
Using External Libraries
# Pre-installed libraries available
import pandas as pd
import numpy as np
import sklearn
import tensorflow
import torch
# Custom libraries via Dockerfile
# (see Creating Dockerfiles section)
Managed Connections
Access database connections from Python:
def on_input(msg_id, header, body):
# Get connection
conn = api.get_connection("HANA_CONNECTION")
# Execute query
cursor = conn.cursor()
cursor.execute("SELECT * FROM TABLE")
rows = cursor.fetchall()
api.send("output", api.Message(rows))
Node.js Subengine
JavaScript-based operator development.
Creating Node.js Operator
// Operator script using @sap/vflow-sub-node-sdk
const { Operator } = require("@sap/vflow-sub-node-sdk");
// Get operator instance
const operator = Operator.getInstance();
// Set up input port handler
operator.getInPort("input").onMessage((ctx) => {
// Process message
const data = ctx.body;
const result = processData(data);
// Send to output port
operator.getOutPort("output").send(result);
});
function processData(data) {
// Transform data
return data.map((item) => {
return {
id: item.id,
value: item.value * 2
};
});
}
API Reference
Message Handling:
const { Operator } = require("@sap/vflow-sub-node-sdk");
const operator = Operator.getInstance();
// Set port callback
operator.getInPort("port_name").onMessage((ctx) => {
// Access message body via ctx.body
const data = ctx.body;
// Process message
});
// Send to output port
operator.getOutPort("output").send(data);
// Send to specific named port
operator.getOutPort("port_name").send(data);
Configuration:
const operator = Operator.getInstance();
// Access config
const paramValue = operator.config.paramName;
Logging:
const operator = Operator.getInstance();
// Use operator logger
operator.logger.info("Information message");
operator.logger.debug("Debug message");
operator.logger.error("Error message");
Node.js Data Types
| SAP DI Type | Node.js Type |
|---|---|
| string | String |
| int32 | Number |
| int64 | BigInt |
| float32 | Number |
| float64 | Number |
| blob | Buffer |
| message | Object |
Safe Integer Handling
// Large integers may lose precision
// Use BigInt for int64 values
const { Operator } = require("@sap/vflow-sub-node-sdk");
const operator = Operator.getInstance();
operator.getInPort("input").onMessage((ctx) => {
const value = BigInt(ctx.body.largeNumber);
// Process safely
});
Node Modules
// Built-in modules available
var fs = require('fs');
var path = require('path');
var https = require('https');
// Custom modules via Dockerfile
C++ Subengine
High-performance native operator development.
Getting Started
- Install C++ SDK
- Create operator class
- Implement interfaces
- Compile and upload
Operator Implementation
// custom_operator.h
#include "sdi/subengine.h"
#include "sdi/operator.h"
class CustomOperator : public sdi::BaseOperator {
public:
CustomOperator(const sdi::OperatorConfig& config);
void init() override;
void start() override;
void shutdown() override;
private:
void onInput(const sdi::Message& msg);
std::string m_parameter;
};
// custom_operator.cpp
#include "custom_operator.h"
CustomOperator::CustomOperator(const sdi::OperatorConfig& config)
: BaseOperator(config) {
m_parameter = config.get<std::string>("parameter");
}
void CustomOperator::init() {
registerPortCallback("input",
[this](const sdi::Message& msg) { onInput(msg); });
}
void CustomOperator::start() {
LOG_INFO("Operator started");
}
void CustomOperator::onInput(const sdi::Message& msg) {
// Process message
auto data = msg.body<std::vector<int>>();
// Transform
for (auto& val : data) {
val *= 2;
}
// Send output
send("output", sdi::Message(data));
}
void CustomOperator::shutdown() {
LOG_INFO("Operator shutdown");
}
Building and Uploading
# Build
mkdir build && cd build
cmake ..
make
# Package
tar -czvf operator.tar.gz libcustom_operator.so manifest.json
# Upload via System Management
FlowAgent Subengine
Database connectivity subengine.
Purpose
FlowAgent provides:
- Database connection pooling
- Efficient data transfer
- Native database drivers
Supported Databases
- SAP HANA
- SAP IQ
- Microsoft SQL Server
- Oracle
- PostgreSQL
- MySQL
- DB2
FlowAgent Operators
Pre-built operators using FlowAgent:
- SQL Consumer: Execute SELECT queries
- SQL Executor: Execute DDL/DML
- Table Consumer: Read tables
- Table Producer: Write tables
Configuration
Connection: Database Connection ID
SQL Statement: SELECT * FROM SALES WHERE YEAR = 2024
Batch Size: 10000
Fetch Size: 5000
Performance Optimization
Minimize Cross-Engine Communication
Bad:
[Python A] -> [JS B] -> [Python C] -> [JS D]
(4 serialization points)
Good:
[Python A] -> [Python C] -> [JS B] -> [JS D]
(2 serialization points)
Batch Processing
# Process in batches
def on_input(msg_id, header, body):
batch = []
batch.append(body)
if len(batch) >= 1000:
process_batch(batch)
batch.clear()
Memory Management
# Stream large data
def on_input(msg_id, header, body):
import pandas as pd
# Process in chunks
for chunk in pd.read_csv(body, chunksize=10000):
result = process(chunk)
api.send("output", api.Message(result))
Connection Pooling
# Reuse connections
_connection = None
def get_connection():
global _connection
if _connection is None:
_connection = api.get_connection("DB_CONN")
return _connection
Best Practices
Operator Design
- Single Responsibility: One task per operator
- Stateless When Possible: Easier recovery
- Handle Errors Gracefully: Try-catch with logging
- Clean Up Resources: Close connections, files
Code Organization
# Good: Modular code
def validate_input(data):
"""Validate input data."""
if not data:
raise ValueError("Empty input")
return True
def transform_data(data):
"""Transform data."""
# Transformation logic
return result
def on_input(msg_id, header, body):
try:
validate_input(body)
result = transform_data(body)
api.send("output", api.Message(result))
except Exception as e:
api.logger.error(f"Error: {e}")
api.send("error", api.Message({"error": str(e)}))
Testing
- Unit Test Logic: Test functions independently
- Integration Test: Test with sample data
- Performance Test: Verify throughput
Documentation
def on_input(msg_id, header, body):
"""
Process incoming sales data.
Input Schema:
- order_id: string
- amount: float
- date: string (YYYY-MM-DD)
Output Schema:
- order_id: string
- processed_amount: float
- quarter: int
"""
# Implementation
Documentation Links
- Subengines Overview: https://github.com/SAP-docs/sap-hana-cloud-data-intelligence/tree/main/docs/modelingguide/subengines
- Python Subengine: https://github.com/SAP-docs/sap-hana-cloud-data-intelligence/blob/main/docs/modelingguide/subengines/create-operators-with-the-python-subengine-7e8f7d2.md
- Node.js Subengine: https://github.com/SAP-docs/sap-hana-cloud-data-intelligence/tree/main/docs/modelingguide/subengines (Node.js SDK files)
- C++ Subengine: https://github.com/SAP-docs/sap-hana-cloud-data-intelligence/blob/main/docs/modelingguide/subengines/working-with-the-c-subengine-to-create-operators-d8f634c.md
Last Updated: 2025-11-22