🚧 Roadmap Item - Stream processors transform messages already inside the Unified Namespace using the unified data-modelling system.
Stream processors implement the runtime execution of , providing real-time data transformation and contextualization within the UNS. This document covers the technical implementation details, configuration syntax, and Management Console interface.
Overview
Stream processors consume messages from the UNS, apply transformations according to data models, and republish structured data that complies with data contracts. They bridge raw industrial data and business-ready information.
Key Features:
Real-time data contextualization - Transform raw sensor data into business metrics
Data aggregation - Combine multiple data streams into unified models
Schema enforcement - Validate against data contracts automatically
Stream joins - Correlate data across different devices/systems
🚧 Roadmap Item - The Management Console provides a visual interface for creating and managing stream processors.
Console Workflow
The Management Console follows familiar UMH patterns for configuration:
1. Navigate to Stream Processors
Data Flows > Stream Processors > + Add Stream Processor
2. Single-Page Configuration Panel
Stream Processor (pump42_sp) [Deploy]
────────────────────────────────────────────────────────────────
1) General
Name pump42_sp
Desired State Active
Contract _pump:v1 â–¼ (model implied: Pump:v1)
2) Location
level0: corpA level1: plant-A
level2: line-4 level3: pump42 level4: (blank)
3) Sources 📂 Tag Browser
────────────────────────────────────────────────────────────────
Source Topics
────────────────────────────────────────────────────────────────
press umh.v1.corpA…deviceX._raw.press
tF …deviceX._raw.tempF
r …deviceX._raw.run
c …pump42._raw.motor_current
s …pump42._raw.motor_speed
l1 …pump42._raw.power_l1
l2 …pump42._raw.power_l2
────────────────────────────────────────────────────────────────
4) Apply Model
────────────────────────────────────────────────────────────────
Target Field JS Expression
────────────────────────────────────────────────────────────────
pressure press
temperature (tF-32)*5/9
running r
motor.current c
motor.rpm s
total_power l1+l2
serial_number 'SN-P42-008'
────────────────────────────────────────────────────────────────
[+ Add Row] [YAML Preview â–¼]
3. Interactive Features
Tag Browser Integration:
Click 📂 to open tag browser
Select from live UNS topics
Auto-suggest variable names
Full topic path verification
Live Validation:
Expression syntax checking
Model field validation
Real-time error highlighting
Preview generated topics
YAML Preview:
Show generated streamprocessors: block
Copy for version control
Validate against schema
4. Deployment Result
After clicking Deploy:
FSM validates configuration
Benthos container starts processing
Status updates to 🟢 Running
Generated topics appear in Tag Browser
Console Integration
Data Flows Overview:
Data Flows (Stream Processors)
────────────────────────────────────────────────────────────────
Name Contract State TPS Location
────────────────────────────────────────────────────────────────
pump41_sp _pump:v1 🟢Run 1.2 corpA.plant-A.line-4.pump41
pump42_sp _pump:v1 🟢Run 1.1 corpA.plant-A.line-4.pump42
furnaceTemp _temp:v1 🟢Run 0.8 corpA.plant-A.line-4.furnace1
Tag Browser Integration:
umh.v1
â”” corpA.plant-A.line-4.pump42
â”” _pump
├ pressure
├ temperature
├ running
├ diagnostics
│ └ vibration
├ motor
│ ├ current
│ └ rpm
├ total_power
â”” serial_number
Live Data Verification:
Click any field to see live values
Right sidebar shows: "Stored in TimescaleDB table pump_v1 • Retention: 365d"
SQL sample queries for data access
Why This Meets All Requirements
The unified data-modelling approach addresses key manufacturing requirements:
1. Generic Hierarchical Support
Built-in level0-4 location hierarchy
Automatic UNS topic generation
Hierarchical database storage
2. Single YAML Dialect
Unified configuration language
Pass-through and derived transformations
No mode-switching complexity
3. Folders & Sub-Models Unified
Consistent structure handling
Dot-notation field access
Reusable component modeling
4. Per-Field Constraints
Built-in validation (unit, min, max)
Type enforcement
Enumerated value support
5. Static Constants Support
Empty source expressions
Metadata injection
Configuration-driven constants
6. Simplified Event Model
Default "evaluate on every update"
No complex event handling
Predictable processing behavior
7. Succinct Contracts
YAML flow-style compatible
Clear model binding
Explicit sink configuration
8. Full UNS Topic Paths
No ambiguous references
Complete topic specification
Clear data lineage
Best Practices
Configuration Management
Version control: Store YAML configurations in Git
Environment-specific: Use environment variables for deployment-specific values
Validation: Test configurations in development before production
Performance Optimization
Minimize sources: Only subscribe to needed topics
Efficient expressions: Keep JavaScript simple and fast
Batch similar processors: Group related transformations
Monitoring and Maintenance
Monitor TPS: Track messages per second for performance
Error logging: Set up alerting for processing errors
Schema evolution: Plan model updates carefully
Security
Topic authorization: Restrict access to sensitive source topics
Expression validation: Review JavaScript expressions for security
Schema validation: Use schema registry for data integrity
Related Documentation
For conceptual understanding and model design:
For integration and context:
Stream processors define their position in the hierarchical organization (commonly based on ISA-95 but adaptable to KKS or custom naming standards). For complete hierarchy structure and rules, see .