
Consuming Data

🚧 Roadmap Item - Enhanced consumer tooling, Topic Browser, and REST API are under development.

Consuming data from the Unified Namespace involves subscribing to specific topics or patterns and processing the standardized message formats. UMH Core provides multiple consumption methods for different use cases.

Consumption Patterns

1. Bridge Sink Flows

Create outbound Bridges to send UNS data to external systems:

protocolConverter:
  - name: uns-to-mqtt
    desiredState: active
    protocolConverterServiceConfig:
      template:
        dataflowcomponent_write:
          benthos:
            input:
              uns:
                topics: ["umh.v1.acme.plant1.line4.pump01._raw.+"]
            pipeline:
              processors:
                - mapping: |
                    root.timestamp = this.timestamp_ms
                    root.value = this.value
                    root.topic = metadata("umh_topic")
            output:
              mqtt:
                urls: ["tcp://external-broker:1883"]
                topic: 'factory/${! metadata("umh_topic").replace_all(".", "/") }'

2. Stand-alone Flow Consumers

Process UNS data with custom logic:

dataFlow:
  - name: raw-data-consumer
    desiredState: active
    dataFlowComponentConfig:
      benthos:
        input:
          uns:
            topics: ["umh.v1.+.+.+.+._raw.+"]
        pipeline:
          processors:
            - mapping: |
                # Parse the UNS topic to extract the location hierarchy.
                # Assumes a four-level location path such as
                # umh.v1.acme.plant1.line4.pump01._raw.temperature
                # (index 0 = "umh", 1 = "v1", 2-5 = location, 6 = contract, 7 = tag)
                let topic_parts = metadata("umh_topic").split(".")
                root.enterprise = $topic_parts.index(2)
                root.site = $topic_parts.index(3)
                root.line = $topic_parts.index(4)
                root.work_cell = $topic_parts.index(5)
                root.tag_name = $topic_parts.index(7)
                root.timestamp = this.timestamp_ms
                root.value = this.value
        output:
          sql_insert:
            driver: "postgres"
            dsn: "postgres://user:pass@timescale:5432/manufacturing"
            table: "sensor_data"
            columns: ["enterprise", "site", "line", "work_cell", "tag_name", "timestamp", "value"]
            args_mapping: |
              root = [this.enterprise, this.site, this.line, this.work_cell, this.tag_name, this.timestamp, this.value]

3. REST API Access 🚧

Query live UNS data via HTTP endpoints:

# Get namespace tree structure
curl -X GET "https://umh-core:8080/api/v1/uns/tree" \
  -H "Authorization: Bearer $JWT_TOKEN"

# Get latest values for specific path
curl -X GET "https://umh-core:8080/api/v1/uns/values" \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -d '{"path": "umh.v1.acme.plant1.line4.pump01._raw.*"}'

Topic Patterns with UNS Input

UNS input supports powerful regex patterns to subscribe to specific data. For complete topic structure details, see Topic Convention.

| Pattern | Matches | Use Case |
| --- | --- | --- |
| umh.v1.acme.+.+.+.+._raw.temperature | All raw temperature sensors | Temperature monitoring dashboard |
| umh.v1.acme.plant1.+.+.+._raw.+ | All raw data from plant1 | Plant-specific raw data analytics |
| umh.v1.+.+.+.+.pump01._pump.+ | All structured data from pump01 assets | Asset-specific monitoring |
| umh.v1.acme.plant1.line4.+.+._temperature.+ | Line 4 structured temperature data | Production line temperature analysis |

Regex Support:

input:
  uns:
    topics: ["umh.v1.acme.plant1.line[0-9]+.+._raw.(temperature|pressure)"]

For complete UNS input syntax, see the Benthos-UMH UNS Input Documentation.

Message Processing

Raw Data Payload

Simple sensor data follows the timeseries payload format:

{
  "value": 42.5,
  "timestamp_ms": 1733904005123
}

Processing example:

pipeline:
  processors:
    - mapping: |
        # Convert the millisecond epoch timestamp to other formats
        root.timestamp_iso = (this.timestamp_ms / 1000).ts_format("2006-01-02T15:04:05Z07:00")
        root.timestamp_unix = this.timestamp_ms / 1000
        root.measurement = this.value

        # Extract location from the UNS topic (see Topic Convention for structure).
        # Assumes a four-level location path such as
        # umh.v1.acme.plant1.line4.pump01._raw.temperature
        let parts = metadata("umh_topic").split(".")
        root.location = {
          "enterprise": $parts.index(2),
          "site": $parts.index(3),
          "line": $parts.index(4),
          "work_cell": $parts.index(5),
          "tag": $parts.index(7)
        }

Structured Data Contract Payloads

For structured data contracts, use Data Models to understand payload structure:

# Consuming structured pump model data
input:
  uns:
    topics: ["umh.v1.+.+.+.+.pump01._pump.+"]
pipeline:
  processors:
    - mapping: |
        # Derive the sensor type from the tag path after the _pump contract.
        # Splitting on "._pump." keeps multi-segment tags like motor.current intact.
        root.sensor_type = match metadata("umh_topic").split("._pump.").index(1) {
          "pressure" => "pressure",
          "temperature" => "temperature",
          "motor.current" => "motor_current",
          "motor.rpm" => "motor_rpm",
          "diagnostics.vibration" => "vibration",
          _ => "unknown"
        }
        root.asset_id = "pump01"
        root.value = this.value
        root.timestamp = this.timestamp_ms

UNS Input Features

UNS input abstracts away Kafka/Redpanda complexity and provides:

  • Topic pattern matching with wildcards and regex

  • Automatic metadata - umh_topic contains the full UNS topic path

  • Message headers - All UNS metadata available via metadata() function

  • Embedded broker access - No need to configure Kafka addresses

input:
  uns:
    topics: 
      - "umh.v1.acme.plant1.+.+._raw.+"      # Wildcard matching
      - "umh.v1.acme.plant2.line[1-3].+._temperature.+" # Regex support

Error Handling

Handle consumer errors gracefully:

pipeline:
  processors:
    - try:
        - mapping: |
            # Your processing logic
            root.processed_value = this.value * 1.8 + 32
    - catch:
        - log:
            level: ERROR
            message: 'Failed to process message: ${! error() }'
        - mapping: |
            # Drop the failed message and continue
            root = deleted()

Integration Examples

Database Integration Pattern

UNS data can be consumed into various databases. Here's the basic pattern:

pipeline:
  processors:
    - mapping: |
        # Transform the UNS message for database storage.
        # slice(2, 6) keeps the four location levels between "umh.v1" and the contract.
        root.timestamp = this.timestamp_ms
        root.location = metadata("umh_topic").split(".").slice(2, 6).join(".")
        root.tag_name = metadata("umh_topic").split(".").index(7)
        root.value = this.value
output:
  sql_insert:
    driver: "postgres"
    table: "sensor_readings"
    # ... database-specific configuration (dsn, columns, args_mapping)

For complete database integration examples including TimescaleDB, InfluxDB, and other systems, see the Integration Patterns Guide.

Data Contract Evolution

Consuming Raw Data (Simple)

input:
  uns:
    topics: ["umh.v1.+.+.+.+._raw.+"]  # All raw sensor data
pipeline:
  processors:
    - mapping: |
        root.sensor_value = this.value
        root.timestamp = this.timestamp_ms

Consuming Structured Data (Advanced) 🚧

input:
  uns:
    topics: ["umh.v1.+.+.+.+._temperature.+"]  # Structured temperature data
pipeline:
  processors:
    - mapping: |
        # Structured contracts have specific field names
        root.temperature_celsius = this.temperature_in_c
        root.timestamp = this.timestamp_ms
        # Additional metadata from data model constraints
        root.unit = "°C"

Migration from UMH Classic

UMH Classic Users: See Migration from UMH Classic to UMH Core for complete migration instructions, including the _historian → _raw data contract changes.
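To illustrate the contract rename, a Classic historian topic maps onto a Core raw topic like this (the asset path is illustrative; only the data contract segment changes):

```
# UMH Classic (_historian data contract)
umh.v1.acme.plant1.line4.pump01._historian.temperature

# UMH Core (same asset, _raw data contract)
umh.v1.acme.plant1.line4.pump01._raw.temperature
```

Existing consumers subscribing to _historian patterns therefore need their topic filters updated to _raw.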

Why UNS Input/Output?

UMH Core uses UNS input/output instead of direct Kafka access because:

  • Abstraction: Hides Kafka/Redpanda complexity from users

  • Embedded Integration: Works seamlessly with UMH Core's embedded Redpanda

  • Topic Management: Automatic topic creation and management

  • Metadata Handling: Proper UNS metadata propagation

  • Pattern Matching: Advanced regex support for topic patterns

This aligns with UMH Core's philosophy of embedding Redpanda as an internal implementation detail rather than exposing it directly to users.
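The abstraction is visible in the configuration itself. As a rough comparison (the broker address and topic name here are illustrative, not guaranteed internals), a direct Kafka consumer must know broker details, while the uns input needs only a topic pattern:

```
# Direct Kafka access - broker details leak into every flow
input:
  kafka:
    addresses: ["redpanda:9092"]   # illustrative address
    topics: ["umh.messages"]
    consumer_group: "my-consumer"

# UNS input (UMH Core) - only the topic pattern matters
input:
  uns:
    topics: ["umh.v1.acme.plant1.+.+._raw.+"]
```

If the embedded broker's port, topic layout, or partitioning ever changes, flows written against the uns input keep working unchanged.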

Topic Browser 🚧

The upcoming Topic Browser provides a visual interface to explore and consume UNS data:

  • Real-time topic tree visualization

  • Live value monitoring

  • Historical data queries

  • Export to various formats

Next Steps

  • Data Modeling 🚧 - Structure complex data consumption with explicit models

  • Data Flows Overview - Advanced processing patterns

  • Configuration Reference - Complete consumer configuration

Learn More

  • Historians vs Open-Source databases - Choose the right storage backend

  • Why we chose TimescaleDB over InfluxDB - Database selection rationale

  • Simplifying Tag Browsing in Grafana - Visualization best practices

  • Benthos-UMH UNS Output Documentation - Complete UNS output reference