Classic to Core Processor
The classic_to_core processor converts UMH Historian schema format messages into Core format, following the "one tag, one message, one topic" principle. This processor is essential for migrating from Classic historian schemas to the Core data architecture.
Overview
The Classic to Core Processor transforms single messages containing multiple values and timestamps into separate Core format messages, each containing a single value. It handles nested tag groups by flattening them into intuitive dot-notation paths and reconstructs topics according to Core conventions.
When to Use
Use the classic_to_core processor when you need to:
Process Classic Data in Core: Convert Classic UMH Historian schema messages to Core format for processing in Core-based systems
Use Core Processors: Enable the use of processors that rely on the Core data model (like downsampler)
Quick Start
pipeline:
  processors:
    - classic_to_core: {}
Configuration Options
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| target_data_contract | string | "" | Target data contract. If empty, uses input's schema |
Message Transformation
Basic Conversion
The processor transforms single messages with multiple values into separate Core format messages:
Input (Historian Schema):
Topic:
umh.v1.acme._historian.weather
Payload:
{
  "timestamp_ms": 1717083000000,
  "temperature": 23.4,
  "humidity": 42.1
}
target_data_contract: "_raw"
Output (Core Format):
Message 1:
Topic:
umh.v1.acme._raw.weather.temperature
Payload:
{"value": 23.4, "timestamp_ms": 1717083000000}
Metadata:
umh_topic: umh.v1.acme._raw.weather.temperature
Message 2:
Topic:
umh.v1.acme._raw.weather.humidity
Payload:
{"value": 42.1, "timestamp_ms": 1717083000000}
Metadata:
umh_topic: umh.v1.acme._raw.weather.humidity
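The expansion above can be sketched in Go. This is a minimal illustration, not the processor's actual source: the `CoreMessage` and `corePayload` types and the `expandFlat` function are hypothetical names, the topic base is assumed to be already rewritten, and the sketch only accepts a numeric `timestamp_ms`.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"sort"
)

// CoreMessage is a hypothetical container for one Core-format output message.
type CoreMessage struct {
	Topic   string
	Payload string
}

// corePayload fixes the field order of the emitted JSON (value first).
type corePayload struct {
	Value       interface{} `json:"value"`
	TimestampMs int64       `json:"timestamp_ms"`
}

// expandFlat splits a flat historian payload into one message per tag.
// topicBase is the output topic without the trailing tag name.
func expandFlat(topicBase string, raw []byte) ([]CoreMessage, error) {
	var payload map[string]interface{}
	if err := json.Unmarshal(raw, &payload); err != nil {
		return nil, fmt.Errorf("failed to parse as structured data: %w", err)
	}
	// encoding/json decodes JSON numbers into float64 by default.
	ts, ok := payload["timestamp_ms"].(float64)
	if !ok {
		return nil, errors.New("timestamp_ms is missing or not numeric")
	}

	// Collect and sort the tag names so the output order is deterministic.
	keys := make([]string, 0, len(payload))
	for k := range payload {
		if k != "timestamp_ms" {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)

	out := make([]CoreMessage, 0, len(keys))
	for _, k := range keys {
		body, err := json.Marshal(corePayload{Value: payload[k], TimestampMs: int64(ts)})
		if err != nil {
			return nil, err
		}
		out = append(out, CoreMessage{Topic: topicBase + "." + k, Payload: string(body)})
	}
	return out, nil
}

func main() {
	raw := []byte(`{"timestamp_ms": 1717083000000, "temperature": 23.4, "humidity": 42.1}`)
	msgs, err := expandFlat("umh.v1.acme._raw.weather", raw)
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Println(m.Topic, m.Payload)
	}
}
```

Sorting the keys is a choice made for this sketch only; the document does not state the order in which the processor emits its output messages.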
Tag Groups (Nested Objects)
The processor flattens nested tag groups using dot notation:
Input (Historian Schema with Tag Groups):
Topic:
umh.v1.acme._historian.cnc-mill
Payload:
{
  "timestamp_ms": 1670001234567,
  "pos": {
    "x": 12.5,
    "y": 7.3,
    "z": 3.2
  },
  "temperature": 50.0,
  "collision": false
}
target_data_contract: "_raw"
Output (Core Format with Flattened Tags):
The processor creates 5 separate messages:
Topic:
umh.v1.acme._raw.cnc-mill.pos.x
Payload:
{"value": 12.5, "timestamp_ms": 1670001234567}
Topic:
umh.v1.acme._raw.cnc-mill.pos.y
Payload:
{"value": 7.3, "timestamp_ms": 1670001234567}
Topic:
umh.v1.acme._raw.cnc-mill.pos.z
Payload:
{"value": 3.2, "timestamp_ms": 1670001234567}
Topic:
umh.v1.acme._raw.cnc-mill.temperature
Payload:
{"value": 50.0, "timestamp_ms": 1670001234567}
Topic:
umh.v1.acme._raw.cnc-mill.collision
Payload:
{"value": false, "timestamp_ms": 1670001234567}
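The flattening step can be illustrated with a short recursive Go function. This is a sketch under assumptions: `flatten` and `maxDepth` are hypothetical names, and the way the depth limit is applied here (silently dropping anything nested deeper) is a guess at the behavior, not taken from the processor's source.

```go
package main

import (
	"fmt"
	"sort"
)

// maxDepth mirrors the 10-level recursion limit named in this document.
const maxDepth = 10

// flatten walks nested tag groups and stores every leaf value in out
// under a dot-separated path such as "pos.x".
func flatten(prefix string, in map[string]interface{}, depth int, out map[string]interface{}) {
	if depth >= maxDepth {
		return // assumption: stop descending once the limit is reached
	}
	for key, val := range in {
		path := key
		if prefix != "" {
			path = prefix + "." + key
		}
		if group, ok := val.(map[string]interface{}); ok {
			flatten(path, group, depth+1, out)
			continue
		}
		out[path] = val
	}
}

func main() {
	payload := map[string]interface{}{
		"pos":         map[string]interface{}{"x": 12.5, "y": 7.3, "z": 3.2},
		"temperature": 50.0,
		"collision":   false,
	}
	tags := map[string]interface{}{}
	flatten("", payload, 0, tags)

	// Print in sorted order, since Go map iteration order is random.
	paths := make([]string, 0, len(tags))
	for p := range tags {
		paths = append(paths, p)
	}
	sort.Strings(paths)
	for _, p := range paths {
		fmt.Printf("umh.v1.acme._raw.cnc-mill.%s -> %v\n", p, tags[p])
	}
}
```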
Array Handling
Arrays are converted to string representation to ensure UMH-Core compliance (scalar values only):
Input (Historian Schema with Arrays):
Topic:
umh.v1.factory._historian.sensors
Payload:
{
  "timestamp_ms": 1670001234567,
  "measurements": [23.4, 24.1, 22.8],
  "sensor_ids": ["temp1", "temp2", "temp3"],
  "status_flags": [true, false, true],
  "mixed_data": ["sensor", 42, true, null]
}
target_data_contract: "_raw"
Output (Core Format with Array Conversion):
The processor creates 4 separate messages with arrays converted to strings:
Topic:
umh.v1.factory._raw.sensors.measurements
Payload:
{"value": "[23.4 24.1 22.8]", "timestamp_ms": 1670001234567}
Topic:
umh.v1.factory._raw.sensors.sensor_ids
Payload:
{"value": "[temp1 temp2 temp3]", "timestamp_ms": 1670001234567}
Topic:
umh.v1.factory._raw.sensors.status_flags
Payload:
{"value": "[true false true]", "timestamp_ms": 1670001234567}
Topic:
umh.v1.factory._raw.sensors.mixed_data
Payload:
{"value": "[sensor 42 true <nil>]", "timestamp_ms": 1670001234567}
Array Conversion Rules:
Arrays are converted to a space-separated string format: [item1 item2 item3]
Empty arrays become []
Single-item arrays become [item]
All array elements are converted using Go's fmt.Sprintf("%v", item) format
Null values in arrays appear as <nil>
Arrays within nested objects are also converted to strings
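These rules follow directly from how Go's `%v` verb prints a slice. The sketch below uses a hypothetical helper, `arrayToString`, and formats the whole decoded slice in one call, which produces the same bracketed, space-separated text as formatting each element individually.

```go
package main

import "fmt"

// arrayToString renders a decoded JSON array with fmt's %v verb.
// Numbers decoded by encoding/json arrive as float64, so 42 prints as "42".
func arrayToString(items []interface{}) string {
	return fmt.Sprintf("%v", items)
}

func main() {
	fmt.Println(arrayToString([]interface{}{23.4, 24.1, 22.8}))          // [23.4 24.1 22.8]
	fmt.Println(arrayToString([]interface{}{"sensor", 42.0, true, nil})) // [sensor 42 true <nil>]
	fmt.Println(arrayToString([]interface{}{}))                          // []
	fmt.Println(arrayToString([]interface{}{"item"}))                    // [item]
}
```

Because strings are printed without quotes, the resulting text cannot be parsed back into the original array unambiguously; treat it as a display value.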
Advanced Configuration
Using Input Schema
When target_data_contract is not specified, the processor uses the input's schema:
classic_to_core:
  # target_data_contract not specified - uses input's schema
Input:
Topic:
umh.v1.acme._historian.weather
Output:
Topic:
umh.v1.acme._historian.weather.pressure
Maintains the original _historian schema
Topic Transformation
The processor parses Classic topics and reconstructs them for Core format:
| Component | Classic Topic | Core Topic | Notes |
| --- | --- | --- | --- |
| Prefix | umh.v1 | umh.v1 | Unchanged |
| Location Path | enterprise.plant1.area | enterprise.plant1.area | Unchanged |
| Schema | _historian | _raw (configurable) | Updated based on configuration |
| Context | weather | weather | Becomes virtual_path |
| Field Name | N/A | temperature | Added for each field |
Example Transformation:
Input Topic:
umh.v1.enterprise.plant1.area._historian.weather
Output Topic:
umh.v1.enterprise.plant1.area._raw.weather.temperature
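The topic reconstruction can be sketched as a small Go function. The name `rewriteTopic` is hypothetical, and the sketch assumes that the schema is the first segment after the prefix that starts with an underscore; the error texts reuse the messages listed in the Error Handling section.

```go
package main

import (
	"fmt"
	"strings"
)

// rewriteTopic swaps the schema segment for targetContract (when set)
// and appends the tag path to the end of the topic.
func rewriteTopic(topic, targetContract, tagPath string) (string, error) {
	parts := strings.Split(topic, ".")
	if len(parts) < 4 {
		return "", fmt.Errorf("invalid topic structure, expected at least 4 parts: %s", topic)
	}
	if parts[0] != "umh" || parts[1] != "v1" {
		return "", fmt.Errorf("invalid UMH topic prefix, expected 'umh.v1': %s", topic)
	}

	// Assumption: the schema is the first segment that begins with "_".
	schemaIdx := -1
	for i := 2; i < len(parts); i++ {
		if strings.HasPrefix(parts[i], "_") {
			schemaIdx = i
			break
		}
	}
	if schemaIdx == -1 {
		return "", fmt.Errorf("no data contract found in topic: %s", topic)
	}
	if schemaIdx == 2 {
		return "", fmt.Errorf("missing location path in topic: %s", topic)
	}

	contract := parts[schemaIdx]
	if targetContract != "" {
		contract = targetContract // empty means: keep the input's schema
	}

	out := make([]string, 0, len(parts)+1)
	out = append(out, parts[:schemaIdx]...)   // prefix + location path
	out = append(out, contract)               // schema / data contract
	out = append(out, parts[schemaIdx+1:]...) // context
	out = append(out, tagPath)                // field name
	return strings.Join(out, "."), nil
}

func main() {
	topic, err := rewriteTopic("umh.v1.enterprise.plant1.area._historian.weather", "_raw", "temperature")
	if err != nil {
		panic(err)
	}
	fmt.Println(topic)
}
```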
Metadata Handling
The processor sets the following metadata fields:
Generated Metadata Fields:
topic: The new Core topic
umh_topic: Same as topic (enables direct use with uns_output)
location_path: Extracted from original topic
data_contract: The target data contract (or input's schema if not specified)
tag_name: The field name
virtual_path: Original context (if present)
Metadata Preservation: Original metadata is always preserved alongside new fields.
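As an illustration of how the generated fields could sit alongside the original metadata, here is a minimal Go sketch. The function `buildMetadata` and the `kafka_key` entry are hypothetical, and letting generated fields overwrite same-named original fields is an assumption rather than documented behavior.

```go
package main

import "fmt"

// buildMetadata copies the original metadata for one output message
// and then adds the generated Core fields.
func buildMetadata(original map[string]string, topic, locationPath, contract, tagName, virtualPath string) map[string]string {
	meta := make(map[string]string, len(original)+6)
	for k, v := range original {
		meta[k] = v // copied per output message, so the input map stays untouched
	}
	meta["topic"] = topic
	meta["umh_topic"] = topic // same value as topic
	meta["location_path"] = locationPath
	meta["data_contract"] = contract
	meta["tag_name"] = tagName
	if virtualPath != "" {
		meta["virtual_path"] = virtualPath // only set when a context exists
	}
	return meta
}

func main() {
	original := map[string]string{"kafka_key": "abc"} // hypothetical upstream metadata
	meta := buildMetadata(original,
		"umh.v1.acme._raw.weather.temperature", "acme", "_raw", "temperature", "weather")
	fmt.Println(meta["umh_topic"], meta["kafka_key"])
}
```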
Performance & Reliability
Built-in Safeguards
The processor includes several safeguards for production use:
Recursion Limit: Maximum recursion depth of 10 levels for flattening nested tag groups
Message Size Limit: Maximum of 1000 tags per message to prevent memory exhaustion
Topic Validation: Strict UMH v1 topic format validation with proper error handling
Comprehensive Metrics: Tracks processing counts, errors, and limit violations for monitoring
Performance Considerations
Message Expansion: Each input message creates N output messages (N = number of data fields)
Memory Usage: Metadata is copied for each output message
Processing Overhead: Minimal - efficient string parsing with optimized allocations
Error Handling
The processor includes comprehensive error handling and logging for non-standard messages. Each error condition is logged with detailed information to help diagnose issues:
| Error Condition | Log Message | Behavior | Metric |
| --- | --- | --- | --- |
| Invalid JSON | "failed to parse as structured data: ..." | Message skipped, error logged | messages_errored |
| Missing timestamp_ms | "timestamp field 'timestamp_ms' not found in payload" | Message skipped, error logged | messages_errored |
| Invalid timestamp format | "failed to parse timestamp: ..." | Message skipped, error logged | messages_errored |
| Missing topic metadata | "no topic found in message metadata" | Message skipped, error logged | messages_errored |
| Invalid topic format | "invalid topic structure, expected at least 4 parts: ..." | Message skipped, error logged | messages_errored |
| Invalid UMH prefix | "invalid UMH topic prefix, expected 'umh.v1': ..." | Message skipped, error logged | messages_errored |
| Missing data contract | "no data contract found in topic: ..." | Message skipped, error logged | messages_errored |
| Missing location path | "missing location path in topic: ..." | Message skipped, error logged | messages_errored |
| Recursion depth exceeded | "Maximum recursion depth of 10 reached, stopping flattening" | Flattening stopped at limit | recursion_limit_hit |
| Too many tags | "Message exceeds maximum tag limit of 1000, truncating" | Processing stopped at limit | tag_limit_exceeded |
All error messages include the original message content and specific details about what went wrong, making it easy to identify and fix issues with non-standard messages.
Metrics
The processor exposes comprehensive metrics for monitoring:
| Metric | Type | Description |
| --- | --- | --- |
| messages_processed | Counter | Total input messages processed |
| messages_errored | Counter | Messages that failed processing |
| messages_expanded | Counter | Total output messages created |
| messages_dropped | Counter | Messages dropped due to configuration |
| recursion_limit_hit | Counter | Times recursion depth limit was reached |
| tag_limit_exceeded | Counter | Times tag limit was exceeded |
These metrics can be used to monitor the health of the processor and identify patterns of non-standard messages that need attention.
Complete Integration Example
Here's a complete Benthos configuration for migrating from Classic to Core format:
input:
  kafka:
    addresses: ['kafka:9092']
    topics: ['umh.v1.+.+.+._historian.+']
    consumer_group: historian-to-core-migration
pipeline:
  processors:
    - classic_to_core:
        # Set explicitly: with an empty value the input's _historian schema is kept
        target_data_contract: "_raw"
output:
  # Use UNS output for seamless integration
  uns_output: {}
This configuration:
Consumes all Classic _historian topics
Converts them to Core format using the _raw data contract
Publishes to individual Core topics via uns_output
Uses umh_topic metadata for automatic topic routing
Preserves all metadata for downstream processing
Troubleshooting
Common Issues
No output messages
Check that the input is valid JSON with a timestamp_ms field
Verify topic metadata is present
Ensure input topics follow Classic format:
umh.v1.<location>._historian.<context>
Missing fields
Check logs for parsing errors
Ensure fields are not nested beyond maximum recursion depth (10 levels)
Wrong topics
Validate input topic format
Check target_data_contract configuration
Verify UMH v1 topic structure
Timestamp errors
Check that the timestamp_ms field contains numeric values (int, float, or string numbers)
Ensure timestamp values are valid Unix timestamps
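To check timestamps before they reach the processor, the accepted forms listed above (int, float, or numeric string) can be normalized with a helper like the following. This is a sketch: `parseTimestampMs` is a hypothetical name, and truncating fractional milliseconds is an assumption, not documented behavior.

```go
package main

import (
	"fmt"
	"strconv"
)

// parseTimestampMs normalizes a decoded timestamp_ms value to int64 milliseconds.
func parseTimestampMs(v interface{}) (int64, error) {
	switch t := v.(type) {
	case float64: // default type for JSON numbers decoded by encoding/json
		return int64(t), nil
	case int:
		return int64(t), nil
	case int64:
		return t, nil
	case string: // numeric strings such as "1717083000000"
		n, err := strconv.ParseFloat(t, 64)
		if err != nil {
			return 0, fmt.Errorf("failed to parse timestamp: %w", err)
		}
		return int64(n), nil
	default:
		return 0, fmt.Errorf("failed to parse timestamp: unsupported type %T", v)
	}
}

func main() {
	for _, v := range []interface{}{float64(1717083000000), "1717083000000", true} {
		ts, err := parseTimestampMs(v)
		fmt.Println(ts, err)
	}
}
```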