Stream Processors 🚧
🚧 Roadmap Item - Stream processors implement data contracts by transforming raw data into structured, validated information according to your data models.
Stream processors are the runtime components that bring data models and contracts to life. They consume raw data from your industrial systems, apply transformations according to your data models, and output structured data that complies with your data contracts.
Overview
Stream processors bridge the gap between raw industrial data and structured business information:
Input: Raw sensor data, PLC values, device messages
Processing: Transformation, validation, contextualization
Output: Structured data compliant with data contracts
Storage: Automatic routing to configured sinks
Key Concepts
Contract Binding
Each stream processor implements exactly one data contract:
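A configuration sketch of this one-to-one binding might look like the following (the `streamprocessors` and `contract` field names are illustrative assumptions, not the exact syntax):

```yaml
# Illustrative sketch - field names are assumptions, not the exact schema
streamprocessors:
  - name: pump41_sp        # processor name (see naming best practices below)
    contract: _pump:v1     # exactly one data contract per processor
```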
This binding ensures:
Output data matches the contract's data model structure
Data is routed to the contract's configured sinks
Validation occurs against the contract's schema
Retention policies are applied automatically
Location Hierarchy
Each processor is assigned a position in the location hierarchy, and that position determines the UNS topics its output is published to:
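As a sketch, a hierarchy such as acme / plant-A / line-4 / pump41 (all names and the level syntax are illustrative) could yield topics like:

```yaml
# Illustrative hierarchy - names and syntax are assumptions
location:
  0: acme       # enterprise
  1: plant-A    # site
  2: line-4     # area
  3: pump41     # asset
# Output would then appear under a topic such as:
#   umh.v1.acme.plant-A.line-4.pump41._pump.pressure
```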
Data Sources
Stream processors subscribe to raw data topics:
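One plausible shape for this is a block that binds a local variable name to each raw topic (names and paths below are illustrative):

```yaml
# Illustrative sources block: local variable name -> raw UNS topic
sources:
  press: "umh.v1.acme.plant-A.line-4.pump41._raw.press"
  temp: "umh.v1.acme.plant-A.line-4.pump41._raw.temperature"
```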
Field Mapping
Transform raw values into model fields using JavaScript expressions:
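A mapping sketch, assuming the `press` and `temp` source variables from above (field names are illustrative):

```yaml
# Illustrative mapping block: model field -> JavaScript expression
mapping:
  pressure: "press * 0.001"            # e.g. Pa -> kPa
  temperature: "(temp - 32) * 5 / 9"   # °F -> °C
```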
Simple Example
Temperature Sensor
Transform Fahrenheit readings to Celsius:
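The conversion itself is ordinary JavaScript; as a standalone sketch (the function name is illustrative - in a real processor this would be the body of a mapping expression):

```javascript
// Fahrenheit -> Celsius, rounded to one decimal place
const fahrenheitToCelsius = (tempF) => Math.round((tempF - 32) * 5 / 9 * 10) / 10;

console.log(fahrenheitToCelsius(1500)); // 815.6
```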
Result:
Input: 1500°F from PLC
Output: 815.6°C in structured format
Storage: Auto-created TimescaleDB hypertable
Complex Example
Pump with Motor Sub-Model
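A sketch of such a processor, where the pump model embeds a motor sub-model (all field names and topic paths are illustrative assumptions):

```yaml
# Illustrative pump processor with a nested motor sub-model
sources:
  press: "umh.v1.acme.plant-A.line-4.pump41._raw.press"
  rpm: "umh.v1.acme.plant-A.line-4.pump41._raw.motor_rpm"
mapping:
  pressure: "press * 0.001"   # Pa -> kPa
  motor:
    rpm: "rpm"                # pass-through into the sub-model
```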
Generated Topics:
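Assuming an illustrative acme / plant-A / line-4 / pump41 hierarchy, sub-model fields might surface as nested topic segments:

```
umh.v1.acme.plant-A.line-4.pump41._pump.pressure
umh.v1.acme.plant-A.line-4.pump41._pump.motor.rpm
```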
Validation and Error Handling
Stream processors provide built-in validation:
Schema Validation
Output must match the bound data model structure
Field types validated against payload shapes
Constraint checking (min/max, allowed values)
Runtime Validation
Error Scenarios
Unknown fields: Processor fails to start
Missing variables: Processor fails to start
Expression errors: Message skipped, logged
Undefined expression results: Message skipped
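Because an undefined expression result causes the message to be skipped, a mapping can deliberately return undefined to drop implausible readings; a sketch (field and variable names illustrative):

```yaml
# Illustrative guard: skip the message when the reading is implausible
mapping:
  temperature: "(temp > -40 && temp < 3000) ? (temp - 32) * 5 / 9 : undefined"
```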
Deployment and Management
Stream processors are deployed through:
YAML Configuration: Direct configuration files
Management Console: Web-based interface (recommended)
API: Programmatic deployment
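Putting the pieces together, a complete YAML configuration might look roughly like this (the overall schema is an assumption for illustration, not the exact syntax):

```yaml
# Hypothetical end-to-end configuration sketch
streamprocessors:
  - name: pump41_sp
    contract: _pump:v1
    location:
      0: acme
      1: plant-A
      2: line-4
      3: pump41
    sources:
      press: "umh.v1.acme.plant-A.line-4.pump41._raw.press"
    mapping:
      pressure: "press * 0.001"   # Pa -> kPa
```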
Management Console Workflow
For detailed information on using the Management Console interface, see:
The console provides:
Visual data model builder
Interactive mapping configuration
Tag browser for source selection
Live validation and preview
One-click deployment
Operational Benefits
Automatic Infrastructure
Database tables: Auto-created from data models
Schema registry: Models registered automatically
Validation pipelines: Generated from contracts
Monitoring: Built-in performance metrics
Scalability
Multiple instances: Same model deployed across assets
Shared infrastructure: One contract, many processors
Resource efficiency: Optimized processing pipelines
Maintainability
Version management: Controlled model/contract evolution
Configuration as code: YAML-based, version-controlled
Centralized validation: Consistent across all processors
Best Practices
Naming
Descriptive names: pump41_sp, furnace_temp_sp
Include location: Reference the asset/location
Consistent suffix: Use _sp for stream processors
Source Management
Meaningful variable names: press, temp, not var1, var2
Full topic paths: Avoid ambiguity with complete UNS paths
Document complex sources: Comment unusual data sources
Mapping Logic
Keep expressions simple: Complex logic should be in separate steps
Use appropriate precision: Match industrial data accuracy
Handle edge cases: Consider sensor failure scenarios
Document calculations: Comment unit conversions and formulas
Related Documentation
For complete implementation details, configuration syntax, and Management Console usage:
Additional references: