# Tag Processor

The Tag Processor is designed to prepare incoming data for the UMH data model. It processes messages through three configurable stages: defaults, conditional transformations, and advanced processing, all using a Node-RED style JavaScript environment.

Use the `tag_processor` compared to the `nodered_js` when you are processing tags or time series data and converting them to the UMH data model within the `_historian` data contract. This processor is optimized for handling structured time series data, automatically formats messages, and generates appropriate metadata.

The JavaScript stages use the same environment as the `nodered_js` processor. For the full list of available globals (`msg`, `console`, `cache`), see the [JavaScript API Reference](/benthos-umh/processing/javascript-api.md).

**Message Formatting Behavior**

The processor automatically formats different input types into a consistent structure with a "value" field:

1. **Simple Values (numbers, strings, booleans)**\
   Input:

```json
42
```

Output:

```json
{
  "value": 42
}
```

Input:

```json
"test string"
```

Output:

```json
{
  "value": "test string"
}
```

Input:

```json
true
```

Output:

```json
{
  "value": true
}
```

2. **Arrays** (converted to JSON string representation)\
   Input:

```json
["a", "b", "c"]
```

Output:

```json
{
  "value": "[\"a\",\"b\",\"c\"]"
}
```

3. **Objects** (preserved as JSON objects)\
   Input:

```json
{
  "key1": "value1",
  "key2": 42
}
```

Output:

```json
{
  "value": "{\"key1\": \"value1\",\"key2\": 42}"
}
```

4. **Numbers** (preserved as numbers)\
   Input:

```json
23.5
```

Output:

```json
{
  "value": 23.5
}
```

Input:

```json
42
```

Output:

```json
{
  "value": 42
}
```

This consistent formatting ensures that:

* All messages have a "value" field
* Simple types (numbers, strings, booleans) are preserved as-is
* Complex types (arrays, objects) are converted to JSON string representations
* Numbers are always preserved as numeric types (integers or floats)

> **Breaking change in v4.0.0**: Arrays now serialize to JSON format `["a","b","c"]` instead of space-separated format `[a b c]`. This preserves type information and enables array parsing in downstream processors.

**Configuration**

```yaml
pipeline:
  processors:
    - tag_processor:
        defaults: |

          // Set default location hierarchy and datacontract
          msg.meta.location_path = "enterprise.plant1.machiningArea.cnc-line.cnc5.plc123";
          msg.meta.data_contract = "_historian";
          msg.meta.tag_name = "value";
          msg.payload = msg.payload; //does not modify the payload
          return msg;
        conditions:
          - if: msg.meta.opcua_node_id === "ns=1;i=2245"
            then: |
              // Set path hierarchy and tag name for specific OPC UA node
              msg.meta.virtual_path = "axis.x.position";
              msg.meta.tag_name = "actual";
              return msg;
        advancedProcessing: |
          // Optional advanced message processing
          // Example: double numeric values
          msg.payload = parseFloat(msg.payload) * 2;
          return msg;
```

**Processing Stages**

1. **Defaults**
   * Sets initial metadata values
   * Runs first on every message
   * Must return a message object
2. **Conditions**
   * List of conditional transformations
   * Each condition has an `if` expression and a `then` code block
   * Runs after defaults
   * Must return a message object
3. **Advanced Processing**
   * Optional final processing stage
   * Can modify both metadata and payload
   * Must return a message object

**How Metadata Works**

The tag processor uses a two-step process for handling metadata:

1. **JavaScript Stage - Working with `msg.meta`**

   In your JavaScript code (defaults, conditions, advancedProcessing), you work with the `msg.meta` object:

   ```javascript
   // In JavaScript processing stages
   msg.meta.location_path = 'enterprise.site.area'
   msg.meta.data_contract = '_historian'
   msg.meta.tag_name = 'temperature'
   msg.meta.virtual_path = 'axis.x.position'
   ```

   At this stage, these are just JavaScript object properties - they're not yet actual Benthos metadata.
2. **Conversion to Benthos Metadata**

   After the JavaScript processing is complete, the tag processor extracts all properties from the `msg.meta` JavaScript object and converts them to actual Benthos message metadata.
3. **Final Message Structure**

   **Important**: Properties set on `msg.meta` become message metadata, **not** part of the payload.

   **Metadata** (accessible via message metadata):

   * `location_path`: "enterprise.site.area"
   * `data_contract`: "\_historian"
   * `tag_name`: "temperature"
   * `virtual_path`: "axis.x.position"
   * `umh_topic`: "umh.v1.enterprise.site.area.\_historian.axis.x.position.temperature" (auto-generated)
   * `topic`: "umh.v1.enterprise.site.area.\_historian.axis.x.position.temperature" (auto-generated, deprecated)

   **Payload** (the actual message content):

   ```json
   {
     "value": 23.5,
     "timestamp_ms": 1647890123456
   }
   ```

**Integration with UNS Output**

The tag processor is typically used together with the `uns` output plugin, which is the standard way to publish data into the United Manufacturing Hub. Here's how metadata flows from tag processor to UNS:

1. **Metadata to UNS Headers**: All metadata fields set by the tag processor become UNS headers, preserving the complete context and routing information.
2. **UMH Topic as Routing Key**: The auto-generated `umh_topic` metadata becomes the UNS routing key, enabling efficient message routing and topic-based subscriptions.
3. **Payload Preservation**: The structured payload (with `value` and `timestamp_ms`) is published as-is to the UNS.

**Complete Flow Example:**

```yaml
pipeline:
  processors:
    - tag_processor:
        defaults: |
          msg.meta.location_path = "enterprise.site.area";
          msg.meta.data_contract = "_historian";
          msg.meta.tag_name = "temperature";
          msg.meta.custom_field = "sensor_data";
          return msg;
  output:
    uns: {}
```

**Input Message**: `23.5`

**UNS Record Result**:

* **UNS Topic**: `umh.messages` (fixed UNS topic)
* **UNS Routing Key**: `umh.v1.enterprise.site.area._historian.temperature` (from `umh_topic` metadata)
* **UNS Value**: `{"value": 23.5, "timestamp_ms": 1647890123456}` (structured payload)
* **UNS Headers**:

  ```
  location_path: "enterprise.site.area"
  data_contract: "_historian"
  tag_name: "temperature"
  umh_topic: "umh.v1.enterprise.site.area._historian.temperature"
  topic: "umh.v1.enterprise.site.area._historian.temperature"
  custom_field: "sensor_data"
  bridged_by: "umh-core"
  ```

**Metadata Fields**

The processor uses the following metadata fields:

**Required Fields:**

* `location_path`: Hierarchical location path in dot notation (e.g., "enterprise.site.area.line.workcell.plc123")
* `data_contract`: Data schema identifier (e.g., "\_historian", "\_analytics")
* `tag_name`: Name of the tag/variable (e.g., "temperature", "pressure")

**Optional Fields:**

* `virtual_path`: Logical, non-physical grouping path in dot notation (e.g., "axis.x.position")

**Generated Fields:**

* `umh_topic`: Automatically generated from the above fields in the format:

  ```
  umh.v1.<location_path>.<data_contract>.<virtual_path>.<tag_name>
  ```

  Empty or undefined fields are skipped, and dots are normalized.

**Message Structure**

Messages in the Tag Processor follow the Node-RED style format:

```javascript
{
  payload: {
    // The message content - can be a simple value or complex object
    "value": 23.5,
    "timestamp_ms": 1733903611000
  },
  meta: {
    // Required fields
    location_path: "enterprise.site.area.line.workcell.plc123",  // Hierarchical location path
    data_contract: "_historian",                                 // Data schema identifier
    tag_name: "temperature",                                     // Name of the tag/variable

    // Optional fields
    virtual_path: "axis.x.position",                            // Logical grouping path

    // Generated field (by processor)
    umh_topic: "umh.v1.enterprise.site.area.line.workcell.plc123._historian.axis.x.position.temperature",

    // Input-specific fields (e.g., from OPC UA)
    opcua_node_id: "ns=1;i=2245",
    opcua_tag_name: "temperature_sensor_1",
    opcua_tag_group: "sensors.temperature",
    opcua_tag_path: "sensors.temperature",
    opcua_tag_type: "number",
    opcua_source_timestamp: "2024-03-12T10:00:00Z",
    opcua_server_timestamp: "2024-03-12T10:00:00.001Z",
    opcua_attr_nodeid: "ns=1;i=2245",
    opcua_attr_nodeclass: "Variable",
    opcua_attr_browsename: "Temperature",
    opcua_attr_description: "Temperature Sensor 1",
    opcua_attr_accesslevel: "CurrentRead",
    opcua_attr_datatype: "Double"
  }
}
```

**Examples**

1. **Basic Defaults Processing**

```yaml
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.plant1.machiningArea.cnc-line.cnc5.plc123";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "actual";
    return msg;
```

Input:

```json
23.5
```

Output:

```json
{
  "value": 23.5,
  "timestamp_ms": 1733903611000
}
```

UMH Topic: `umh.v1.enterprise.plant1.machiningArea.cnc-line.cnc5.plc123._historian.actual`

2. **OPC UA Node ID Based Processing**

```yaml
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.plant1.machiningArea.cnc-line.cnc5.plc123";
    msg.meta.data_contract = "_historian";
    return msg;
  conditions:
    - if: msg.meta.opcua_attr_nodeid === "ns=1;i=2245"
      then: |
        msg.meta.virtual_path = "axis.x.position";
        msg.meta.tag_name = "actual";
        return msg;
```

Input with metadata `opcua_attr_nodeid: "ns=1;i=2245"`:

```json
23.5
```

Output:

```json
{
  "value": 23.5,
  "timestamp_ms": 1733903611000
}
```

UMH Topic: `umh.v1.enterprise.plant1.machiningArea.cnc-line.cnc5.plc123._historian.axis.x.position.actual`

3. **Moving Folder Structures in Virtual Path**

```yaml
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.plant1";
    msg.meta.data_contract = "_historian";
    msg.meta.virtual_path = msg.meta.opcua_tag_path;
    msg.meta.tag_name = msg.meta.opcua_tag_name;
    return msg;
  conditions:
    # Move the entire DataAccess_AnalogType folder and its children into axis.x
    - if: msg.meta.opcua_tag_path && msg.meta.opcua_tag_path.includes("DataAccess_AnalogType")
      then: |
        msg.meta.location_path += ".area1.machining_line.cnc5.plc123";
        msg.meta.virtual_path = "axis.x." + msg.meta.opcua_tag_path;
        return msg;
```

Input messages with OPC UA tags:

```javascript
// Original tag paths from OPC UA:
// DataAccess_AnalogType
// DataAccess_AnalogType.EURange
// DataAccess_AnalogType.Min
// DataAccess_AnalogType.Max
```

Output UMH topics will be:

```
umh.v1.enterprise.plant1.area1.machining_line.cnc5.plc123._historian.axis.x.DataAccess_AnalogType
umh.v1.enterprise.plant1.area1.machining_line.cnc5.plc123._historian.axis.x.DataAccess_AnalogType.EURange
umh.v1.enterprise.plant1.area1.machining_line.cnc5.plc123._historian.axis.x.DataAccess_AnalogType.Min
umh.v1.enterprise.plant1.area1.machining_line.cnc5.plc123._historian.axis.x.DataAccess_AnalogType.Max
```

This example shows how to:

* Match an entire folder structure using `includes("DataAccess_AnalogType")`
* Move all matching nodes into a new virtual path prefix (`axis.x`)
* Preserve the original folder hierarchy under the new location
* Apply consistent location path for the entire folder structure

4. **Advanced Processing with getLastPayload**

getLastPayload is a function that returns the last payload of a message that was avaialble in Kafka. Remember that you will get the full payload, and might still need to extract the value you need.

**This is not yet implemented, but will be available in the future.**

```yaml
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.site.area.line.workcell";
    msg.meta.data_contract = "_analytics";
    msg.meta.virtual_path = "work_order";
    return msg;
  advancedProcessing: |
    msg.payload = {
      "work_order_id": msg.payload.work_order_id,
      "work_order_start_time": umh.getLastPayload("enterprise.site.area.line.workcell._historian.workorder.work_order_start_time").work_order_start_time,
      "work_order_end_time": umh.getLastPayload("enterprise.site.area.line.workcell._historian.workorder.work_order_end_time").work_order_end_time
    };
    return msg;
```

Input:

```json
{
  "work_order_id": "WO123"
}
```

Output:

```json
{
  "value": "{\"work_order_id\":\"WO123\",\"work_order_start_time\":\"2024-03-12T10:00:00Z\",\"work_order_end_time\":\"2024-03-12T18:00:00Z\"}",
  "timestamp_ms": 1733903611000
}
```

UMH Topic: `umh.v1.enterprise.site.area.line.workcell._analytics.work_order`

4. **Dropping Messages Based on Value**

```yaml
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "temperature";
    return msg;
  advancedProcessing: |
    if (msg.payload < 0) {
      // Drop negative values
      return null;
    }
    return msg;
```

Input:

```json
-10
```

Output: Message is dropped (no output)

Input:

```json
10
```

Output:

```json
{
  "value": 10,
  "timestamp_ms": 1733903611000
}
```

UMH Topic: `umh.v1.enterprise._historian.temperature`

5. **Duplicating Messages for Different Data Contracts**

```yaml
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "temperature";
    return msg;
  conditions:
    - if: true
      then: |
        msg.meta.location_path += ".production";
        return msg;
  advancedProcessing: |
    // Create two versions of the message:
    // 1. Original value for historian
    // 2. Doubled value for custom
    let doubledValue = msg.payload * 2;

    msg1 = {
      payload: msg.payload,
      meta: { ...msg.meta, data_contract: "_historian" }
    };

    msg2 = {
      payload: doubledValue,
      meta: { ...msg.meta, data_contract: "_custom", tag_name: msg.meta.tag_name + "_doubled" }
    };

    return [msg1, msg2];
```

Input:

```json
23.5
```

Output 1 (Historian):

```json
{
  "value": 23.5,
  "timestamp_ms": 1733903611000
}
```

UMH Topic: `umh.v1.enterprise.production._historian.temperature`

Output 2 (custom):

```json
{
  "value": 47,
  "timestamp_ms": 1733903611000
}
```

UMH Topic: `umh.v1.enterprise.production._custom.temperature_doubled`

6. **Processing Full MQTT Message Payload**

```yaml
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.area._workorder";
    msg.meta.data_contract = "_workorder";
    msg.meta.virtual_path = "new";
    msg.meta.tag_name = "maintenance";
    return msg;
```

Input:

```json
{
  "maintenanceSchedule": {
    "eventType": "ScheduledMaintenance",
    "eventId": "SM-20240717-025",
    "timestamp": "2024-07-17T13:00:00Z",
    "equipmentId": "InjectionMoldingMachine5",
    "equipmentName": "Engel Victory 120",
    "scheduledDate": "2024-07-22",
    "maintenanceType": "Preventive",
    "description": "Inspection and cleaning of injection unit and mold.",
    "maintenanceDuration": "6 hours",
    "assignedTo": {
      "employeeId": "EMP-5005",
      "name": "Hans Becker"
    },
    "status": "Scheduled",
    "partsRequired": [
      {
        "partId": "NOZZLE-015",
        "description": "Injection Nozzle",
        "quantity": 1
      }
    ],
    "notes": "Replace worn nozzle to prevent defects."
  }
}
```

Output:

```json
{
  "value": "{\"maintenanceSchedule\":{\"eventType\":\"ScheduledMaintenance\",\"eventId\":\"SM-20240717-025\",\"timestamp\":\"2024-07-17T13:00:00Z\",\"equipmentId\":\"InjectionMoldingMachine5\",\"equipmentName\":\"Engel Victory 120\",\"scheduledDate\":\"2024-07-22\",\"maintenanceType\":\"Preventive\",\"description\":\"Inspection and cleaning of injection unit and mold.\",\"maintenanceDuration\":\"6 hours\",\"assignedTo\":{\"employeeId\":\"EMP-5005\",\"name\":\"Hans Becker\"},\"status\":\"Scheduled\",\"partsRequired\":[{\"partId\":\"NOZZLE-015\",\"description\":\"Injection Nozzle\",\"quantity\":1}],\"notes\":\"Replace worn nozzle to prevent defects.\"}}",
  "timestamp_ms": 1733903611000
}
```

UMH Topic: `umh.v1.enterprise.area._workorder.maintenance`

7. **Setting Custom Timestamps**

By default, the tag processor uses the current time. You can set custom timestamps using the `timestamp_ms` metadata field (Unix milliseconds or RFC3339Nano as string):

```yaml
tag_processor:
  defaults: |
    msg.meta.location_path = "enterprise.site.area";
    msg.meta.data_contract = "_historian";
    msg.meta.tag_name = "temperature";

    // Use OPC UA timestamp either as Unix milliseconds
    if (msg.meta.opcua_source_timestamp) {
      msg.meta.timestamp_ms = new Date(msg.meta.opcua_source_timestamp).getTime().toString();
    }
    // or use it directly as RFC3339Nano
    if (msg.meta.opcua_source_timestamp) {
      msg.meta.timestamp_ms = msg.meta.opcua_source_timestamp;
    }
    // Or use Sparkplug B timestamp directly
    if (msg.meta.spb_timestamp) {
      msg.meta.timestamp_ms = msg.meta.spb_timestamp;
    }

    return msg;
```

**Note:** In the `tag_processor`, the resulting payload will always include `timestamp_ms` and one additional key corresponding to the `tag_name`. If you need to fully control the resulting payload structure, consider using the `nodered_js` processor instead. You can set the topic and payload manually, as shown below:

```yaml
pipeline:
  processors:
    - nodered_js:
        code: |
          // set kafka topic manually
          msg.meta.umh_topic = "umh.v1.enterprise.site.area._workorder.new"

          // only take two fields from the payload
          msg.payload = {
            "maintenanceSchedule": {
              "eventType": msg.payload.maintenanceSchedule.eventType,
              "description": msg.payload.maintenanceSchedule.description
            }
          }
          return msg;
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.umh.app/benthos-umh/processing/tag-processor.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
