Stream Processors
Stream processors transform raw data into structured, validated information according to your data models using reusable template configurations.
Stream processors are the runtime components that bring data models to life. They consume raw data from your industrial systems, apply transformations according to your data models, and output structured data. Stream processors work directly with data models through templates, not through data contracts.
Overview
Stream processors bridge the gap between raw industrial data and structured business information:
Input: Raw sensor data, PLC values, device messages
Processing: Transformation, validation, contextualization according to data models
Output: Structured data compliant with data models
Templates: Reusable configurations with variable substitution
Auto-validation: If data contracts exist for the model, output is automatically validated and routed to contract bridges
# Template definition
templates:
  streamProcessors:
    motor_template:
      model:
        name: pump
        version: v1
      sources: # alias → raw topic
        press: "${{ .location_path }}._raw.${{ .abc }}"
        tF: "${{ .location_path }}._raw.tempF"
        r: "${{ .location_path }}._raw.run"
      mapping: # field → JS / constant / alias
        pressure: "press"
        temperature: "(tF-32)*5/9" # JavaScript expressions for data transformation
        running: "r"
        motor:
          rpm: "press"
        serialNumber: "${{ .sn }}"
# Stream processor instances
streamprocessors:
  - name: motor_assembly
    _templateRef: "motor_template"
    location:
      0: corpA
      1: plant-A
    variables:
      abc: "assembly"
      sn: "SN-P42-008"
  - name: motor_qualitycheck
    _templateRef: "motor_template"
    variables:
      abc: "qualitycheck"
      sn: "SN-P42-213"
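To make the variable substitution concrete, here is a minimal JavaScript sketch of how a `${{ .name }}` placeholder could be resolved for one instance. The `renderTemplate` helper is hypothetical and not part of the product; the value used for `location_path` is an assumption about how the location block is turned into a path.

```javascript
// Hypothetical helper: replaces every ${{ .name }} placeholder with the
// matching entry from `variables`. A missing variable throws, mirroring
// the "processor fails to start" behaviour described for missing variables.
function renderTemplate(text, variables) {
  return text.replace(/\$\{\{\s*\.(\w+)\s*\}\}/g, (match, name) => {
    if (!Object.prototype.hasOwnProperty.call(variables, name)) {
      throw new Error(`missing variable: ${name}`);
    }
    return String(variables[name]);
  });
}

const variables = {
  location_path: "umh.v1.corpA.plant-A", // assumption: derived from the location block
  abc: "assembly",
  sn: "SN-P42-008",
};

const source = renderTemplate("${{ .location_path }}._raw.${{ .abc }}", variables);
console.log(source); // umh.v1.corpA.plant-A._raw.assembly
```

The same template rendered with `abc: "qualitycheck"` would yield a different source topic, which is what lets one template serve several instances.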
Key Concepts
Template Reference
Each stream processor references a reusable template:
streamprocessors:
  - name: pump41_sp
    _templateRef: "pump_template"
This template reference ensures:
Output data matches the template's data model structure
Validation occurs against the model's schema
Template variables provide instance-specific configuration
Automatic integration: If data contracts exist for the same model, output is automatically validated and routed to contract bridges
Location Hierarchy
Stream processors define their position in the hierarchical organization (commonly based on ISA-95 but adaptable to KKS or custom naming standards). For complete hierarchy structure and rules, see Topic Convention.
location:
  0: corpA    # Enterprise (mandatory)
  1: plant-A  # Site/Region (optional)
  2: line-4   # Area/Zone (optional)
  3: pump41   # Work Unit (optional)
  4: motor    # Work Center (optional)
For a processor located at levels 0 through 3, this creates UNS topics like:
umh.v1.corpA.plant-A.line-4.pump41._pump.pressure
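The relationship between the location levels and the topic can be sketched in JavaScript as follows. The `buildTopic` function is a hypothetical illustration; the `umh.v1` prefix and the `_pump` segment are copied from the example topic above, and the rule that levels are simply joined in ascending order is an assumption.

```javascript
// Hypothetical sketch: joins the location levels in ascending order and
// appends the model segment and field name. Level 0 is treated as mandatory.
function buildTopic(location, modelSegment, field) {
  const levels = Object.keys(location).map(Number).sort((a, b) => a - b);
  if (levels[0] !== 0) {
    throw new Error("level 0 (enterprise) is mandatory");
  }
  const path = levels.map((level) => location[level]);
  return ["umh", "v1", ...path, modelSegment, field].join(".");
}

const topic = buildTopic(
  { 0: "corpA", 1: "plant-A", 2: "line-4", 3: "pump41" },
  "_pump",
  "pressure"
);
console.log(topic); // umh.v1.corpA.plant-A.line-4.pump41._pump.pressure
```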
Data Sources
Stream processors subscribe to raw data topics:
sources:
  press: "umh.v1.corpA.plant-A.line-4.pump41.deviceX._raw.press"
  temp: "umh.v1.corpA.plant-A.line-4.pump41.deviceX._raw.tempF"
  power1: "umh.v1.corpA.plant-A.line-4.pump41._raw.power_l1"
  power2: "umh.v1.corpA.plant-A.line-4.pump41._raw.power_l2"
Field Mapping
Transform raw values into model fields using JavaScript expressions:
mapping:
  pressure: "press"                   # Direct pass-through
  temperature: "(temp - 32) * 5 / 9"  # Fahrenheit to Celsius
  total_power: "power1 + power2"      # Derived calculation
  serialNumber: "'SN-P41-007'"        # Static metadata
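As a rough illustration of what evaluating such a mapping involves, the sketch below binds each source alias to its latest value and evaluates every expression as plain JavaScript. The `evaluateMapping` function and the sample readings are assumptions made for this example; the actual runtime may evaluate expressions differently (for instance in a sandbox).

```javascript
// Hypothetical sketch: each source alias becomes a function parameter,
// so an expression such as "power1 + power2" can refer to aliases by name.
function evaluateMapping(mapping, sources) {
  const names = Object.keys(sources);
  const values = Object.values(sources);
  const result = {};
  for (const [field, expression] of Object.entries(mapping)) {
    const fn = new Function(...names, `"use strict"; return (${expression});`);
    result[field] = fn(...values);
  }
  return result;
}

const mapping = {
  pressure: "press",
  temperature: "(temp - 32) * 5 / 9",
  total_power: "power1 + power2",
  serialNumber: "'SN-P41-007'",
};

// Sample readings, invented for illustration.
const output = evaluateMapping(mapping, { press: 4.2, temp: 212, power1: 1.5, power2: 2.5 });
console.log(output);
// { pressure: 4.2, temperature: 100, total_power: 4, serialNumber: 'SN-P41-007' }
```

Note that the static serial number is a quoted string inside the expression, which is why the YAML value carries both double and single quotes.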
Simple Example
Temperature Sensor
Transform Fahrenheit readings to Celsius:
# Data model (from data-models.md)
datamodels:
  temperature:
    description: "Temperature sensor model"
    versions:
      v1:
        structure:
          temperatureInC:
            _payloadshape: timeseries-number
# Data contract (from data-contracts.md)
datacontracts:
  - name: _temperature_v1
    model:
      name: temperature
      version: v1
    default_bridges:
      - type: timescaledb
        retention_in_days: 365
# Stream processor implementation
streamprocessors:
  - name: furnaceTemp_sp
    _templateRef: "temperature_template"
    location:
      0: corpA
      1: plant-A
      2: line-4
      3: furnace1
    variables:
      temp_sensor: "temperature_F"
      sn: "SN-F1-001"
Result:
Input: 1500°F from PLC
Output: 815.6°C in structured format
Storage: Auto-created TimescaleDB hypertable
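The figures above follow directly from the Fahrenheit-to-Celsius formula used in the mapping examples. A short JavaScript check, with the function name chosen only for this illustration:

```javascript
// Same formula as the mapping expression "(temp - 32) * 5 / 9".
const fahrenheitToCelsius = (f) => (f - 32) * 5 / 9;

const celsius = fahrenheitToCelsius(1500); // (1500 - 32) * 5 / 9 = 7340 / 9
console.log(celsius.toFixed(1)); // 815.6
```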
Complex Example
Pump with Motor Sub-Model
# Stream processor for pump with motor sub-model
streamprocessors:
  - name: pump41_sp
    _templateRef: "pump_template"
    location:
      0: corpA
      1: plant-A
      2: line-4
      3: pump41
    variables:
      abc: "deviceX"
      sn: "SN-P41-007"
Generated Topics:
umh.v1.corpA.plant-A.line-4.pump41._pump.pressure
umh.v1.corpA.plant-A.line-4.pump41._pump.temperature
umh.v1.corpA.plant-A.line-4.pump41._pump.running
umh.v1.corpA.plant-A.line-4.pump41._pump.diagnostics.vibration
umh.v1.corpA.plant-A.line-4.pump41._pump.motor.current
umh.v1.corpA.plant-A.line-4.pump41._pump.motor.rpm
umh.v1.corpA.plant-A.line-4.pump41._pump.total_power
umh.v1.corpA.plant-A.line-4.pump41._pump.serialNumber
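One way to picture how a nested model, including a sub-model such as `motor`, expands into flat topic names is the recursive walk below. The `listTopics` function and the abbreviated `pumpStructure` object are hypothetical; only the `_payloadshape` key and the `timeseries-number` shape are taken from this page.

```javascript
// Hypothetical sketch: walks a model structure and emits one topic per leaf.
// A node counts as a leaf when it declares a _payloadshape; any other
// object is treated as a folder or sub-model and is descended into.
function listTopics(prefix, structure) {
  const topics = [];
  for (const [key, value] of Object.entries(structure)) {
    const path = `${prefix}.${key}`;
    if (value && typeof value === "object" && !("_payloadshape" in value)) {
      topics.push(...listTopics(path, value));
    } else {
      topics.push(path);
    }
  }
  return topics;
}

// Abbreviated structure, assumed for illustration only.
const leaf = { _payloadshape: "timeseries-number" };
const pumpStructure = {
  pressure: leaf,
  diagnostics: { vibration: leaf },
  motor: { current: leaf, rpm: leaf },
};

const topics = listTopics("umh.v1.corpA.plant-A.line-4.pump41._pump", pumpStructure);
console.log(topics.join("\n"));
```

Each nested level contributes one extra dot-separated segment, which is why the motor fields appear under `_pump.motor`.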
Validation and Error Handling
Stream processors provide built-in validation:
Schema Validation
Output must match the referenced data model structure
Field types validated against payload shapes
Constraint checking (min/max, allowed values)
Additional validation: If data contracts exist for the model, output is automatically validated against contract requirements
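A minimal JavaScript sketch of the kinds of checks listed above. The `checkField` function, the rule keys (`shape`, `min`, `max`, `allowed`) and the `timeseries-string` shape are assumptions for illustration; this page only confirms `timeseries-number` as a payload shape and does not specify how constraints are declared.

```javascript
// Hypothetical sketch: returns a list of validation errors for one field.
function checkField(value, rule) {
  const errors = [];
  // Type check against the payload shape.
  if (rule.shape === "timeseries-number" &&
      (typeof value !== "number" || Number.isNaN(value))) {
    errors.push("expected a number");
  }
  if (rule.shape === "timeseries-string" && typeof value !== "string") {
    errors.push("expected a string");
  }
  // Range constraints only apply to numeric values.
  if (typeof value === "number") {
    if (rule.min !== undefined && value < rule.min) errors.push(`below min ${rule.min}`);
    if (rule.max !== undefined && value > rule.max) errors.push(`above max ${rule.max}`);
  }
  // Allowed-values constraint.
  if (rule.allowed && !rule.allowed.includes(value)) {
    errors.push("value not in allowed list");
  }
  return errors;
}

const temperatureRule = { shape: "timeseries-number", min: 0, max: 2000 };
console.log(checkField(815.6, temperatureRule)); // []
console.log(checkField(2500, temperatureRule));  // [ 'above max 2000' ]
```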
Runtime Validation
# Invalid mapping - caught at startup
mapping:
  invalid_field: "someVar" # Error: not defined in model
# Runtime error handling
mapping:
  temperature: "temp / 0" # Expression error: skips message
Error Scenarios
Unknown fields: Processor fails to start
Missing variables: Processor fails to start
Expression errors: Message skipped, logged
Undefined expression results: Message skipped
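The split between startup failures and skipped messages can be sketched in JavaScript as below. Both functions are hypothetical and only mirror the scenarios listed above. One caveat: in plain JavaScript, dividing a number by zero yields `Infinity` rather than throwing, so the sketch triggers an expression error through an undefined name instead; how the actual runtime treats division by zero is not specified on this page.

```javascript
// Startup check: every mapped field must exist in the model.
function validateMapping(mapping, modelFields) {
  for (const field of Object.keys(mapping)) {
    if (!modelFields.includes(field)) {
      throw new Error(`unknown field: ${field}`);
    }
  }
}

// Runtime: returns the mapped output, or null when the message is skipped.
function processMessage(mapping, sources, log = console.warn) {
  const names = Object.keys(sources);
  const values = Object.values(sources);
  const output = {};
  for (const [field, expression] of Object.entries(mapping)) {
    let value;
    try {
      value = new Function(...names, `return (${expression});`)(...values);
    } catch (err) {
      log(`expression error in ${field}: ${err.message}`);
      return null; // expression error: message skipped, logged
    }
    if (value === undefined) {
      log(`undefined result for ${field}`);
      return null; // undefined result: message skipped
    }
    output[field] = value;
  }
  return output;
}

const logged = [];
const collect = (line) => logged.push(line);
console.log(processMessage({ temperature: "(temp - 32) * 5 / 9" }, { temp: 212 }, collect)); // { temperature: 100 }
console.log(processMessage({ temperature: "missing.value" }, { temp: 212 }, collect)); // null
```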
Deployment and Management
Stream processors are deployed through:
YAML Configuration: Direct configuration files
Management Console: Web-based interface (recommended)
API: Programmatic deployment
Management Console Workflow
For detailed information on using the Management Console interface, see:
Stream Processor Implementation → Management Console
The console provides:
Visual data model builder
Interactive mapping configuration
Tag browser for source selection
Live validation and preview
One-click deployment
Operational Benefits
Automatic Infrastructure
Database tables: Auto-created from data models
Schema registry: Models registered automatically
Validation pipelines: Generated from contracts
Monitoring: Built-in performance metrics
Scalability
Multiple instances: Same model deployed across assets
Shared infrastructure: One contract, many processors
Resource efficiency: Optimized processing pipelines
Maintainability
Version management: Controlled model/contract evolution
Configuration as code: YAML-based, version-controlled
Centralized validation: Consistent across all processors
Best Practices
Naming
Descriptive names: pump41_sp, furnace_temp_sp
Include location: Reference the asset/location
Consistent suffix: Use _sp for stream processors
Source Management
Meaningful variable names: press, temp, not var1, var2
Full topic paths: Avoid ambiguity with complete UNS paths
Document complex sources: Comment unusual data sources
Mapping Logic
Keep expressions simple: Complex logic should be in separate steps
Use appropriate precision: Match industrial data accuracy
Handle edge cases: Consider sensor failure scenarios
Document calculations: Comment unit conversions and formulas
Related Documentation
For complete implementation details, configuration syntax, and Management Console usage:
Data Flows → Stream Processor Implementation
Additional references:
Data Models - Defining data structures
Data Contracts - Storage and retention policies
Unified Namespace - Topic structure and conventions