Tag Processor
The Tag Processor is designed to prepare incoming data for the UMH data model. It processes messages through three configurable stages: defaults, conditional transformations, and advanced processing, all using a Node-RED style JavaScript environment.
Use the tag_processor instead of the nodered_js processor when you are processing tags or time series data and converting them to the UMH data model within the _historian data contract. This processor is optimized for handling structured time series data, automatically formats messages, and generates appropriate metadata.
Message Formatting Behavior
The processor automatically formats different input types into a consistent structure with a "value" field:
Simple Values (numbers, strings, booleans)
Input:
42
Output:
{
"value": 42
}
Input:
"test string"
Output:
{
"value": "test string"
}
Input:
true
Output:
{
"value": true
}
Arrays (converted to string representation)
Input:
["a", "b", "c"]
Output:
{
"value": "[a b c]"
}
Objects (converted to their JSON string representation)
Input:
{
"key1": "value1",
"key2": 42
}
Output:
{
"value": "{\"key1\": \"value1\",\"key2\": 42}"
}
Numbers (preserved as numbers)
Input:
23.5
Output:
{
"value": 23.5
}
Input:
42
Output:
{
"value": 42
}
This consistent formatting ensures that:
All messages have a "value" field
Simple types (numbers, strings, booleans) are preserved as-is
Complex types (arrays, objects) are converted to their string representations
Numbers are always preserved as numeric types (integers or floats)
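The rules above can be sketched in plain JavaScript. This is only an illustration of the documented behavior, not the processor's own code: wrapValue is a hypothetical helper, and using JSON.stringify for objects is an assumption, so spacing and key order in the resulting string may differ from what the processor emits.

```javascript
// Hypothetical helper mirroring the documented formatting rules.
function wrapValue(input) {
  if (Array.isArray(input)) {
    // Arrays become a bracketed, space-separated string, e.g. "[a b c]".
    return { value: "[" + input.join(" ") + "]" };
  }
  if (input !== null && typeof input === "object") {
    // Objects become a string; JSON.stringify is an assumption about the exact form.
    return { value: JSON.stringify(input) };
  }
  // Numbers, strings, and booleans are kept as-is.
  return { value: input };
}

console.log(wrapValue(42));              // { value: 42 }
console.log(wrapValue(["a", "b", "c"])); // { value: '[a b c]' }
```

Checking the array case first matters, because typeof reports arrays as "object" too.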
Configuration
pipeline:
  processors:
    - tag_processor:
        defaults: |
          // Set default location hierarchy and data contract
          msg.meta.location_path = "enterprise.plant1.machiningArea.cnc-line.cnc5.plc123";
          msg.meta.data_contract = "_historian";
          msg.meta.tag_name = "value";
          msg.payload = msg.payload; // does not modify the payload
          return msg;
        conditions:
          - if: msg.meta.opcua_node_id === "ns=1;i=2245"
            then: |
              // Set path hierarchy and tag name for specific OPC UA node
              msg.meta.virtual_path = "axis.x.position";
              msg.meta.tag_name = "actual";
              return msg;
        advancedProcessing: |
          // Optional advanced message processing
          // Example: double numeric values
          msg.payload = parseFloat(msg.payload) * 2;
          return msg;
Processing Stages
Defaults
Sets initial metadata values
Runs first on every message
Must return a message object
Conditions
List of conditional transformations
Each condition has an if expression and a then code block
Runs after defaults
Must return a message object
Advanced Processing
Optional final processing stage
Can modify both metadata and payload
Must return a message object
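The stage order described above can be sketched as a small runner. This is a simplified model, not the processor's implementation: runStages and stageConfig are hypothetical names, the stages are written as plain functions rather than code strings, and the sketch assumes that every condition whose if expression is truthy is applied in list order.

```javascript
// Hypothetical stage runner: defaults -> conditions -> advancedProcessing.
function runStages(msg, config) {
  // Stage 1: defaults run first on every message.
  msg = config.defaults(msg);

  // Stage 2: a condition's `then` block runs only when its `if` expression is truthy.
  for (const condition of config.conditions || []) {
    if (condition.if(msg)) {
      msg = condition.then(msg);
    }
  }

  // Stage 3: optional final stage; it may change both metadata and payload.
  if (config.advancedProcessing) {
    msg = config.advancedProcessing(msg);
  }
  return msg;
}

// The stages from the Configuration section, written as plain functions.
const stageConfig = {
  defaults: (msg) => {
    msg.meta.location_path = "enterprise.plant1.machiningArea.cnc-line.cnc5.plc123";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "value";
    return msg;
  },
  conditions: [
    {
      if: (msg) => msg.meta.opcua_node_id === "ns=1;i=2245",
      then: (msg) => {
        msg.meta.virtual_path = "axis.x.position";
        msg.meta.tag_name = "actual";
        return msg;
      },
    },
  ],
  advancedProcessing: (msg) => {
    msg.payload = parseFloat(msg.payload) * 2;
    return msg;
  },
};

const staged = runStages({ payload: "21", meta: { opcua_node_id: "ns=1;i=2245" } }, stageConfig);
console.log(staged.payload, staged.meta.tag_name); // 42 actual
```

Because the condition runs after defaults, its tag_name ("actual") overrides the default ("value").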
How Metadata Works
The tag processor uses a two-step process for handling metadata:
JavaScript Stage - Working with msg.meta
In your JavaScript code (defaults, conditions, advancedProcessing), you work with the msg.meta object:
// In JavaScript processing stages
msg.meta.location_path = 'enterprise.site.area'
msg.meta.data_contract = '_historian'
msg.meta.tag_name = 'temperature'
msg.meta.virtual_path = 'axis.x.position'
At this stage, these are just JavaScript object properties - they're not yet actual Benthos metadata.
Conversion to Benthos Metadata
After the JavaScript processing is complete, the tag processor extracts all properties from the msg.meta JavaScript object and converts them to actual Benthos message metadata.
Final Message Structure
Important: Properties set on msg.meta become message metadata, not part of the payload.
Metadata (accessible via message metadata):
location_path: "enterprise.site.area"
data_contract: "_historian"
tag_name: "temperature"
virtual_path: "axis.x.position"
umh_topic: "umh.v1.enterprise.site.area._historian.axis.x.position.temperature" (auto-generated)
topic: "umh.v1.enterprise.site.area._historian.axis.x.position.temperature" (auto-generated, deprecated)
Payload (the actual message content):
{ "value": 23.5, "timestamp_ms": 1647890123456 }
Integration with UNS Output
The tag processor is typically used together with the uns output plugin, which is the standard way to publish data into the United Manufacturing Hub. Here's how metadata flows from tag processor to UNS:
Metadata to UNS Headers: All metadata fields set by the tag processor become UNS headers, preserving the complete context and routing information.
UMH Topic as Routing Key: The auto-generated umh_topic metadata becomes the UNS routing key, enabling efficient message routing and topic-based subscriptions.
Payload Preservation: The structured payload (with value and timestamp_ms) is published as-is to the UNS.
Complete Flow Example:
pipeline:
  processors:
    - tag_processor:
        defaults: |
          msg.meta.location_path = "enterprise.site.area";
          msg.meta.data_contract = "_historian";
          msg.meta.tag_name = "temperature";
          msg.meta.custom_field = "sensor_data";
          return msg;
output:
  uns: {}
Input Message: 23.5
UNS Record Result:
UNS Topic: umh.messages (fixed UNS topic)
UNS Routing Key: umh.v1.enterprise.site.area._historian.temperature (from umh_topic metadata)
UNS Value: {"value": 23.5, "timestamp_ms": 1647890123456} (structured payload)
UNS Headers:
location_path: "enterprise.site.area"
data_contract: "_historian"
tag_name: "temperature"
umh_topic: "umh.v1.enterprise.site.area._historian.temperature"
topic: "umh.v1.enterprise.site.area._historian.temperature"
custom_field: "sensor_data"
bridged_by: "umh-core"
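As a rough model of the mapping in this example, the sketch below builds the record from a metadata object and a payload. toUnsRecord is a hypothetical function, not part of the uns output plugin, and the bridged_by value is simply copied from the example; how the output actually adds it is an assumption.

```javascript
// Hypothetical mapping from tag processor output to a UNS record.
function toUnsRecord(metadata, payload) {
  return {
    topic: "umh.messages",          // fixed UNS topic
    key: metadata.umh_topic,        // routing key taken from the umh_topic metadata
    value: JSON.stringify(payload), // structured payload, published unchanged
    // All metadata fields become headers; bridged_by is copied from the example.
    headers: { ...metadata, bridged_by: "umh-core" },
  };
}

const unsRecord = toUnsRecord(
  {
    location_path: "enterprise.site.area",
    data_contract: "_historian",
    tag_name: "temperature",
    umh_topic: "umh.v1.enterprise.site.area._historian.temperature",
    custom_field: "sensor_data",
  },
  { value: 23.5, timestamp_ms: 1647890123456 }
);
console.log(unsRecord.key);   // umh.v1.enterprise.site.area._historian.temperature
console.log(unsRecord.value); // {"value":23.5,"timestamp_ms":1647890123456}
```

Custom fields such as custom_field travel as headers, so consumers can read them without parsing the payload.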
Metadata Fields
The processor uses the following metadata fields:
Required Fields:
location_path: Hierarchical location path in dot notation (e.g., "enterprise.site.area.line.workcell.plc123")
data_contract: Data schema identifier (e.g., "_historian", "_analytics")
tag_name: Name of the tag/variable (e.g., "temperature", "pressure")
Optional Fields:
virtual_path: Logical, non-physical grouping path in dot notation (e.g., "axis.x.position")
Generated Fields:
umh_topic: Automatically generated from the above fields in the format: umh.v1.<location_path>.<data_contract>.<virtual_path>.<tag_name>
Empty or undefined fields are skipped, and dots are normalized.
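The topic format can be illustrated with a short function. buildUmhTopic is a hypothetical name, and the sketch assumes that "normalized" means collapsing repeated dots and trimming leading or trailing dots; the processor's exact normalization may differ.

```javascript
// Hypothetical reconstruction of the umh_topic format described above.
function buildUmhTopic(meta) {
  const parts = [
    "umh.v1",
    meta.location_path,
    meta.data_contract,
    meta.virtual_path,
    meta.tag_name,
  ];
  return parts
    .filter((part) => typeof part === "string" && part.length > 0) // skip empty or undefined fields
    .join(".")
    .replace(/\.{2,}/g, ".")    // assumption: collapse repeated dots
    .replace(/^\.+|\.+$/g, ""); // assumption: trim leading and trailing dots
}

console.log(buildUmhTopic({
  location_path: "enterprise.site.area",
  data_contract: "_historian",
  virtual_path: "axis.x.position",
  tag_name: "temperature",
}));
// umh.v1.enterprise.site.area._historian.axis.x.position.temperature
```

Leaving out virtual_path yields umh.v1.enterprise.site.area._historian.temperature, which matches the topic in the UNS example.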
Message Structure
Messages in the Tag Processor follow the Node-RED style format:
{
payload: {
// The message content - can be a simple value or complex object
"temperature": 23.5,
"timestamp_ms": 1733903611000
},
meta: {
// Required fields
location_path: "enterprise.site.area.line.workcell.plc123", // Hierarchical location path
data_contract: "_historian", // Data schema identifier
tag_name: "temperature", // Name of the tag/variable
// Optional fields
virtual_path: "axis.x.position", // Logical grouping path
// Generated field (by processor)
umh_topic: "umh.v1.enterprise.site.area.line.workcell.plc123._historian.axis.x.position.temperature",
// Input-specific fields (e.g., from OPC UA)
opcua_node_id: "ns=1;i=2245",
opcua_tag_name: "temperature_sensor_1",
opcua_tag_group: "sensors.temperature",
opcua_tag_path: "sensors.temperature",
opcua_tag_type: "number",
opcua_source_timestamp: "2024-03-12T10:00:00Z",
opcua_server_timestamp: "2024-03-12T10:00:00.001Z",
opcua_attr_nodeid: "ns=1;i=2245",
opcua_attr_nodeclass: "Variable",
opcua_attr_browsename: "Temperature",
opcua_attr_description: "Temperature Sensor 1",
opcua_attr_accesslevel: "CurrentRead",
opcua_attr_datatype: "Double"
}
}
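When testing your own stage code outside the processor, it can help to check that the required fields are present on msg.meta. The helper below is a hypothetical sketch of such a check; it does not reproduce the processor's own validation, which may differ.

```javascript
// Hypothetical check for the required metadata fields listed above.
function missingRequiredFields(meta) {
  const required = ["location_path", "data_contract", "tag_name"];
  // A field counts as missing when it is undefined, null, or an empty string.
  return required.filter((field) => !meta[field]);
}

const sampleMeta = {
  location_path: "enterprise.site.area.line.workcell.plc123",
  data_contract: "_historian",
  virtual_path: "axis.x.position", // optional, so never reported as missing
};
console.log(missingRequiredFields(sampleMeta)); // [ 'tag_name' ]
```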
Examples
Basic Defaults Processing
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.plant1.machiningArea.cnc-line.cnc5.plc123";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "actual";
    return msg;
Input:
23.5
Output:
{
"actual": 23.5,
"timestamp_ms": 1733903611000
}
UMH Topic: umh.v1.enterprise.plant1.machiningArea.cnc-line.cnc5.plc123._historian.actual
OPC UA Node ID Based Processing
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.plant1.machiningArea.cnc-line.cnc5.plc123";
    msg.meta.data_contract = "_historian";
    return msg;
  conditions:
    - if: msg.meta.opcua_attr_nodeid === "ns=1;i=2245"
      then: |
        msg.meta.virtual_path = "axis.x.position";
        msg.meta.tag_name = "actual";
        return msg;
Input with metadata opcua_attr_nodeid: "ns=1;i=2245":
23.5
Output:
{
"actual": 23.5,
"timestamp_ms": 1733903611000
}
UMH Topic: umh.v1.enterprise.plant1.machiningArea.cnc-line.cnc5.plc123._historian.axis.x.position.actual
Moving Folder Structures in Virtual Path
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.plant1";
    msg.meta.data_contract = "_historian";
    msg.meta.virtual_path = msg.meta.opcua_tag_path;
    msg.meta.tag_name = msg.meta.opcua_tag_name;
    return msg;
  conditions:
    # Move the entire DataAccess_AnalogType folder and its children into axis.x
    - if: msg.meta.opcua_tag_path && msg.meta.opcua_tag_path.includes("DataAccess_AnalogType")
      then: |
        msg.meta.location_path += ".area1.machining_line.cnc5.plc123";
        msg.meta.virtual_path = "axis.x." + msg.meta.opcua_tag_path;
        return msg;
Input messages with OPC UA tags:
// Original tag paths from OPC UA:
// DataAccess_AnalogType
// DataAccess_AnalogType.EURange
// DataAccess_AnalogType.Min
// DataAccess_AnalogType.Max
Output UMH topics will be:
umh.v1.enterprise.plant1.area1.machining_line.cnc5.plc123._historian.axis.x.DataAccess_AnalogType
umh.v1.enterprise.plant1.area1.machining_line.cnc5.plc123._historian.axis.x.DataAccess_AnalogType.EURange
umh.v1.enterprise.plant1.area1.machining_line.cnc5.plc123._historian.axis.x.DataAccess_AnalogType.Min
umh.v1.enterprise.plant1.area1.machining_line.cnc5.plc123._historian.axis.x.DataAccess_AnalogType.Max
This example shows how to:
Match an entire folder structure using includes("DataAccess_AnalogType")
Move all matching nodes into a new virtual path prefix (axis.x)
Preserve the original folder hierarchy under the new location
Apply consistent location path for the entire folder structure
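To see what the condition does to individual tag paths, the same logic can be run as a standalone function. moveAnalogFolder is a hypothetical name, and the path "Other.Folder" is made up to show a non-matching case.

```javascript
// Standalone version of the condition above, applied to a copy of the metadata.
function moveAnalogFolder(meta) {
  const result = { ...meta };
  if (result.opcua_tag_path && result.opcua_tag_path.includes("DataAccess_AnalogType")) {
    result.location_path += ".area1.machining_line.cnc5.plc123";
    result.virtual_path = "axis.x." + result.opcua_tag_path;
  }
  return result;
}

// "Other.Folder" is a made-up path that should stay untouched.
const tagPaths = ["DataAccess_AnalogType", "DataAccess_AnalogType.EURange", "Other.Folder"];
for (const path of tagPaths) {
  const moved = moveAnalogFolder({
    location_path: "enterprise.plant1",
    virtual_path: path,
    opcua_tag_path: path,
  });
  console.log(path, "->", moved.location_path, "|", moved.virtual_path);
}
```

Note that includes() is a substring match, so a path that merely contains "DataAccess_AnalogType" inside a longer folder name would also be moved; a stricter check on the first path segment may be preferable when folder names overlap.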
Advanced Processing with getLastPayload
getLastPayload is a function that returns the last payload of a message that was available in Kafka. It returns the full payload, so you may still need to extract the value you need.
Note: This function is not yet implemented, but will be available in the future.
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.site.area.line.workcell";
    msg.meta.data_contract = "_analytics";
    msg.meta.virtual_path = "work_order";
    return msg;
  advancedProcessing: |
    msg.payload = {
      "work_order_id": msg.payload.work_order_id,
      "work_order_start_time": umh.getLastPayload("enterprise.site.area.line.workcell._historian.workorder.work_order_start_time").work_order_start_time,
      "work_order_end_time": umh.getLastPayload("enterprise.site.area.line.workcell._historian.workorder.work_order_end_time").work_order_end_time
    };
    return msg;
Input:
{
"work_order_id": "WO123"
}
Output:
{
"work_order_id": "WO123",
"work_order_start_time": "2024-03-12T10:00:00Z",
"work_order_end_time": "2024-03-12T18:00:00Z"
}
UMH Topic: umh.v1.enterprise.site.area.line.workcell._analytics.work_order
Dropping Messages Based on Value
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "temperature";
    return msg;
  advancedProcessing: |
    if (msg.payload < 0) {
      // Drop negative values
      return null;
    }
    return msg;
Input:
-10
Output: Message is dropped (no output)
Input:
10
Output:
{
"temperature": 10,
"timestamp_ms": 1733903611000
}
UMH Topic: umh.v1.enterprise._historian.temperature
Duplicating Messages for Different Data Contracts
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "temperature";
    return msg;
  conditions:
    - if: true
      then: |
        msg.meta.location_path += ".production";
        return msg;
  advancedProcessing: |
    // Create two versions of the message:
    // 1. Original value for historian
    // 2. Doubled value for custom
    let doubledValue = msg.payload * 2;
    let msg1 = {
      payload: msg.payload,
      meta: { ...msg.meta, data_contract: "_historian" }
    };
    let msg2 = {
      payload: doubledValue,
      meta: { ...msg.meta, data_contract: "_custom", tag_name: msg.meta.tag_name + "_doubled" }
    };
    return [msg1, msg2];
Input:
23.5
Output 1 (Historian):
{
"temperature": 23.5,
"timestamp_ms": 1733903611000
}
UMH Topic: umh.v1.enterprise.production._historian.temperature
Output 2 (custom):
{
"temperature_doubled": 47,
"timestamp_ms": 1733903611000
}
UMH Topic: umh.v1.enterprise.production._custom.temperature_doubled
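The last two examples rely on what a stage returns: null drops the message, an array emits several messages, and a single object emits one. The sketch below models that convention; normalizeResult is a hypothetical helper, and how the processor treats other return values (such as undefined) is not covered here.

```javascript
// Hypothetical helper that turns a stage's return value into a list of output messages.
function normalizeResult(result) {
  if (result === null) {
    return []; // message dropped, nothing is emitted
  }
  // An array yields one output per element; a single object yields one output.
  return Array.isArray(result) ? result : [result];
}

const single = { payload: 10, meta: { tag_name: "temperature" } };
const pair = [
  { payload: 23.5, meta: { data_contract: "_historian" } },
  { payload: 47, meta: { data_contract: "_custom" } },
];

console.log(normalizeResult(null).length);   // 0
console.log(normalizeResult(single).length); // 1
console.log(normalizeResult(pair).length);   // 2
```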
Processing Full MQTT Message Payload
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.area";
    msg.meta.data_contract = "_workorder";
    msg.meta.virtual_path = "new";
    msg.meta.tag_name = "maintenance";
    return msg;
Input:
{
"maintenanceSchedule": {
"eventType": "ScheduledMaintenance",
"eventId": "SM-20240717-025",
"timestamp": "2024-07-17T13:00:00Z",
"equipmentId": "InjectionMoldingMachine5",
"equipmentName": "Engel Victory 120",
"scheduledDate": "2024-07-22",
"maintenanceType": "Preventive",
"description": "Inspection and cleaning of injection unit and mold.",
"maintenanceDuration": "6 hours",
"assignedTo": {
"employeeId": "EMP-5005",
"name": "Hans Becker"
},
"status": "Scheduled",
"partsRequired": [
{
"partId": "NOZZLE-015",
"description": "Injection Nozzle",
"quantity": 1
}
],
"notes": "Replace worn nozzle to prevent defects."
}
}
Output:
{
"maintenance": {
"maintenanceSchedule": {
"eventType": "ScheduledMaintenance",
"eventId": "SM-20240717-025",
"timestamp": "2024-07-17T13:00:00Z",
"equipmentId": "InjectionMoldingMachine5",
"equipmentName": "Engel Victory 120",
"scheduledDate": "2024-07-22",
"maintenanceType": "Preventive",
"description": "Inspection and cleaning of injection unit and mold.",
"maintenanceDuration": "6 hours",
"assignedTo": {
"employeeId": "EMP-5005",
"name": "Hans Becker"
},
"status": "Scheduled",
"partsRequired": [
{
"partId": "NOZZLE-015",
"description": "Injection Nozzle",
"quantity": 1
}
],
"notes": "Replace worn nozzle to prevent defects."
}
},
"timestamp_ms": 1733903611000
}
UMH Topic: umh.v1.enterprise.area._workorder.new.maintenance
Setting Custom Timestamps
By default, the tag processor uses the current time. You can set custom timestamps using the timestamp_ms metadata field (Unix milliseconds or RFC3339Nano as string):
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.site.area";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "temperature";
    // Use OPC UA timestamp either as Unix milliseconds
    if (msg.meta.opcua_source_timestamp) {
      msg.meta.timestamp_ms = new Date(msg.meta.opcua_source_timestamp).getTime().toString();
    }
    // or use it directly as RFC3339Nano
    if (msg.meta.opcua_source_timestamp) {
      msg.meta.timestamp_ms = msg.meta.opcua_source_timestamp;
    }
    // Or use Sparkplug B timestamp directly
    if (msg.meta.spb_timestamp) {
      msg.meta.timestamp_ms = msg.meta.spb_timestamp;
    }
    return msg;
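The conversion to Unix milliseconds used in the first branch can be tried on its own. toUnixMsString is a hypothetical helper; the error handling for unparseable input is an addition for illustration and is not something the tag processor is known to do.

```javascript
// Hypothetical helper: convert an RFC3339 timestamp string to Unix milliseconds as a string.
function toUnixMsString(rfc3339) {
  const ms = new Date(rfc3339).getTime();
  if (Number.isNaN(ms)) {
    // new Date() yields an invalid date (NaN) for input it cannot parse.
    throw new Error("not a parseable timestamp: " + rfc3339);
  }
  return ms.toString();
}

console.log(toUnixMsString("2024-03-12T10:00:00Z"));     // "1710237600000"
console.log(toUnixMsString("2024-03-12T10:00:00.001Z")); // "1710237600001"
```

A JavaScript Date only carries millisecond precision, so passing the original string through unchanged (the second branch of the example) is the option to consider if sub-millisecond digits should reach the processor.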
Note: In the tag_processor, the resulting payload will always include timestamp_ms and one additional key corresponding to the tag_name. If you need to fully control the resulting payload structure, consider using the nodered_js processor instead. You can set the topic and payload manually, as shown below:
pipeline:
  processors:
    - nodered_js:
        code: |
          // set kafka topic manually
          msg.meta.umh_topic = "umh.v1.enterprise.site.area._workorder.new"
          // only take two fields from the payload
          msg.payload = {
            "maintenanceSchedule": {
              "eventType": msg.payload.maintenanceSchedule.eventType,
              "description": msg.payload.maintenanceSchedule.description
            }
          }
          return msg;