LogoLogo
umh-core
umh-core
  • Introduction
  • Getting Started
  • Usage
    • Unified Namespace
      • Overview
      • Payload Formats
      • Topic Convention
      • Producing Data
      • Consuming Data
    • Data Flows
      • Overview
      • Bridges
      • Stand-alone Flow
      • Stream Processor 🚧
    • Data Modeling 🚧
      • Data Models 🚧
      • Data Contracts 🚧
      • Stream Processors 🚧
  • Production
    • Updating
    • Sizing Guide
    • Corporate Firewalls
    • Metrics
    • Migration from Classic
  • Reference
    • Configuration Reference
    • Container Layout
    • State Machines
    • Environment Variables
  • UMH Core vs UMH Classic
  • UMH Classic
    • Go To Documentation
  • Ressources
    • Website
    • Blog
Powered by GitBook
On this page
  • 1. Bridge Sink Flows 🚧
  • Consumption Patterns
  • 1. Bridge Sink Flows
  • 2. Stand-alone Flow Consumers
  • 3. REST API Access 🚧
  • Topic Patterns with UNS Input
  • Message Processing
  • Raw Data Payload
  • Structured Data Contract Payloads
  • UNS Input Features
  • Error Handling
  • Integration Examples
  • Database Integration Pattern
  • Data Contract Evolution
  • Consuming Raw Data (Simple)
  • Consuming Structured Data (Advanced) 🚧
  • Migration from UMH Classic
  • Why UNS Input/Output?
  • Topic Browser 🚧
  • Next Steps
  • Learn More
  1. Usage
  2. Unified Namespace

Consuming Data

1. Bridge Sink Flows 🚧

🚧 Roadmap Item - Enhanced consumer tooling, Topic Browser, and REST API are under development.

Consuming data from the Unified Namespace involves subscribing to specific topics or patterns and processing the standardized message formats. UMH Core provides multiple consumption methods for different use cases.

Consumption Patterns

1. Bridge Sink Flows

Create outbound Bridges to send UNS data to external systems:

protocolConverter:
  - name: uns-to-mqtt
    desiredState: active
    protocolConverterServiceConfig:
      template:
        dataflowcomponent_write:
          benthos:
            input:
              uns:
                topics: ["umh.v1.acme.plant1.line4.pump01._raw.+"]
            pipeline:
              processors:
                - mapping: |
                    root.timestamp = this.timestamp_ms
                    root.value = this.value
                    root.topic = metadata("umh_topic")
            output:
              mqtt:
                urls: ["tcp://external-broker:1883"]
                topic: 'factory/{{ metadata("umh_topic").replace(".", "/") }}'

2. Stand-alone Flow Consumers

Process UNS data with custom logic:

dataFlow:
  - name: raw-data-consumer
    desiredState: active
    dataFlowComponentConfig:
      benthos:
        input:
          uns:
            topics: ["umh.v1.+.+.+.+._raw.+"]
        pipeline:
          processors:
            - mapping: |
                # Parse topic to extract location hierarchy
                let topic_parts = metadata("umh_topic").split(".")
                root.enterprise = topic_parts.1
                root.site = topic_parts.2  
                root.area = topic_parts.3
                root.line = topic_parts.4
                root.work_cell = topic_parts.5
                root.tag_name = topic_parts.7
                root.timestamp = this.timestamp_ms.ts_unix_milli()
                root.value = this.value
        output:
          sql_insert:
            driver: "postgres"
            dsn: "postgres://user:pass@timescale:5432/manufacturing"
            table: "sensor_data"

3. REST API Access 🚧

Query live UNS data via HTTP endpoints:

# Get namespace tree structure
curl -X GET "https://umh-core:8080/api/v1/uns/tree" \
  -H "Authorization: Bearer $JWT_TOKEN"

# Get latest values for specific path
curl -X GET "https://umh-core:8080/api/v1/uns/values" \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -d '{"path": "umh.v1.acme.plant1.line4.pump01._raw.*"}'

Topic Patterns with UNS Input

UNS input supports powerful regex patterns to subscribe to specific data. For complete topic structure details, see Topic Convention.

Pattern
Matches
Use Case

umh.v1.acme.+.+.+.+._raw.temperature

All raw temperature sensors

Temperature monitoring dashboard

umh.v1.acme.plant1.+.+.+._raw.+

All raw data from plant1

Plant-specific raw data analytics

umh.v1.+.+.+.+.pump01._pump.+

All structured data from pump01 assets

Asset-specific monitoring

umh.v1.acme.plant1.line4.+.+._temperature.+

Line 4 structured temperature data

Production line temperature analysis

Regex Support:

input:
  uns:
    topics: ["umh.v1.acme.plant1.line[0-9]+.+._raw.(temperature|pressure)"]

For complete UNS input syntax, see Benthos-UMH UNS Input Documentation.

Message Processing

Raw Data Payload

Simple sensor data follows the timeseries payload format:

{
  "value": 42.5,
  "timestamp_ms": 1733904005123
}

Processing example:

pipeline:
  processors:
    - mapping: |
        # Convert timestamp to different formats
        root.timestamp_iso = this.timestamp_ms.ts_unix_milli().ts_format("2006-01-02T15:04:05Z07:00")
        root.timestamp_unix = this.timestamp_ms / 1000
        root.measurement = this.value
        
        # Extract location from UNS topic (see Topic Convention for structure)
        let parts = metadata("umh_topic").split(".")
        root.location = {
          "enterprise": parts.1,
          "site": parts.2,
          "area": parts.3,
          "line": parts.4,
          "work_cell": parts.5,
          "tag": parts.7
        }

For topic structure details and parsing rules, see Topic Convention.

Structured Data Contract Payloads

For structured data contracts, use Data Models to understand payload structure:

# Consuming structured pump model data
input:
  uns:
    topics: ["umh.v1.+.+.+.+.pump01._pump.+"]
pipeline:
  processors:
    - mapping: |
        # Handle different fields from pump model
        match metadata("umh_topic").split(".").7 {
          "pressure" => root.sensor_type = "pressure"
          "temperature" => root.sensor_type = "temperature"  
          "motor.current" => root.sensor_type = "motor_current"
          "motor.rpm" => root.sensor_type = "motor_rpm"
          "diagnostics.vibration" => root.sensor_type = "vibration"
        }
        root.asset_id = "pump01"
        root.value = this.value
        root.timestamp = this.timestamp_ms

UNS Input Features

UNS input abstracts away Kafka/Redpanda complexity and provides:

  • Topic pattern matching with wildcards and regex

  • Automatic metadata - umh_topic contains the full UNS topic path

  • Message headers - All UNS metadata available via metadata() function

  • Embedded broker access - No need to configure Kafka addresses

input:
  uns:
    topics: 
      - "umh.v1.acme.plant1.+.+._raw.+"      # Wildcard matching
      - "umh.v1.acme.plant2.line[1-3].+._temperature.+" # Regex support

Error Handling

Handle consumer errors gracefully:

pipeline:
  processors:
    - try:
        - mapping: |
            # Your processing logic
            root.processed_value = this.value * 1.8 + 32
        - catch:
            - mapping: |
                root = deleted()
                # Log error and continue
            - log:
                level: "ERROR" 
                message: "Failed to process message: ${! error() }"

Integration Examples

Database Integration Pattern

UNS data can be consumed into various databases. Here's the basic pattern:

pipeline:
  processors:
    - mapping: |
        # Transform UNS message for database storage
        root.timestamp = this.timestamp_ms.ts_unix_milli()
        root.location = metadata("umh_topic").split(".").slice(1, 6).join(".")
        root.tag_name = metadata("umh_topic").split(".").7
        root.value = this.value
output:
  sql_insert:
    driver: "postgres"
    table: "sensor_readings"
    # ... database-specific configuration

For complete database integration examples including TimescaleDB, InfluxDB, and other systems, see Integration Patterns Guide.

Data Contract Evolution

Consuming Raw Data (Simple)

input:
  uns:
    topics: ["umh.v1.+.+.+.+._raw.+"]  # All raw sensor data
pipeline:
  processors:
    - mapping: |
        root.sensor_value = this.value
        root.timestamp = this.timestamp_ms

Consuming Structured Data (Advanced) 🚧

input:
  uns:
    topics: ["umh.v1.+.+.+.+._temperature.+"]  # Structured temperature data
pipeline:
  processors:
    - mapping: |
        # Structured contracts have specific field names
        root.temperature_celsius = this.temperature_in_c
        root.timestamp = this.timestamp_ms
        # Additional metadata from data model constraints
        root.unit = "°C"

Migration from UMH Classic

UMH Classic Users: See Migration from UMH Classic to UMH Core for complete migration instructions including _historian → _raw data contract changes.

Why UNS Input/Output?

UMH Core uses UNS input/output instead of direct Kafka access because:

  • Abstraction: Hides Kafka/Redpanda complexity from users

  • Embedded Integration: Works seamlessly with UMH Core's embedded Redpanda

  • Topic Management: Automatic topic creation and management

  • Metadata Handling: Proper UNS metadata propagation

  • Pattern Matching: Advanced regex support for topic patterns

This aligns with UMH Core's philosophy of embedding Redpanda as an internal implementation detail rather than exposing it directly to users.

Topic Browser 🚧

The upcoming Topic Browser provides a visual interface to explore and consume UNS data:

  • Real-time topic tree visualization

  • Live value monitoring

  • Historical data queries

  • Export to various formats

Next Steps

  • Data Modeling 🚧 - Structure complex data consumption with explicit models

  • Data Flows Overview - Advanced processing patterns

  • Configuration Reference - Complete consumer configuration

Learn More

  • Historians vs Open-Source databases - Choose the right storage backend

  • Why we chose TimescaleDB over InfluxDB - Database selection rationale

  • Simplifying Tag Browsing in Grafana - Visualization best practices

  • Benthos-UMH UNS Output Documentation - Complete UNS output reference

PreviousProducing DataNextData Flows

Last updated 3 days ago