Tag Processor

The Tag Processor is designed to prepare incoming data for the UMH data model. It processes messages through three configurable stages: defaults, conditional transformations, and advanced processing, all using a Node-RED style JavaScript environment.

Use the tag_processor instead of nodered_js when you are processing tags or time series data and converting them to the UMH data model within the _historian data contract. This processor is optimized for structured time series data: it automatically formats messages and generates the appropriate metadata.

Message Formatting Behavior

The processor automatically formats different input types into a consistent structure with a "value" field:

  1. Simple Values (numbers, strings, booleans)

Input:

42

Output:

{
  "value": 42
}

Input:

"test string"

Output:

{
  "value": "test string"
}

Input:

true

Output:

{
  "value": true
}

  2. Arrays (converted to a string representation)

Input:

["a", "b", "c"]

Output:

{
  "value": "[a b c]"
}

  3. Objects (converted to a JSON string)

Input:

{
  "key1": "value1",
  "key2": 42
}

Output:

{
  "value": "{\"key1\": \"value1\",\"key2\": 42}"
}

  4. Numbers (preserved as numbers)

Input:

23.5

Output:

{
  "value": 23.5
}

Input:

42

Output:

{
  "value": 42
}

This consistent formatting ensures that:

  • All messages have a "value" field

  • Simple types (numbers, strings, booleans) are preserved as-is

  • Complex types (arrays, objects) are converted to their string representations

  • Numbers are always preserved as numeric types (integers or floats)
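The formatting rules above can be approximated in plain JavaScript, which is handy for predicting what a given input turns into. This is a minimal sketch, not the processor's implementation: the helper name formatValue is hypothetical, and the exact string produced for arrays and objects (spacing, nested values) may differ from what the processor emits. For example, JSON.stringify writes no spaces after colons.

```javascript
// Hypothetical helper that mimics the documented formatting rules.
// Not the tag processor's actual implementation.
function formatValue(input) {
  // Arrays become a bracketed, space-separated string, e.g. "[a b c]"
  if (Array.isArray(input)) {
    return { value: "[" + input.join(" ") + "]" };
  }
  // Plain objects become a JSON string
  if (input !== null && typeof input === "object") {
    return { value: JSON.stringify(input) };
  }
  // Numbers, strings and booleans are kept as-is
  return { value: input };
}

console.log(formatValue(42));              // { value: 42 }
console.log(formatValue("test string"));   // { value: 'test string' }
console.log(formatValue(["a", "b", "c"])); // { value: '[a b c]' }
console.log(formatValue({ key1: "value1", key2: 42 }));
// { value: '{"key1":"value1","key2":42}' }
```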

Configuration

pipeline:
  processors:
    - tag_processor:
        defaults: |

          // Set default location hierarchy and datacontract
          msg.meta.location_path = "enterprise.plant1.machiningArea.cnc-line.cnc5.plc123";
          msg.meta.data_contract = "_historian";
          msg.meta.tag_name = "value";
          msg.payload = msg.payload; //does not modify the payload
          return msg;
        conditions:
          - if: msg.meta.opcua_node_id === "ns=1;i=2245"
            then: |
              // Set path hierarchy and tag name for specific OPC UA node
              msg.meta.virtual_path = "axis.x.position";
              msg.meta.tag_name = "actual";
              return msg;
        advancedProcessing: |
          // Optional advanced message processing
          // Example: double numeric values
          msg.payload = parseFloat(msg.payload) * 2;
          return msg;

Processing Stages

  1. Defaults

    • Sets initial metadata values

    • Runs first on every message

    • Must return a message object

  2. Conditions

    • List of conditional transformations

    • Each condition has an if expression and a then code block

    • Runs after defaults

    • Must return a message object

  3. Advanced Processing

    • Optional final processing stage

    • Can modify both metadata and payload

    • Must return a message object, an array of message objects to emit several messages, or null to drop the message
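The stage order described above can be modeled as a small JavaScript driver, which is useful for reasoning about what a configuration will do. This is a hypothetical sketch, not the processor's code: runStages and the shape of the stages object are illustrative, the if/then pairs are written as functions, and it assumes that every condition whose predicate is true runs in list order (the text above does not say whether evaluation stops at the first match). The example stages mirror the Configuration section.

```javascript
// Hypothetical model of the three tag_processor stages (not the real code).
// Assumption: every condition whose predicate is true runs, in list order.
function runStages(msg, stages) {
  // 1. Defaults: runs first on every message
  let current = stages.defaults(msg);

  // 2. Conditions: each "then" block runs only if its "if" predicate is true
  for (const condition of stages.conditions || []) {
    if (current === null) break; // an earlier stage dropped the message
    if (condition.if(current)) {
      current = condition.then(current);
    }
  }

  // 3. Advanced processing: optional final stage
  if (current !== null && stages.advancedProcessing) {
    current = stages.advancedProcessing(current);
  }

  // null drops the message; an array fans out into several messages
  if (current === null) return [];
  return Array.isArray(current) ? current : [current];
}

// Stages mirroring the Configuration example
const stages = {
  defaults: (msg) => {
    msg.meta.location_path = "enterprise.plant1.machiningArea.cnc-line.cnc5.plc123";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "value";
    return msg;
  },
  conditions: [
    {
      if: (msg) => msg.meta.opcua_node_id === "ns=1;i=2245",
      then: (msg) => {
        msg.meta.virtual_path = "axis.x.position";
        msg.meta.tag_name = "actual";
        return msg;
      },
    },
  ],
  advancedProcessing: (msg) => {
    msg.payload = parseFloat(msg.payload) * 2;
    return msg;
  },
};

const out = runStages({ payload: "21", meta: { opcua_node_id: "ns=1;i=2245" } }, stages);
console.log(out[0].meta.tag_name, out[0].payload); // actual 42
```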

How Metadata Works

The tag processor handles metadata in two steps and then assembles the final message:

  1. JavaScript Stage - Working with msg.meta

    In your JavaScript code (defaults, conditions, advancedProcessing), you work with the msg.meta object:

    // In JavaScript processing stages
    msg.meta.location_path = 'enterprise.site.area';
    msg.meta.data_contract = '_historian';
    msg.meta.tag_name = 'temperature';
    msg.meta.virtual_path = 'axis.x.position';

    At this stage, these are just JavaScript object properties - they're not yet actual Benthos metadata.

  2. Conversion to Benthos Metadata

    After the JavaScript processing is complete, the tag processor extracts all properties from the msg.meta JavaScript object and converts them to actual Benthos message metadata.

  3. Final Message Structure

    Important: Properties set on msg.meta become message metadata, not part of the payload.

    Metadata (accessible via message metadata):

    • location_path: "enterprise.site.area"

    • data_contract: "_historian"

    • tag_name: "temperature"

    • virtual_path: "axis.x.position"

    • umh_topic: "umh.v1.enterprise.site.area._historian.axis.x.position.temperature" (auto-generated)

    • topic: "umh.v1.enterprise.site.area._historian.axis.x.position.temperature" (auto-generated, deprecated)

    Payload (the actual message content):

    {
      "value": 23.5,
      "timestamp_ms": 1647890123456
    }
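Step 2 can be pictured as copying each property of msg.meta onto the message's metadata while the payload is handed on separately. The sketch below models that with a plain Map; extractMetadata is a hypothetical name, and both converting every value with String() and skipping undefined or null properties are assumptions rather than documented behavior.

```javascript
// Hypothetical model of step 2: msg.meta properties become message metadata.
// Assumptions: values are stored as strings; undefined/null are skipped.
function extractMetadata(msg) {
  const metadata = new Map();
  for (const [key, value] of Object.entries(msg.meta)) {
    // Skip properties the script left undefined or null
    if (value === undefined || value === null) continue;
    metadata.set(key, String(value));
  }
  // The payload is returned separately; meta never ends up inside it
  return { metadata, payload: msg.payload };
}

const result = extractMetadata({
  payload: { value: 23.5, timestamp_ms: 1647890123456 },
  meta: {
    location_path: "enterprise.site.area",
    data_contract: "_historian",
    tag_name: "temperature",
    virtual_path: "axis.x.position",
  },
});

console.log(result.metadata.get("tag_name")); // temperature
console.log("tag_name" in result.payload);    // false
```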

Integration with UNS Output

The tag processor is typically used together with the uns output plugin, which is the standard way to publish data into the United Manufacturing Hub. Here's how metadata flows from the tag processor to the UNS:

  1. Metadata to UNS Headers: All metadata fields set by the tag processor become UNS headers, preserving the complete context and routing information.

  2. UMH Topic as Routing Key: The auto-generated umh_topic metadata becomes the UNS routing key, enabling efficient message routing and topic-based subscriptions.

  3. Payload Preservation: The structured payload (with value and timestamp_ms) is published as-is to the UNS.

Complete Flow Example:

pipeline:
  processors:
    - tag_processor:
        defaults: |
          msg.meta.location_path = "enterprise.site.area";
          msg.meta.data_contract = "_historian";
          msg.meta.tag_name = "temperature";
          msg.meta.custom_field = "sensor_data";
          return msg;

output:
  uns: {}

Input Message: 23.5

UNS Record Result:

  • UNS Topic: umh.messages (fixed UNS topic)

  • UNS Routing Key: umh.v1.enterprise.site.area._historian.temperature (from umh_topic metadata)

  • UNS Value: {"value": 23.5, "timestamp_ms": 1647890123456} (structured payload)

  • UNS Headers:

    location_path: "enterprise.site.area"
    data_contract: "_historian"
    tag_name: "temperature"
    umh_topic: "umh.v1.enterprise.site.area._historian.temperature"
    topic: "umh.v1.enterprise.site.area._historian.temperature"
    custom_field: "sensor_data"
    bridged_by: "umh-core"
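The mapping from tag processor output to a UNS record described above can be sketched as a plain function. The function name toUnsRecord and the record field names (topic, key, value, headers) are illustrative assumptions, not the uns output's actual data structures. The bridged_by header in the example above does not come from the tag_processor configuration and is not modeled here; neither is the deprecated topic field.

```javascript
// Hypothetical model of how the uns output turns one tag_processor result
// into a UNS record. Field names (topic, key, value, headers) are illustrative.
function toUnsRecord(message) {
  return {
    topic: "umh.messages",                  // fixed UNS topic
    key: message.metadata.umh_topic,        // routing key taken from umh_topic
    value: JSON.stringify(message.payload), // structured payload, unchanged
    headers: { ...message.metadata },       // every metadata field becomes a header
  };
}

const record = toUnsRecord({
  payload: { value: 23.5, timestamp_ms: 1647890123456 },
  metadata: {
    location_path: "enterprise.site.area",
    data_contract: "_historian",
    tag_name: "temperature",
    umh_topic: "umh.v1.enterprise.site.area._historian.temperature",
    custom_field: "sensor_data",
  },
});

console.log(record.key);   // umh.v1.enterprise.site.area._historian.temperature
console.log(record.value); // {"value":23.5,"timestamp_ms":1647890123456}
```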

Metadata Fields

The processor uses the following metadata fields:

Required Fields:

  • location_path: Hierarchical location path in dot notation (e.g., "enterprise.site.area.line.workcell.plc123")

  • data_contract: Data schema identifier (e.g., "_historian", "_analytics")

  • tag_name: Name of the tag/variable (e.g., "temperature", "pressure")

Optional Fields:

  • virtual_path: Logical, non-physical grouping path in dot notation (e.g., "axis.x.position")

Generated Fields:

  • umh_topic: Automatically generated from the above fields in the format:

    umh.v1.<location_path>.<data_contract>.<virtual_path>.<tag_name>

    Empty or undefined fields are skipped, and dots are normalized.
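The topic format above can be reproduced with a few lines of JavaScript, which is useful for checking which topic a given set of metadata will produce. This is a minimal sketch: buildUmhTopic is a hypothetical name, and interpreting "dots are normalized" as collapsing repeated dots and trimming leading or trailing dots is an assumption.

```javascript
// Hypothetical re-implementation of the documented umh_topic format:
// umh.v1.<location_path>.<data_contract>.<virtual_path>.<tag_name>
function buildUmhTopic(meta) {
  const parts = [
    "umh.v1",
    meta.location_path,
    meta.data_contract,
    meta.virtual_path,
    meta.tag_name,
  ];
  return parts
    // Skip empty or undefined fields
    .filter((part) => part !== undefined && part !== null && part !== "")
    .join(".")
    // Assumed normalization: collapse repeated dots, trim edge dots
    .replace(/\.{2,}/g, ".")
    .replace(/^\.+|\.+$/g, "");
}

console.log(buildUmhTopic({
  location_path: "enterprise.site.area",
  data_contract: "_historian",
  tag_name: "temperature",
})); // umh.v1.enterprise.site.area._historian.temperature
```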

Message Structure

Messages in the Tag Processor follow the Node-RED style format:

{
  payload: {
    // The message content - can be a simple value or complex object
    "temperature": 23.5,
    "timestamp_ms": 1733903611000
  },
  meta: {
    // Required fields
    location_path: "enterprise.site.area.line.workcell.plc123",  // Hierarchical location path
    data_contract: "_historian",                                 // Data schema identifier
    tag_name: "temperature",                                     // Name of the tag/variable

    // Optional fields
    virtual_path: "axis.x.position",                            // Logical grouping path

    // Generated field (by processor)
    umh_topic: "umh.v1.enterprise.site.area.line.workcell.plc123._historian.axis.x.position.temperature",

    // Input-specific fields (e.g., from OPC UA)
    opcua_node_id: "ns=1;i=2245",
    opcua_tag_name: "temperature_sensor_1",
    opcua_tag_group: "sensors.temperature",
    opcua_tag_path: "sensors.temperature",
    opcua_tag_type: "number",
    opcua_source_timestamp: "2024-03-12T10:00:00Z",
    opcua_server_timestamp: "2024-03-12T10:00:00.001Z",
    opcua_attr_nodeid: "ns=1;i=2245",
    opcua_attr_nodeclass: "Variable",
    opcua_attr_browsename: "Temperature",
    opcua_attr_description: "Temperature Sensor 1",
    opcua_attr_accesslevel: "CurrentRead",
    opcua_attr_datatype: "Double"
  }
}

Examples

  1. Basic Defaults Processing

tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.plant1.machiningArea.cnc-line.cnc5.plc123";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "actual";
    return msg;

Input:

23.5

Output:

{
  "actual": 23.5,
  "timestamp_ms": 1733903611000
}

UMH Topic: umh.v1.enterprise.plant1.machiningArea.cnc-line.cnc5.plc123._historian.actual

  2. OPC UA Node ID Based Processing

tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.plant1.machiningArea.cnc-line.cnc5.plc123";
    msg.meta.data_contract = "_historian";
    return msg;
  conditions:
    - if: msg.meta.opcua_attr_nodeid === "ns=1;i=2245"
      then: |
        msg.meta.virtual_path = "axis.x.position";
        msg.meta.tag_name = "actual";
        return msg;

Input with metadata opcua_attr_nodeid: "ns=1;i=2245":

23.5

Output:

{
  "actual": 23.5,
  "timestamp_ms": 1733903611000
}

UMH Topic: umh.v1.enterprise.plant1.machiningArea.cnc-line.cnc5.plc123._historian.axis.x.position.actual

  3. Moving Folder Structures in Virtual Path

tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.plant1";
    msg.meta.data_contract = "_historian";
    msg.meta.virtual_path = msg.meta.opcua_tag_path;
    msg.meta.tag_name = msg.meta.opcua_tag_name;
    return msg;
  conditions:
    # Move the entire DataAccess_AnalogType folder and its children into axis.x
    - if: msg.meta.opcua_tag_path && msg.meta.opcua_tag_path.includes("DataAccess_AnalogType")
      then: |
        msg.meta.location_path += ".area1.machining_line.cnc5.plc123";
        msg.meta.virtual_path = "axis.x." + msg.meta.opcua_tag_path;
        return msg;

Input messages with OPC UA tags:

// Original tag paths from OPC UA:
// DataAccess_AnalogType
// DataAccess_AnalogType.EURange
// DataAccess_AnalogType.Min
// DataAccess_AnalogType.Max

Output UMH topics will be:

umh.v1.enterprise.plant1.area1.machining_line.cnc5.plc123._historian.axis.x.DataAccess_AnalogType
umh.v1.enterprise.plant1.area1.machining_line.cnc5.plc123._historian.axis.x.DataAccess_AnalogType.EURange
umh.v1.enterprise.plant1.area1.machining_line.cnc5.plc123._historian.axis.x.DataAccess_AnalogType.Min
umh.v1.enterprise.plant1.area1.machining_line.cnc5.plc123._historian.axis.x.DataAccess_AnalogType.Max

This example shows how to:

  • Match an entire folder structure using includes("DataAccess_AnalogType")

  • Move all matching nodes into a new virtual path prefix (axis.x)

  • Preserve the original folder hierarchy under the new location

  • Apply a consistent location path to the entire folder structure
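The includes() check in the condition above matches the folder name anywhere in the path, including as part of a longer name. The sketch below compares it with a stricter, hypothetical isInFolder helper that only matches the folder itself or its children, and reproduces the rewrite from the then block as moveIntoAxisX (also a hypothetical name). The third tag path is made-up sample data chosen to show the difference.

```javascript
// The condition in the example: matches the name anywhere in the path
function matchesWithIncludes(tagPath) {
  return Boolean(tagPath) && tagPath.includes("DataAccess_AnalogType");
}

// Hypothetical stricter check: the path is the folder itself or lies below it
function isInFolder(tagPath, folder) {
  return Boolean(tagPath) && (tagPath === folder || tagPath.startsWith(folder + "."));
}

// Rewrite applied by the condition's then block
function moveIntoAxisX(meta) {
  return {
    ...meta,
    location_path: meta.location_path + ".area1.machining_line.cnc5.plc123",
    virtual_path: "axis.x." + meta.opcua_tag_path,
  };
}

const paths = [
  "DataAccess_AnalogType",
  "DataAccess_AnalogType.EURange",
  "Other.DataAccess_AnalogType_Copy", // made-up path for illustration
];
for (const p of paths) {
  console.log(p, matchesWithIncludes(p), isInFolder(p, "DataAccess_AnalogType"));
}
// DataAccess_AnalogType true true
// DataAccess_AnalogType.EURange true true
// Other.DataAccess_AnalogType_Copy true false
```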

  4. Advanced Processing with getLastPayload

getLastPayload is a function that returns the last payload that was available in Kafka for a given topic. Note that it returns the full payload, so you may still need to extract the value you need.

This function is not yet implemented; the example below shows its planned usage.

tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.site.area.line.workcell";
    msg.meta.data_contract = "_analytics";
    msg.meta.virtual_path = "work_order";
    return msg;
  advancedProcessing: |
    msg.payload = {
      "work_order_id": msg.payload.work_order_id,
      "work_order_start_time": umh.getLastPayload("enterprise.site.area.line.workcell._historian.workorder.work_order_start_time").work_order_start_time,
      "work_order_end_time": umh.getLastPayload("enterprise.site.area.line.workcell._historian.workorder.work_order_end_time").work_order_end_time
    };
    return msg;

Input:

{
  "work_order_id": "WO123"
}

Output:

{
  "work_order_id": "WO123",
  "work_order_start_time": "2024-03-12T10:00:00Z",
  "work_order_end_time": "2024-03-12T18:00:00Z"
}

UMH Topic: umh.v1.enterprise.site.area.line.workcell._analytics.work_order
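Because getLastPayload is not available yet, one way to try out the advancedProcessing code above is to run it against a stub. In the sketch below, the umh object, its in-memory lastPayloads table, and the lastField helper are all hypothetical stand-ins, and the sample payloads are made up. The stub only mimics the described behavior of returning the full last payload for a topic, which is why lastField extracts a single key and tolerates a topic with no stored payload.

```javascript
// Hypothetical stand-in for the planned umh.getLastPayload().
// Sample payloads are made up; each is a full payload, not a bare value.
const lastPayloads = {
  "enterprise.site.area.line.workcell._historian.workorder.work_order_start_time": {
    work_order_start_time: "2024-03-12T10:00:00Z",
    timestamp_ms: 1710237600000,
  },
  "enterprise.site.area.line.workcell._historian.workorder.work_order_end_time": {
    work_order_end_time: "2024-03-12T18:00:00Z",
    timestamp_ms: 1710266400000,
  },
};
const umh = { getLastPayload: (topic) => lastPayloads[topic] };

// Extract one field from the full payload; undefined if nothing is stored
function lastField(topic, field) {
  const payload = umh.getLastPayload(topic);
  return payload ? payload[field] : undefined;
}

// The example's advancedProcessing, written against the stub
function advancedProcessing(msg) {
  const base = "enterprise.site.area.line.workcell._historian.workorder.";
  msg.payload = {
    work_order_id: msg.payload.work_order_id,
    work_order_start_time: lastField(base + "work_order_start_time", "work_order_start_time"),
    work_order_end_time: lastField(base + "work_order_end_time", "work_order_end_time"),
  };
  return msg;
}

const processed = advancedProcessing({ payload: { work_order_id: "WO123" }, meta: {} });
console.log(processed.payload);
```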

  5. Dropping Messages Based on Value

tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "temperature";
    return msg;
  advancedProcessing: |
    if (msg.payload < 0) {
      // Drop negative values
      return null;
    }
    return msg;

Input:

-10

Output: Message is dropped (no output)

Input:

10

Output:

{
  "temperature": 10,
  "timestamp_ms": 1733903611000
}

UMH Topic: umh.v1.enterprise._historian.temperature

  6. Duplicating Messages for Different Data Contracts

tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "temperature";
    return msg;
  conditions:
    - if: true
      then: |
        msg.meta.location_path += ".production";
        return msg;
  advancedProcessing: |
    // Create two versions of the message:
    // 1. Original value for the _historian data contract
    // 2. Doubled value for the _custom data contract
    const doubledValue = msg.payload * 2;

    const msg1 = {
      payload: msg.payload,
      meta: { ...msg.meta, data_contract: "_historian" }
    };

    const msg2 = {
      payload: doubledValue,
      meta: { ...msg.meta, data_contract: "_custom", tag_name: msg.meta.tag_name + "_doubled" }
    };

    return [msg1, msg2];

Input:

23.5

Output 1 (Historian):

{
  "temperature": 23.5,
  "timestamp_ms": 1733903611000
}

UMH Topic: umh.v1.enterprise.production._historian.temperature

Output 2 (Custom):

{
  "temperature_doubled": 47,
  "timestamp_ms": 1733903611000
}

UMH Topic: umh.v1.enterprise.production._custom.temperature_doubled

  7. Processing Full MQTT Message Payload

tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.area";
    msg.meta.data_contract = "_workorder";
    msg.meta.virtual_path = "new";
    msg.meta.tag_name = "maintenance";
    return msg;

Input:

{
  "maintenanceSchedule": {
    "eventType": "ScheduledMaintenance",
    "eventId": "SM-20240717-025",
    "timestamp": "2024-07-17T13:00:00Z",
    "equipmentId": "InjectionMoldingMachine5",
    "equipmentName": "Engel Victory 120",
    "scheduledDate": "2024-07-22",
    "maintenanceType": "Preventive",
    "description": "Inspection and cleaning of injection unit and mold.",
    "maintenanceDuration": "6 hours",
    "assignedTo": {
      "employeeId": "EMP-5005",
      "name": "Hans Becker"
    },
    "status": "Scheduled",
    "partsRequired": [
      {
        "partId": "NOZZLE-015",
        "description": "Injection Nozzle",
        "quantity": 1
      }
    ],
    "notes": "Replace worn nozzle to prevent defects."
  }
}

Output:

{
  "maintenance": {
    "maintenanceSchedule": {
      "eventType": "ScheduledMaintenance",
      "eventId": "SM-20240717-025",
      "timestamp": "2024-07-17T13:00:00Z",
      "equipmentId": "InjectionMoldingMachine5",
      "equipmentName": "Engel Victory 120",
      "scheduledDate": "2024-07-22",
      "maintenanceType": "Preventive",
      "description": "Inspection and cleaning of injection unit and mold.",
      "maintenanceDuration": "6 hours",
      "assignedTo": {
        "employeeId": "EMP-5005",
        "name": "Hans Becker"
      },
      "status": "Scheduled",
      "partsRequired": [
        {
          "partId": "NOZZLE-015",
          "description": "Injection Nozzle",
          "quantity": 1
        }
      ],
      "notes": "Replace worn nozzle to prevent defects."
    }
  },
  "timestamp_ms": 1733903611000
}

UMH Topic: umh.v1.enterprise.area._workorder.new.maintenance

  8. Setting Custom Timestamps

By default, the tag processor uses the current time as the message timestamp. You can set a custom timestamp using the timestamp_ms metadata field (Unix milliseconds as a string):

tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.site.area";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "temperature";
    
    // Use OPC UA timestamp
    if (msg.meta.opcua_source_timestamp) {
      msg.meta.timestamp_ms = new Date(msg.meta.opcua_source_timestamp).getTime().toString();
    }
    // Or use Sparkplug B timestamp directly
    if (msg.meta.spb_timestamp) {
      msg.meta.timestamp_ms = msg.meta.spb_timestamp;
    }
    
    return msg;
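One caveat with the conversion above: if opcua_source_timestamp cannot be parsed, new Date(...).getTime() returns NaN, and toString() turns that into the string "NaN". The sketch below wraps the conversion in a hypothetical toTimestampMs helper that returns undefined instead, so the defaults code can leave timestamp_ms unset and fall back to the current time. How the processor itself handles a non-numeric timestamp_ms is not documented here.

```javascript
// Hypothetical helper: convert an ISO 8601 timestamp (such as
// opcua_source_timestamp) into Unix milliseconds as a string.
// Returns undefined for unparsable input instead of the string "NaN".
function toTimestampMs(isoString) {
  const ms = new Date(isoString).getTime();
  return Number.isNaN(ms) ? undefined : ms.toString();
}

console.log(toTimestampMs("2024-03-12T10:00:00Z")); // 1710237600000
console.log(toTimestampMs("not a date"));           // undefined
```

In the defaults block, the helper would be used as: const ts = toTimestampMs(msg.meta.opcua_source_timestamp); if (ts !== undefined) { msg.meta.timestamp_ms = ts; }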

Note: In the tag_processor, the resulting payload will always include timestamp_ms and one additional key corresponding to the tag_name. If you need to fully control the resulting payload structure, consider using the nodered_js processor instead. You can set the topic and payload manually, as shown below:

pipeline:
  processors:
    - nodered_js:
        code: |
          // Set the Kafka topic manually
          msg.meta.umh_topic = "umh.v1.enterprise.site.area._workorder.new";

          // Only take two fields from the payload
          msg.payload = {
            "maintenanceSchedule": {
              "eventType": msg.payload.maintenanceSchedule.eventType,
              "description": msg.payload.maintenanceSchedule.description
            }
          };
          return msg;
