# Classic to Core Processor

The `classic_to_core` processor converts messages in the UMH Classic Historian schema (`_historian`) into Core format, following the "one tag, one message, one topic" principle. It is intended for migrating from Classic historian schemas to the Core data architecture.

## Overview

The Classic to Core Processor transforms single messages containing multiple values and timestamps into separate Core format messages, each containing a single value. It handles nested tag groups by flattening them into intuitive dot-notation paths and reconstructs topics according to Core conventions.

## When to Use

Use the `classic_to_core` processor when you need to:

* **Process Classic Data in Core**: Convert Classic UMH Historian schema messages to Core format for processing in Core-based systems
* **Use Core Processors**: Enable the use of processors that rely on the Core data model (such as the downsampler)

## Quick Start

```yaml
pipeline:
  processors:
    - classic_to_core: {}
```

### Configuration Options

| Parameter              | Type   | Default | Description                                                                                            |
| ---------------------- | ------ | ------- | ------------------------------------------------------------------------------------------------------ |
| `target_data_contract` | string | `""`    | Data contract (schema) used in output topics, e.g. `_raw`. If empty, the input topic's schema is kept. |

## Message Transformation

### Basic Conversion

The processor transforms single messages with multiple values into separate Core format messages:

**Input (Historian Schema):**

* Topic: `umh.v1.acme._historian.weather`
* Payload:

```json
{
  "timestamp_ms": 1717083000000,
  "temperature": 23.4,
  "humidity": 42.1
}
```

* Configuration: `target_data_contract: "_raw"`

**Output (Core Format):**

Message 1:

* Topic: `umh.v1.acme._raw.weather.temperature`
* Payload: `{"value": 23.4, "timestamp_ms": 1717083000000}`
* Metadata: `umh_topic: umh.v1.acme._raw.weather.temperature`

Message 2:

* Topic: `umh.v1.acme._raw.weather.humidity`
* Payload: `{"value": 42.1, "timestamp_ms": 1717083000000}`
* Metadata: `umh_topic: umh.v1.acme._raw.weather.humidity`
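The expansion above can be sketched in a few lines of Python. This is an illustration of the "one tag, one message, one topic" split, not the processor's actual implementation; the function name `expand` and the dictionary layout of a message are assumptions made for this example, and the Core topic prefix is passed in already rewritten.

```python
def expand(payload: dict, core_prefix: str) -> list:
    """Return one Core-style message per tag of a flat Classic payload.

    `expand` and the message layout are hypothetical, for illustration only.
    """
    timestamp_ms = payload["timestamp_ms"]
    messages = []
    for tag, value in payload.items():
        if tag == "timestamp_ms":
            continue  # the timestamp is shared by all tags, not a tag itself
        topic = f"{core_prefix}.{tag}"
        messages.append({
            "topic": topic,
            "payload": {"value": value, "timestamp_ms": timestamp_ms},
            "metadata": {"umh_topic": topic},
        })
    return messages


classic = {"timestamp_ms": 1717083000000, "temperature": 23.4, "humidity": 42.1}
for message in expand(classic, "umh.v1.acme._raw.weather"):
    print(message["topic"], message["payload"])
```

Each output message carries the same `timestamp_ms` as the input, so downstream consumers can still correlate values that arrived together.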

### Tag Groups (Nested Objects)

The processor flattens nested tag groups using dot notation:

**Input (Historian Schema with Tag Groups):**

* Topic: `umh.v1.acme._historian.cnc-mill`
* Payload:

```json
{
  "timestamp_ms": 1670001234567,
  "pos": {
    "x": 12.5,
    "y": 7.3,
    "z": 3.2
  },
  "temperature": 50.0,
  "collision": false
}
```

* Configuration: `target_data_contract: "_raw"`

**Output (Core Format with Flattened Tags):**

The processor creates 5 separate messages:

1. Topic: `umh.v1.acme._raw.cnc-mill.pos.x`
   * Payload: `{"value": 12.5, "timestamp_ms": 1670001234567}`
2. Topic: `umh.v1.acme._raw.cnc-mill.pos.y`
   * Payload: `{"value": 7.3, "timestamp_ms": 1670001234567}`
3. Topic: `umh.v1.acme._raw.cnc-mill.pos.z`
   * Payload: `{"value": 3.2, "timestamp_ms": 1670001234567}`
4. Topic: `umh.v1.acme._raw.cnc-mill.temperature`
   * Payload: `{"value": 50.0, "timestamp_ms": 1670001234567}`
5. Topic: `umh.v1.acme._raw.cnc-mill.collision`
   * Payload: `{"value": false, "timestamp_ms": 1670001234567}`
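Dot-notation flattening of this kind is a short recursive walk. The sketch below shows the idea in Python under the assumption that tag groups are plain JSON objects; `flatten` is a hypothetical helper name, and the sketch leaves out the recursion and tag limits that the processor applies.

```python
def flatten(group: dict, prefix: str = "") -> dict:
    """Flatten nested tag groups into {"dotted.path": value} pairs."""
    flat = {}
    for key, value in group.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # A nested object is a tag group: descend and extend the path.
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat


tags = {
    "pos": {"x": 12.5, "y": 7.3, "z": 3.2},
    "temperature": 50.0,
    "collision": False,
}
print(flatten(tags))
# {'pos.x': 12.5, 'pos.y': 7.3, 'pos.z': 3.2, 'temperature': 50.0, 'collision': False}
```

Appending each flattened path to the rewritten topic prefix yields the five topics listed above.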

### Array Handling

Arrays are converted to a string representation, because UMH Core payloads carry scalar values only:

**Input (Historian Schema with Arrays):**

* Topic: `umh.v1.factory._historian.sensors`
* Payload:

```json
{
  "timestamp_ms": 1670001234567,
  "measurements": [23.4, 24.1, 22.8],
  "sensor_ids": ["temp1", "temp2", "temp3"],
  "status_flags": [true, false, true],
  "mixed_data": ["sensor", 42, true, null]
}
```

* Configuration: `target_data_contract: "_raw"`

**Output (Core Format with Array Conversion):**

The processor creates 4 separate messages with arrays converted to strings:

1. Topic: `umh.v1.factory._raw.sensors.measurements`
   * Payload: `{"value": "[23.4 24.1 22.8]", "timestamp_ms": 1670001234567}`
2. Topic: `umh.v1.factory._raw.sensors.sensor_ids`
   * Payload: `{"value": "[temp1 temp2 temp3]", "timestamp_ms": 1670001234567}`
3. Topic: `umh.v1.factory._raw.sensors.status_flags`
   * Payload: `{"value": "[true false true]", "timestamp_ms": 1670001234567}`
4. Topic: `umh.v1.factory._raw.sensors.mixed_data`
   * Payload: `{"value": "[sensor 42 true <nil>]", "timestamp_ms": 1670001234567}`

**Array Conversion Rules:**

* Arrays are converted to space-separated string format: `[item1 item2 item3]`
* Empty arrays become `[]`
* Single-item arrays become `[item]`
* All array elements are converted using Go's `fmt.Sprintf("%v", item)` format
* Null values in arrays appear as `<nil>`
* Arrays within nested objects are also converted to strings
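To make the rules concrete, here is a Python approximation of the resulting string format. It is a stand-in for Go's `%v` formatting, not the processor's code: `go_style` is a hypothetical helper, and it covers only the value types shown in the examples above. Number formatting can differ from Go for some floats (Python prints `50.0` where Go's `%v` prints `50`), and JSON objects inside arrays are not handled.

```python
def go_style(value) -> str:
    """Approximate how Go's %v renders a JSON-decoded value (sketch only)."""
    if value is None:
        return "<nil>"
    if isinstance(value, bool):
        # Check bool before anything numeric: bool is a subclass of int.
        return "true" if value else "false"
    if isinstance(value, list):
        # Elements are separated by single spaces, with no commas or quotes.
        return "[" + " ".join(go_style(item) for item in value) + "]"
    return str(value)


print(go_style([23.4, 24.1, 22.8]))            # [23.4 24.1 22.8]
print(go_style(["temp1", "temp2", "temp3"]))   # [temp1 temp2 temp3]
print(go_style(["sensor", 42, True, None]))    # [sensor 42 true <nil>]
print(go_style([]))                            # []
```

Because strings are written without quotes, an element that itself contains a space cannot be told apart from two elements once converted, so the string form is best treated as a display value rather than something to parse back.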

## Advanced Configuration

### Using Input Schema

When `target_data_contract` is not specified, the processor uses the input's schema:

```yaml
classic_to_core:
  # target_data_contract not specified - uses input's schema
```

**Input:**

* Topic: `umh.v1.acme._historian.weather`

**Output:**

* Topic: `umh.v1.acme._historian.weather.pressure` (for a `pressure` field in the payload)
* Maintains the original `_historian` schema

## Topic Transformation

The processor parses Classic topics and reconstructs them for Core format:

| Component     | Classic Example          | Core Example             | Description                    |
| ------------- | ------------------------ | ------------------------ | ------------------------------ |
| Prefix        | `umh.v1`                 | `umh.v1`                 | Unchanged                      |
| Location Path | `enterprise.plant1.area` | `enterprise.plant1.area` | Unchanged                      |
| Schema        | `_historian`             | `_raw` (configurable)    | Updated based on configuration |
| Context       | `weather`                | `weather`                | Becomes virtual\_path          |
| Field Name    | N/A                      | `temperature`            | Added for each field           |

**Example Transformation:**

* **Input Topic**: `umh.v1.enterprise.plant1.area._historian.weather`
* **Output Topic**: `umh.v1.enterprise.plant1.area._raw.weather.temperature`
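The table can be read as a small parsing routine. The Python sketch below is one way to express it and is not taken from the processor's source: the function name `to_core_topic` is hypothetical, and it assumes that the schema is the first topic segment starting with an underscore. The error texts mirror the log messages documented under Error Handling.

```python
def to_core_topic(classic_topic: str, tag: str, target_data_contract: str = "") -> str:
    """Rebuild a Classic topic as a Core topic for one tag (illustrative sketch)."""
    parts = classic_topic.split(".")
    if len(parts) < 4:
        raise ValueError(f"invalid topic structure, expected at least 4 parts: {classic_topic}")
    if parts[:2] != ["umh", "v1"]:
        raise ValueError(f"invalid UMH topic prefix, expected 'umh.v1': {classic_topic}")

    # Assumption: the schema is the first segment that starts with "_".
    schema_index = next(
        (i for i, part in enumerate(parts) if i >= 2 and part.startswith("_")), None
    )
    if schema_index is None:
        raise ValueError(f"no data contract found in topic: {classic_topic}")

    location = parts[2:schema_index]
    if not location:
        raise ValueError(f"missing location path in topic: {classic_topic}")

    context = parts[schema_index + 1:]
    # An empty target_data_contract keeps the input's schema.
    contract = target_data_contract or parts[schema_index]
    return ".".join(["umh", "v1", *location, contract, *context, tag])


print(to_core_topic("umh.v1.enterprise.plant1.area._historian.weather", "temperature", "_raw"))
# umh.v1.enterprise.plant1.area._raw.weather.temperature
```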

## Metadata Handling

The processor sets the following metadata fields on each output message:

* `topic`: The new Core topic
* `umh_topic`: Same as topic (enables direct use with `uns_output`)
* `location_path`: Extracted from original topic
* `data_contract`: The target data contract (or input's schema if not specified)
* `tag_name`: The field name
* `virtual_path`: Original context (if present)

**Metadata Preservation:** Original metadata is always preserved alongside new fields.
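As a rough model of this behavior, the Python sketch below copies the original metadata and then adds the generated fields. `build_metadata` and the `source` key are hypothetical names used only for this example, and the sketch assumes that a generated field replaces an original key of the same name.

```python
def build_metadata(original: dict, location_path: str, data_contract: str,
                   virtual_path: str, tag_name: str) -> dict:
    """Merge generated Core fields into a copy of the original metadata (sketch)."""
    segments = ["umh.v1", location_path, data_contract]
    if virtual_path:
        segments.append(virtual_path)
    segments.append(tag_name)
    topic = ".".join(segments)

    metadata = dict(original)  # copy, so every output message owns its metadata
    metadata.update({
        "topic": topic,
        "umh_topic": topic,  # same value, so uns_output can route on it
        "location_path": location_path,
        "data_contract": data_contract,
        "tag_name": tag_name,
    })
    if virtual_path:
        metadata["virtual_path"] = virtual_path
    return metadata


incoming = {"source": "plc-1"}  # hypothetical pre-existing metadata
print(build_metadata(incoming, "acme", "_raw", "weather", "temperature"))
```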

## Performance & Reliability

### Built-in Safeguards

The processor includes several safeguards for production use:

* **Recursion Limit**: Maximum recursion depth of 10 levels for flattening nested tag groups
* **Message Size Limit**: Maximum of 1000 tags per message to prevent memory exhaustion
* **Topic Validation**: Strict UMH v1 topic format validation with proper error handling
* **Comprehensive Metrics**: Tracks processing counts, errors, and limit violations for monitoring
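The two limits can be pictured as guards inside the flattening loop. The Python sketch below is an assumption-laden illustration rather than the processor's logic: `flatten_limited` is a hypothetical name, tag groups nested deeper than the limit are simply skipped, and tags beyond the tag limit are dropped. How the processor treats the remaining data at either limit is not specified here, so treat both choices as placeholders.

```python
def flatten_limited(payload: dict, max_depth: int = 10, max_tags: int = 1000):
    """Flatten tag groups while enforcing depth and tag-count limits (sketch)."""
    flat = {}
    stats = {"recursion_limit_hit": 0, "tag_limit_exceeded": 0}
    stack = [("", payload, 0)]  # (path prefix, tag group, nesting depth)
    while stack:
        prefix, group, depth = stack.pop()
        for key, value in group.items():
            path = f"{prefix}.{key}" if prefix else key
            if isinstance(value, dict):
                if depth + 1 > max_depth:
                    stats["recursion_limit_hit"] += 1  # assumption: skip the group
                else:
                    stack.append((path, value, depth + 1))
            elif len(flat) >= max_tags:
                stats["tag_limit_exceeded"] += 1  # assumption: keep what fits
                return flat, stats
            else:
                flat[path] = value
    return flat, stats


# Small limits make the effect visible without building huge payloads.
print(flatten_limited({"a": {"b": {"c": 1}, "x": 2}}, max_depth=1))
print(flatten_limited({f"tag{i}": i for i in range(5)}, max_tags=3))
```

An explicit stack is used instead of recursion so that the depth check stays a plain integer comparison and the function can return as soon as the tag limit is reached.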

### Performance Considerations

* **Message Expansion**: Each input message creates N output messages (N = number of data fields)
* **Memory Usage**: Metadata is copied for each output message
* **Processing Overhead**: Minimal - efficient string parsing with optimized allocations

## Error Handling

The processor logs every error condition it encounters on non-standard messages. The table lists each condition with an example log message, the resulting behavior, and the metric that is incremented:

| Error Condition          | Log Message Example                                          | Behavior                      | Metrics               |
| ------------------------ | ------------------------------------------------------------ | ----------------------------- | --------------------- |
| Invalid JSON             | "failed to parse as structured data: ..."                    | Message skipped, error logged | `messages_errored`    |
| Missing timestamp\_ms    | "timestamp field 'timestamp\_ms' not found in payload"       | Message skipped, error logged | `messages_errored`    |
| Invalid timestamp format | "failed to parse timestamp: ..."                             | Message skipped, error logged | `messages_errored`    |
| Missing topic metadata   | "no topic found in message metadata"                         | Message skipped, error logged | `messages_errored`    |
| Invalid topic format     | "invalid topic structure, expected at least 4 parts: ..."    | Message skipped, error logged | `messages_errored`    |
| Invalid UMH prefix       | "invalid UMH topic prefix, expected 'umh.v1': ..."           | Message skipped, error logged | `messages_errored`    |
| Missing data contract    | "no data contract found in topic: ..."                       | Message skipped, error logged | `messages_errored`    |
| Missing location path    | "missing location path in topic: ..."                        | Message skipped, error logged | `messages_errored`    |
| Recursion depth exceeded | "Maximum recursion depth of 10 reached, stopping flattening" | Flattening stopped at limit   | `recursion_limit_hit` |
| Too many tags            | "Message exceeds maximum tag limit of 1000, truncating"      | Processing stopped at limit   | `tag_limit_exceeded`  |

All error messages include the original message content and specific details about what went wrong, making it easy to identify and fix issues with non-standard messages.

## Metrics

The processor exposes the following metrics for monitoring:

| Metric                | Type    | Description                             |
| --------------------- | ------- | --------------------------------------- |
| `messages_processed`  | Counter | Total input messages processed          |
| `messages_errored`    | Counter | Messages that failed processing         |
| `messages_expanded`   | Counter | Total output messages created           |
| `messages_dropped`    | Counter | Messages dropped due to configuration   |
| `recursion_limit_hit` | Counter | Times recursion depth limit was reached |
| `tag_limit_exceeded`  | Counter | Times tag limit was exceeded            |

These metrics can be used to monitor the health of the processor and identify patterns of non-standard messages that need attention.

## Complete Integration Example

Here's a complete Benthos configuration for migrating from Classic to Core format:

```yaml
input:
  kafka:
    addresses: ['kafka:9092']
    topics: ['umh.v1.+.+.+._historian.+']
    consumer_group: historian-to-core-migration

pipeline:
  processors:
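    - classic_to_core:
        # Set explicitly: an empty value would keep the input's _historian schema
        target_data_contract: "_raw"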

output:
  # Use UNS output for seamless integration
  uns_output: {}
```

**This configuration:**

1. Consumes all Classic `_historian` topics
2. Converts them to Core format using the `_raw` data contract
3. Publishes to individual Core topics via `uns_output`
4. Uses `umh_topic` metadata for automatic topic routing
5. Preserves all metadata for downstream processing

## Troubleshooting

### Common Issues

**No output messages**

* Check that the input is valid JSON and contains a `timestamp_ms` field
* Verify topic metadata is present
* Ensure input topics follow Classic format: `umh.v1.<location>._historian.<context>`

**Missing fields**

* Check logs for parsing errors
* Ensure fields are not nested beyond maximum recursion depth (10 levels)

**Wrong topics**

* Validate input topic format
* Check `target_data_contract` configuration
* Verify UMH v1 topic structure

**Timestamp errors**

* Check that the `timestamp_ms` field contains a numeric value (int, float, or numeric string)
* Ensure timestamp values are valid Unix timestamps in milliseconds
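
When debugging timestamp problems, it can help to run suspect payloads through a small check before they reach the pipeline. The Python sketch below accepts the three forms mentioned above (int, float, numeric string); `parse_timestamp_ms` is a hypothetical helper, and rejecting booleans and non-finite numbers is an assumption of this example rather than documented processor behavior.

```python
import math


def parse_timestamp_ms(raw) -> int:
    """Normalize a timestamp_ms value to integer milliseconds (sketch)."""
    if isinstance(raw, bool):
        # bool is a subclass of int in Python; it is never a valid timestamp.
        raise ValueError(f"failed to parse timestamp: {raw!r}")
    if isinstance(raw, int):
        return raw
    if isinstance(raw, float):
        value = raw
    elif isinstance(raw, str):
        try:
            value = float(raw.strip())
        except ValueError as exc:
            raise ValueError(f"failed to parse timestamp: {raw!r}") from exc
    else:
        raise ValueError(f"failed to parse timestamp: {raw!r}")
    if not math.isfinite(value):
        raise ValueError(f"failed to parse timestamp: {raw!r}")
    return int(value)


for candidate in (1717083000000, 1717083000000.0, "1717083000000"):
    print(parse_timestamp_ms(candidate))  # 1717083000000 each time
```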
