Stream Processors

Stream processors transform messages already inside the Unified Namespace using the unified data-modelling system.

Stream processors implement the runtime execution of data models and contracts, providing real-time data transformation and contextualization within the UNS. This document covers the technical implementation details, configuration syntax, and Management Console interface.

Overview

Stream processors consume messages from the UNS, apply transformations according to data models, and republish structured data that complies with data contracts. They bridge raw industrial data and business-ready information.

Key Features:

  • Real-time data contextualization - Transform raw sensor data into business metrics

  • Data aggregation - Combine multiple data streams into unified models

  • Schema enforcement - Validate against data contracts automatically

  • Stream joins - Correlate data across different devices/systems

  • JavaScript expressions - Flexible transformation logic

Architecture Integration

Stream processors integrate with the unified data-modelling system through templates:

# Complete configuration example
payloadshapes:
  timeseries-number:
    fields:
      timestamp_ms:
        _type: number
      value:
        _type: number

datamodels:
  pump:
    description: "Pump with motor sub-model"
    versions:
      v1:
        structure:
          pressure:
            _payloadshape: timeseries-number
          temperature:
            _payloadshape: timeseries-number
          serialNumber:
            _payloadshape: timeseries-string
          motor:
            _refModel:
              name: motor
              version: v1

datacontracts:
  - name: _pump_v1
    model:
      name: pump
      version: v1
    default_bridges:
      - type: timescaledb
        retention_in_days: 365

templates:
  streamProcessors:
    pump_template:
      model:
        name: pump
        version: v1
      sources:
        press: "${{ .location_path }}._raw.${{ .pressure_sensor }}"
        temp: "${{ .location_path }}._raw.tempF"
      mapping:
        pressure: "press"
        temperature: "(temp - 32) * 5 / 9"
        serialNumber: "${{ .sn }}"

streamprocessors:
  - name: pump41_sp
    _templateRef: "pump_template"
    location:
      0: corpA
      1: plant-A
      2: line-4
      3: pump41
    variables:
      pressure_sensor: "pressure"
      sn: "SN-P41-007"

All components are registered in the Redpanda Schema Registry at boot, so the UNS output plugin can reject messages that do not comply with their data contract.
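To make the idea of a compliant message concrete, the following is a minimal sketch of checking a payload against the timeseries-number shape defined above. The real check happens through the Schema Registry and the UNS output plugin; the helper name matchesShape and the rule that no extra fields are allowed are assumptions made purely for illustration.

```javascript
// Hypothetical illustration only: the real validation is done by the
// Schema Registry and the UNS output plugin, not by user code.
const timeseriesNumber = {
  fields: {
    timestamp_ms: { _type: "number" },
    value: { _type: "number" },
  },
};

// Returns true when the payload has exactly the fields of the shape
// and each field has the declared JavaScript type.
function matchesShape(payload, shape) {
  const expected = Object.keys(shape.fields);
  const actual = Object.keys(payload);
  if (actual.length !== expected.length) return false;
  return expected.every(
    (name) => typeof payload[name] === shape.fields[name]._type
  );
}

console.log(matchesShape({ value: 815.6, timestamp_ms: 1733904005123 }, timeseriesNumber)); // true
console.log(matchesShape({ value: "hot", timestamp_ms: 1733904005123 }, timeseriesNumber)); // false
```

A payload that fails such a check would correspond to a message that the output plugin refuses to publish.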

Configuration Syntax

Template Definition

Templates provide reusable configurations with variable substitution:

templates:
  streamProcessors:
    template_name:
      model:
        name: model_name
        version: v1
      sources:
        var_name: "${{ .location_path }}._raw.${{ .variable_name }}"
      mapping:
        model_field: "javascript_expression"
        folder:
          sub_field: "javascript_expression"
        metadata_field: "${{ .variable_name }}"

Stream Processor Definition

streamprocessors:
  - name: processor_name
    _templateRef: "template_name"     # Required: references template
    location:                        # Hierarchical organization (ISA-95, KKS, or custom)
      0: enterprise                  # Level 0 (mandatory)
      1: site                        # Level 1 (optional)
      2: area                        # Level 2 (optional)
      3: work_center                 # Level 3 (optional)
      4: work_unit                   # Level 4 (optional)
    variables:                       # Template variable substitution
      variable_name: "value"

Location Hierarchy

Stream processors define their position in the hierarchical organization (commonly based on ISA-95 but adaptable to KKS or custom naming standards). For complete hierarchy structure and rules, see Topic Convention.

location:
  0: corpA        # Enterprise (mandatory)
  1: plant-A      # Site/Region (optional)
  2: line-4       # Area/Zone (optional)
  3: pump42       # Work Center (optional)
  4: motor1       # Work Unit (optional)

This creates UNS topics following the standard convention:

umh.v1.{0}.{1}.{2}.{3}[.{4}].{contract}.{field_path}
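
The convention can be sketched as a small function. This is an illustration rather than the product's implementation: buildTopic is a hypothetical helper, and it assumes that unused optional levels are simply left out of the topic.

```javascript
// Sketch: join the location levels, contract, and field path into a topic.
// buildTopic is a hypothetical helper, not part of the product.
function buildTopic(location, contract, fieldPath) {
  // Sort numerically so that level 0 always comes first.
  const levels = Object.keys(location)
    .map(Number)
    .sort((a, b) => a - b)
    .map((level) => location[level]);
  if (levels.length === 0 || location[0] === undefined) {
    throw new Error("level 0 (enterprise) is mandatory");
  }
  return ["umh", "v1", ...levels, contract, fieldPath].join(".");
}

const topic = buildTopic(
  { 0: "corpA", 1: "plant-A", 2: "line-4", 3: "pump42" },
  "_pump_v1",
  "motor.current"
);
console.log(topic);
// umh.v1.corpA.plant-A.line-4.pump42._pump_v1.motor.current
```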

Template Variables

Templates support variable substitution for flexible, reusable configurations:

templates:
  streamProcessors:
    sensor_template:
      model:
        name: temperature
        version: v1
      sources:
        temp: "${{ .location_path }}._raw.${{ .sensor_name }}"
      mapping:
        temperatureInC: "(temp - 32) * 5 / 9"
        sensor_id: "${{ .sensor_id }}"
        location: "${{ .location_description }}"

Built-in Variables:

  • ${{ .location_path }}: Auto-generated from location hierarchy (e.g., umh.v1.corpA.plant-A.line-4.pump41)

Custom Variables:

  • Define them in the variables: section of the stream processor

  • Reference in templates using ${{ .variable_name }}
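
The substitution itself can be pictured as a plain text replacement. The sketch below is not the actual template engine: render is a hypothetical helper, and the variable values are taken from the pump42_sp example in this document.

```javascript
// Sketch of ${{ .name }} substitution; render is a hypothetical helper.
function render(template, variables) {
  return template.replace(/\$\{\{\s*\.(\w+)\s*\}\}/g, (match, name) => {
    if (!Object.prototype.hasOwnProperty.call(variables, name)) {
      throw new Error(`missing template variable: ${name}`);
    }
    return variables[name];
  });
}

const variables = {
  // Built-in: derived from the location hierarchy.
  location_path: "umh.v1.corpA.plant-A.line-4.pump42",
  // Custom: from the stream processor's variables section.
  pressure_sensor: "press_sensor",
  sn: "SN-P42-008",
};

console.log(render("${{ .location_path }}._raw.${{ .pressure_sensor }}", variables));
// umh.v1.corpA.plant-A.line-4.pump42._raw.press_sensor
console.log(render("${{ .sn }}", variables));
// SN-P42-008
```

Throwing on an unknown placeholder mirrors the rule that a processor with missing template variables does not start.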

Field Mapping

Transform source variables into model fields using two mapping types:

Dynamic Mapping

JavaScript expressions evaluated at runtime:

mapping:
  # Direct pass-through
  pressure: "press"
  
  # Unit conversions
  temperature: "(temp - 32) * 5 / 9"
  
  # Calculations
  total_power: "l1 + l2 + l3"
  
  # Conditional logic
  status: "temp > 100 ? 'hot' : 'normal'"
  
  # Sub-model fields (nested YAML)
  motor:
    current: "motor_current_var"
    rpm: "motor_speed_var"
  
  # Folder fields
  diagnostics:
    vibration: "vibration_var"
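
As a rough mental model of how such a mapping is evaluated, each expression can be treated as a function of the source variables. The sketch below is an assumption-laden illustration: evaluateMapping is a hypothetical helper, new Function stands in for whatever JavaScript engine the processor embeds, and the sample source values are made up.

```javascript
// Sketch only: new Function stands in for the processor's real JavaScript
// engine. evaluateMapping is a hypothetical helper.
function evaluateMapping(mapping, sources) {
  const names = Object.keys(sources);
  const values = Object.values(sources);
  const result = {};
  for (const [field, entry] of Object.entries(mapping)) {
    if (typeof entry === "object") {
      // Sub-model or folder: recurse into the nested mapping.
      result[field] = evaluateMapping(entry, sources);
    } else {
      // Expose each source variable as a named argument of the expression.
      result[field] = new Function(...names, `return (${entry});`)(...values);
    }
  }
  return result;
}

const mapping = {
  pressure: "press",
  temperature: "(temp - 32) * 5 / 9",
  total_power: "l1 + l2 + l3",
  status: "temp > 100 ? 'hot' : 'normal'",
  motor: { current: "motor_current_var" },
};
// Made-up sample readings for the source variables.
const sources = { press: 4.2, temp: 212, l1: 1, l2: 2, l3: 3, motor_current_var: 7.5 };

console.log(JSON.stringify(evaluateMapping(mapping, sources)));
// {"pressure":4.2,"temperature":100,"total_power":6,"status":"hot","motor":{"current":7.5}}
```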

Static Mapping

Template variables resolved at deployment time:

mapping:
  serialNumber: "${{ .sn }}"
  firmware_version: "${{ .firmware_ver }}"
  installation_date: "${{ .install_date }}"

Complete Examples

Minimal Temperature Sensor

payloadshapes:
  timeseries-number:
    fields:
      timestamp_ms:
        _type: number
      value:
        _type: number

datamodels:
  temperature:
    description: "Temperature sensor model"
    versions:
      v1:
        structure:
          temperatureInC:
            _payloadshape: timeseries-number
          sensorId:
            _payloadshape: timeseries-string

datacontracts:
  - name: _temperature_v1
    model:
      name: temperature
      version: v1
    default_bridges:
      - type: timescaledb
        retention_in_days: 365

templates:
  streamProcessors:
    temperature_template:
      model:
        name: temperature
        version: v1
      sources:
        temp: "${{ .location_path }}._raw.${{ .temp_sensor }}"
      mapping:
        temperatureInC: "(temp - 32) * 5 / 9"
        sensorId: "${{ .sn }}"

streamprocessors:
  - name: furnaceTemp_sp
    _templateRef: "temperature_template"
    location:
      0: corpA
      1: plant-A
      2: line-4
      3: furnace1
    variables:
      temp_sensor: "temperature_F"
      sn: "SN-F1-001"

Result:

  • UNS Topic: umh.v1.corpA.plant-A.line-4.furnace1._temperature_v1.temperatureInC

  • Payload: {"value": 815.6, "timestamp_ms": 1733904005123}

  • Database: Auto-created hypertable temperature_v1
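
The payload above can be reproduced by hand. In the sketch below the raw reading of 1500.08 °F is a hypothetical input chosen so that the result matches; carrying the source timestamp over unchanged and rounding to one decimal are likewise assumptions made for readability, not documented behaviour.

```javascript
// Hypothetical raw reading in °F, chosen so the result matches the payload above.
const raw = { value: 1500.08, timestamp_ms: 1733904005123 };

// Apply the template expression "(temp - 32) * 5 / 9".
// Keeping the source timestamp and rounding to one decimal are assumptions.
function toCelsiusPayload(message) {
  const celsius = ((message.value - 32) * 5) / 9;
  return {
    value: Math.round(celsius * 10) / 10,
    timestamp_ms: message.timestamp_ms,
  };
}

console.log(JSON.stringify(toCelsiusPayload(raw)));
// {"value":815.6,"timestamp_ms":1733904005123}
```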

Complex Pump with Motor Sub-Model

datamodels:
  motor:
    description: "Standard motor model"
    versions:
      v1:
        structure:
          current:
            _payloadshape: timeseries-number
          rpm:
            _payloadshape: timeseries-number
          temperature:
            _payloadshape: timeseries-number

  pump:
    description: "Pump with motor and diagnostics"
    versions:
      v1:
        structure:
          pressure:
            _payloadshape: timeseries-number
          temperature:
            _payloadshape: timeseries-number
          running:
            _payloadshape: timeseries-string
          vibration:
            x-axis:
              _payloadshape: timeseries-number
            y-axis:
              _payloadshape: timeseries-number
            z-axis:
              _payloadshape: timeseries-number
              _meta: # 🚧 Roadmap item
                description: "Z-axis vibration measurement"
                unit: "m/s"
              _constraints: # 🚧 Roadmap item
                max: 100
                min: 0
          motor:
            _refModel:
              name: motor
              version: v1
          acceleration:
            x:
              _payloadshape: timeseries-number
            y:
              _payloadshape: timeseries-number
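          serialNumber:
            _payloadshape: timeseries-string
          # Fields written by the pump_template mapping
          diagnostics:
            vibration:
              _payloadshape: timeseries-number
          total_power:
            _payloadshape: timeseries-number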

datacontracts:
  - name: _pump_v1
    model:
      name: pump
      version: v1
    default_bridges:
      - type: timescaledb
        retention_in_days: 1825  # 5 years
      - type: analytics_pipeline

templates:
  streamProcessors:
    pump_template:
      model:
        name: pump
        version: v1
      sources:
        press: "${{ .location_path }}._raw.${{ .pressure_sensor }}"
        temp: "${{ .location_path }}._raw.tempF"
        run: "${{ .location_path }}._raw.running"
        vib: "${{ .location_path }}._raw.vibration"
        current: "${{ .location_path }}._raw.motor_current"
        rpm: "${{ .location_path }}._raw.motor_speed"
        l1: "${{ .location_path }}._raw.power_l1"
        l2: "${{ .location_path }}._raw.power_l2"
      mapping:
        pressure: "press"
        temperature: "(temp - 32) * 5 / 9"
        running: "run"
        diagnostics:
          vibration: "vib"
        motor:
          current: "current"
          rpm: "rpm"
        total_power: "l1 + l2"
        serialNumber: "${{ .sn }}"

streamprocessors:
  - name: pump41_sp
    _templateRef: "pump_template"
    location:
      0: corpA
      1: plant-A
      2: line-4
      3: pump41
    variables:
      pressure_sensor: "pressure"
      sn: "SN-P41-007"

  - name: pump42_sp
    _templateRef: "pump_template"
    location:
      0: corpA
      1: plant-A
      2: line-4
      3: pump42
    variables:
      pressure_sensor: "press_sensor"
      sn: "SN-P42-008"

Generated Infrastructure:

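Component            Count                       Description
────────────────────────────────────────────────────────────────────────────────
Data Model           1 (pump:v1)                 Reusable across instances
Data Contract        1 (_pump_v1)                Shared configuration
Template             1 (pump_template)           Reusable processor configuration
Stream Processors    2 (pump41_sp, pump42_sp)    Asset-specific instances
TimescaleDB Tables   1 (pump_v1)                 Shared storage
────────────────────────────────────────────────────────────────────────────────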

Validation and Error Handling

Stream processors provide built-in validation at multiple levels:

Template Validation

  • Model references must exist

  • Variable syntax must be valid

  • Mapping expressions validated for syntax
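
The check of mapping fields against the referenced model can be sketched as a recursive comparison. This is only an illustration: unknownFields is a hypothetical helper, and it assumes that sub-model references (_refModel) have already been expanded into nested fields.

```javascript
// Sketch: collect mapping paths that have no counterpart in the model structure.
// unknownFields is a hypothetical helper; _refModel references are assumed to be
// already expanded into nested fields.
function unknownFields(mapping, structure, prefix = "") {
  const unknown = [];
  for (const [field, entry] of Object.entries(mapping)) {
    const path = prefix + field;
    const node = structure ? structure[field] : undefined;
    if (node === undefined) {
      unknown.push(path);
    } else if (typeof entry === "object") {
      unknown.push(...unknownFields(entry, node, path + "."));
    }
  }
  return unknown;
}

const structure = {
  pressure: { _payloadshape: "timeseries-number" },
  motor: { current: { _payloadshape: "timeseries-number" } },
};
const mapping = {
  pressure: "press",
  motor: { current: "current", rpm: "rpm" },
  invalid_field: "someVar",
};

console.log(unknownFields(mapping, structure));
// [ 'motor.rpm', 'invalid_field' ]
```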

Deployment Validation

  • All template variables must be provided

  • Location hierarchy must be valid

  • Source topics must be resolvable
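
The first two checks can be sketched as follows. validateDeployment is a hypothetical helper written for this illustration; it does not cover whether source topics are resolvable, since that depends on the live UNS.

```javascript
// Sketch of deploy-time checks; validateDeployment is a hypothetical helper.
const PLACEHOLDER = /\$\{\{\s*\.(\w+)\s*\}\}/g;

function validateDeployment(template, processor) {
  const errors = [];

  // Level 0 of the location hierarchy is mandatory.
  if (!processor.location || !processor.location[0]) {
    errors.push("location level 0 is missing");
  }

  // Every placeholder in sources or mapping needs a value.
  // location_path is built in, so it never has to be supplied.
  const provided = new Set(["location_path", ...Object.keys(processor.variables || {})]);
  const text = JSON.stringify([template.sources, template.mapping]);
  const missing = new Set();
  for (const [, name] of text.matchAll(PLACEHOLDER)) {
    if (!provided.has(name)) missing.add(name);
  }
  for (const name of missing) errors.push(`missing template variable: ${name}`);
  return errors;
}

const template = {
  sources: { press: "${{ .location_path }}._raw.${{ .pressure_sensor }}" },
  mapping: { pressure: "press", serialNumber: "${{ .sn }}" },
};
const processor = {
  location: { 0: "corpA", 1: "plant-A", 2: "line-4", 3: "pump41" },
  variables: { pressure_sensor: "pressure" }, // sn is deliberately left out
};

console.log(validateDeployment(template, processor));
// [ 'missing template variable: sn' ]
```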

Runtime Validation

# Error scenarios and handling
mapping:
  invalid_field: "someVar"  # Field not defined in the model: processor fails to start
  temperature: "temp / 0"   # No valid result at runtime: message is skipped

Error Scenarios

  • Unknown model fields: Processor fails to start

  • Missing template variables: Processor fails to start

  • Expression errors: Message skipped, logged

  • Undefined expression results: Message skipped
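
The two runtime cases, a throwing expression and an undefined result, can be sketched as a guarded evaluation. applyExpression is a hypothetical helper and new Function again stands in for the processor's real JavaScript engine, so this shows the skip-and-log behaviour described above rather than the actual implementation.

```javascript
// Sketch: evaluate one expression per message and skip on failure.
// applyExpression is hypothetical; new Function stands in for the real engine.
function applyExpression(expression, sources, log = console.warn) {
  try {
    const names = Object.keys(sources);
    const fn = new Function(...names, `return (${expression});`);
    const value = fn(...Object.values(sources));
    if (value === undefined) {
      return { skipped: true }; // undefined result: message skipped
    }
    return { skipped: false, value };
  } catch (err) {
    log(`expression failed: ${err.message}`); // expression error: skipped and logged
    return { skipped: true };
  }
}

console.log(applyExpression("(temp - 32) * 5 / 9", { temp: 212 })); // { skipped: false, value: 100 }
console.log(applyExpression("missing_var * 2", { temp: 212 }));     // { skipped: true }
console.log(applyExpression("undefined", { temp: 212 }));           // { skipped: true }
```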

Management Console

🚧 Roadmap Item - The Management Console provides a visual interface for creating and managing stream processors.

Console Workflow

The Management Console follows familiar UMH patterns for configuration:

1. Navigate to Stream Processors

Data Flows > Stream Processors > + Add Stream Processor

2. Template Selection

Select Template                                   [Next]
────────────────────────────────────────────────────────────────
Template          Model         Description
────────────────────────────────────────────────────────────────
○ pump_template   pump:v1       Pump with motor sub-model
○ temp_template   temperature:v1 Temperature sensor
○ motor_template  motor:v1      Standard motor
────────────────────────────────────────────────────────────────

3. Configuration Panel

Stream Processor (pump42_sp)                    [Deploy]
────────────────────────────────────────────────────────────────
1) General
   Name              pump42_sp
   Template          pump_template (pump:v1)

2) Location
   0: corpA     1: plant-A
   2: line-4    3: pump42    4: (blank)

3) Template Variables
────────────────────────────────────────────────────────────────
Variable           Value
────────────────────────────────────────────────────────────────
pressure_sensor    press_sensor
sn                 SN-P42-008
────────────────────────────────────────────────────────────────

4) Preview Generated Sources          📂 Tag Browser
────────────────────────────────────────────────────────────────
Source    Resolved Topics
────────────────────────────────────────────────────────────────
press     umh.v1.corpA.plant-A.line-4.pump42._raw.press_sensor
temp      umh.v1.corpA.plant-A.line-4.pump42._raw.tempF
run       umh.v1.corpA.plant-A.line-4.pump42._raw.running
────────────────────────────────────────────────────────────────

5) Preview Generated Output Topics
────────────────────────────────────────────────────────────────
umh.v1.corpA.plant-A.line-4.pump42._pump_v1.pressure
umh.v1.corpA.plant-A.line-4.pump42._pump_v1.temperature
umh.v1.corpA.plant-A.line-4.pump42._pump_v1.motor.current
────────────────────────────────────────────────────────────────
[YAML Preview ▼]

4. Interactive Features

Template-Driven Interface:

  • Pre-configured field mappings from template

  • Required variables clearly indicated

  • Live validation of variable values

Tag Browser Integration:

  • Click 📂 to verify source topics exist

  • Auto-complete topic paths

  • Real-time topic availability checking

Live Preview:

  • Show resolved source topics

  • Preview generated output topics

  • Validate template variable substitution
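
How an output-topic preview could be derived from a template is sketched below. Since the console is a roadmap item, this is speculative: fieldPaths and previewTopics are hypothetical helpers, and the mapping is a shortened version of pump_template.

```javascript
// Sketch: flatten a nested mapping into dotted field paths, then prefix them.
// fieldPaths and previewTopics are hypothetical helpers.
function fieldPaths(mapping, prefix = "") {
  return Object.entries(mapping).flatMap(([field, entry]) =>
    typeof entry === "object"
      ? fieldPaths(entry, `${prefix}${field}.`)
      : [`${prefix}${field}`]
  );
}

function previewTopics(locationPath, contract, mapping) {
  return fieldPaths(mapping).map((path) => `${locationPath}.${contract}.${path}`);
}

const topics = previewTopics("umh.v1.corpA.plant-A.line-4.pump42", "_pump_v1", {
  pressure: "press",
  temperature: "(temp - 32) * 5 / 9",
  motor: { current: "current" },
});
console.log(topics.join("\n"));
// umh.v1.corpA.plant-A.line-4.pump42._pump_v1.pressure
// umh.v1.corpA.plant-A.line-4.pump42._pump_v1.temperature
// umh.v1.corpA.plant-A.line-4.pump42._pump_v1.motor.current
```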

Best Practices

Template Design

  • Reusable logic: Design templates for equipment classes, not individual assets

  • Meaningful variables: Use descriptive variable names that make sense across instances

  • Consistent naming: Follow naming conventions across all templates

Stream Processor Configuration

  • Descriptive names: Use clear, asset-specific processor names

  • Complete variables: Provide all required template variables

  • Test incrementally: Deploy one processor at a time for validation

Performance Optimization

  • Efficient expressions: Keep JavaScript simple and fast

  • Minimize sources: Only subscribe to needed topics

  • Batch deployments: Once a template has been validated, deploy related processors together
