Stream Processor
The Stream Processor collects timeseries data from multiple UNS sources, maintains state for variable mappings, and generates transformed messages using JavaScript expressions.
When to Use This Processor
Use Stream Processor when you need to combine data from multiple UNS sources. Example: Calculate pump efficiency from separate flow and temperature sensors.
Required input format: UMH timeseries (value
+ timestamp_ms
)
Configuration
processors:
- stream_processor:
mode: "timeseries" # Only "timeseries" supported
output_topic: "umh.v1.enterprise.site.area" # Base output topic
model:
name: "pump" # Model name for data contract
version: "v1" # Model version
sources: # Map variables to UNS topics
press: "umh.v1.enterprise.site.area._raw.pressure"
temp: "umh.v1.enterprise.site.area._raw.temperature"
run: "umh.v1.enterprise.site.area._raw.running"
mapping: # JavaScript expressions
# Static (no dependencies)
serialNumber: '"SN-P42-008"'
# Dynamic (with dependencies)
pressure: "press + 4.00001" # Depends on 'press'
temperatureC: "(temp - 32) * 5/9" # Depends on 'temp'
motor:
rpm: "press / 4" # Nested, depends on 'press'
status: 'run ? "active" : "inactive"' # Depends on 'run'
efficiency: "(press / temp) * 100" # Depends on both
mode
string
Yes
Only "timeseries"
supported
output_topic
string
Yes
Base UMH topic for output
model.name
string
Yes
Model name for data contract
model.version
string
Yes
Model version
sources
map
Yes
Variable name → UNS topic
mapping
object
No
JavaScript expressions
How It Works
Dependency Tracking
Static mappings: Expressions without variables evaluate on every message (e.g.,
"SN-12345"
)Dynamic mappings: Expressions with variables evaluate when those variables update (e.g.,
press * 2
)Multiple dependencies: Wait for all variables before evaluating (e.g.,
press / temp
)
JavaScript Expression Engine
The Stream Processor uses the same underlying goja JavaScript engine as the Node-RED JavaScript Processor, but with different configurations optimized for different use cases:
Stream Processor JavaScript:
Evaluates single expressions (not full functions)
Pre-compiles at startup
Auto-detects variable dependencies
Blocks dangerous operations (eval, require, process)
Comparison with Node-RED JavaScript Processor:
Node-RED JS runs full JavaScript functions with
return
statementsStream Processor evaluates expressions directly (no function wrapper needed)
Both use security sandboxing to prevent dangerous operations
Node-RED JS allows console.log() for debugging, Stream Processor blocks it
Stream Processor adds expression pre-compilation and dependency analysis
Security Features (shared with Node-RED JS):
Maximum call stack size limits to prevent stack overflow
Blocked dangerous globals (eval, Function, require, process, setTimeout)
Safe built-in objects available (Math, JSON, Date, String, Number)
Execution within a sandboxed environment
State Management
Stores latest value for each configured source
Values persist until processor restarts
Enables calculations across async data sources
Message Processing
Input Requirements
Format:
{"value": 25.5, "timestamp_ms": 1647890123456}
Metadata: Must have
umh_topic
fieldTopic Match: Must match a configured source
Processing Steps
Match
umh_topic
to configured sourcesStore value in state (e.g.,
press = 25.5
)Evaluate static mappings
Evaluate dynamic mappings with satisfied dependencies
Generate output messages
Output Topics
Format: <output_topic>.<data_contract>[.<virtual_path>].<name>
output_topic
: From configurationdata_contract
:_<model_name>_<model_version>
virtual_path
: For nested mappingsname
: Field name
Examples:
umh.v1.enterprise.site.area._pump_v1.pressure
umh.v1.enterprise.site.area._pump_v1.motor.rpm
(nested)
JavaScript Expressions
Allowed Operations
// Math
Math.PI * 2
Math.sqrt(press * press + temp * temp)
// Arithmetic
press * 2 + 1
(temp - 32) * 5/9
// Conditionals
press > 100 ? 'high' : 'normal'
run ? 'active' : 'inactive'
// Strings
"Pump-" + press
`Temperature: ${temp}°C`
// JSON
JSON.stringify({pressure: press, temp: temp})
// Date
Date.now()
Blocked Operations
eval()
- No dynamic codeFunction()
- No function constructorsrequire()
- No module importsprocess
- No process accesssetTimeout/setInterval
- No asyncconsole
- No console access
Examples
Sensor Calibration
stream_processor:
mode: "timeseries"
output_topic: "umh.v1.plant1.line5.cell3"
model:
name: "sensor_calibrated"
version: "v1"
sources:
raw_temp: "umh.v1.plant1.line5.cell3._raw.temperature"
raw_press: "umh.v1.plant1.line5.cell3._raw.pressure"
mapping:
temperature: "raw_temp + 0.5" # Offset correction
pressure: "raw_press * 1.02" # Scale correction
Machine Status
stream_processor:
mode: "timeseries"
output_topic: "umh.v1.factory.assembly"
model:
name: "machine_status"
version: "v2"
sources:
speed: "umh.v1.factory.assembly._raw.conveyor_speed"
temp: "umh.v1.factory.assembly._raw.motor_temp"
running: "umh.v1.factory.assembly._raw.is_running"
mapping:
status: |
running ? (temp > 80 ? "warning" : "running") : "stopped"
alert: "temp > 90"
efficiency: "running ? (speed / 100) * 100 : 0"
Boiler Efficiency
stream_processor:
mode: "timeseries"
output_topic: "umh.v1.site.boiler"
model:
name: "efficiency_calc"
version: "v1"
sources:
flow_in: "umh.v1.site.boiler._raw.input_flow"
flow_out: "umh.v1.site.boiler._raw.output_flow"
temp_in: "umh.v1.site.boiler._raw.input_temp"
temp_out: "umh.v1.site.boiler._raw.output_temp"
mapping:
flow_diff: "flow_out - flow_in"
temp_diff: "temp_out - temp_in"
thermal_efficiency: |
(flow_out * temp_out - flow_in * temp_in) / (flow_in * temp_in) * 100
heat_transfer: "(flow_out - flow_in) * (temp_out - temp_in) * 500"
Nested Metrics
stream_processor:
mode: "timeseries"
output_topic: "umh.v1.plant.packaging"
model:
name: "machine_metrics"
version: "v3"
sources:
speed: "umh.v1.plant.packaging._raw.belt_speed"
count: "umh.v1.plant.packaging._raw.package_count"
weight: "umh.v1.plant.packaging._raw.total_weight"
mapping:
performance:
speed: "speed"
throughput: "count / 60" # packages/min
quality:
avg_weight: "weight / count"
deviation: "Math.abs((weight / count) - 1.5)" # from target
Output topics:
umh.v1.plant.packaging._machine_metrics_v3.performance.speed
umh.v1.plant.packaging._machine_metrics_v3.performance.throughput
umh.v1.plant.packaging._machine_metrics_v3.quality.avg_weight
umh.v1.plant.packaging._machine_metrics_v3.quality.deviation
Processing Behavior
Static vs Dynamic Evaluation
Static (every message): "SN-12345"
, Date.now()
, Math.PI
Dynamic (when dependencies update): press * 2
, press / temp
, run ? "active" : "stopped"
Metadata Preservation
umh_topic
: Replaced with output topicAll other metadata: Passed through
Error Handling
Invalid JSON: "Invalid timeseries format from umh.v1.plant.sensor - expected {value, timestamp_ms}. Skipping message."
Missing Dependencies: "Cannot evaluate 'efficiency' - waiting for 'temp' (last value: 65.2 from 30s ago)"
JavaScript Error: "Failed to evaluate 'motor.rpm': Cannot divide by zero (press=0). Using last valid value: 1250."
Missing umh_topic: "Message missing umh_topic metadata. Check input configuration."
Troubleshooting
No Output Messages
Check umh_topic exists:
# Verify metadata in your pipeline
echo '{"value": 42}' | benthos -c your_config.yaml
# Should show: Message missing umh_topic metadata
Verify topic matches source:
sources:
press: "umh.v1.plant._raw.pressure" # Must match exactly
Missing Outputs
Check dependencies:
efficiency: "press / temp"
needs both variablesLook for "waiting for 'temp'" in logs
Test expressions:
// Test in browser console first
var press = 100, temp = 50;
console.log(press / temp); // Should be 2
Performance Issues
Simplify expressions:
press * 2
instead of complex mathCheck message rate: >1000 msg/s may need optimization
Monitor memory if storing many variables
Common Errors
"undefined is not a function"
Using blocked function
Use allowed operations only
"variable not defined"
Typo in variable name
Check sources configuration
"unexpected token"
JavaScript syntax error
Test in browser console
"Cannot read null"
Missing null check
Add: temp ? temp * 2 : 0
Last updated