Classic to Core Processor
The classic_to_core processor converts UMH Historian schema format messages into Core format, following the "one tag, one message, one topic" principle. This processor is essential for migrating from Classic historian schemas to Core data architecture.
Overview
The Classic to Core Processor transforms single messages containing multiple values and timestamps into separate Core format messages, each containing a single value. It handles nested tag groups by flattening them into intuitive dot-notation paths and reconstructs topics according to Core conventions.
When to Use
Use the classic_to_core processor when you need to:
- Process Classic Data in Core: Convert Classic UMH Historian schema messages to Core format for processing in Core-based systems 
- Use Core Processors: Enable the use of processors that rely on the Core data model (like downsampler) 
Quick Start
pipeline:
  processors:
    - classic_to_core: {}Configuration Options
target_data_contract
string
""
Target data contract. If empty, uses input's schema
Message Transformation
Basic Conversion
The processor transforms single messages with multiple values into separate Core format messages:
Input (Historian Schema):
- Topic: - umh.v1.acme._historian.weather
- Payload: 
{
  "timestamp_ms": 1717083000000,
  "temperature": 23.4,
  "humidity": 42.1
}- target_data_contract: "_raw" 
Output (Core Format):
Message 1:
- Topic: - umh.v1.acme._raw.weather.temperature
- Payload: - {"value": 23.4, "timestamp_ms": 1717083000000}
- Metadata: - umh_topic: umh.v1.acme._raw.weather.temperature
Message 2:
- Topic: - umh.v1.acme._raw.weather.humidity
- Payload: - {"value": 42.1, "timestamp_ms": 1717083000000}
- Metadata: - umh_topic: umh.v1.acme._raw.weather.humidity
Tag Groups (Nested Objects)
The processor flattens nested tag groups using dot notation:
Input (Historian Schema with Tag Groups):
- Topic: - umh.v1.acme._historian.cnc-mill
- Payload: 
{
  "timestamp_ms": 1670001234567,
  "pos": {
    "x": 12.5,
    "y": 7.3,
    "z": 3.2
  },
  "temperature": 50.0,
  "collision": false
}- target_data_contract: "_raw" 
Output (Core Format with Flattened Tags):
The processor creates 5 separate messages:
- Topic: - umh.v1.acme._raw.cnc-mill.pos.x- Payload: - {"value": 12.5, "timestamp_ms": 1670001234567}
 
- Topic: - umh.v1.acme._raw.cnc-mill.pos.y- Payload: - {"value": 7.3, "timestamp_ms": 1670001234567}
 
- Topic: - umh.v1.acme._raw.cnc-mill.pos.z- Payload: - {"value": 3.2, "timestamp_ms": 1670001234567}
 
- Topic: - umh.v1.acme._raw.cnc-mill.temperature- Payload: - {"value": 50.0, "timestamp_ms": 1670001234567}
 
- Topic: - umh.v1.acme._raw.cnc-mill.collision- Payload: - {"value": false, "timestamp_ms": 1670001234567}
 
Array Handling
Arrays are converted to string representation to ensure UMH-Core compliance (scalar values only):
Input (Historian Schema with Arrays):
- Topic: - umh.v1.factory._historian.sensors
- Payload: 
{
  "timestamp_ms": 1670001234567,
  "measurements": [23.4, 24.1, 22.8],
  "sensor_ids": ["temp1", "temp2", "temp3"],
  "status_flags": [true, false, true],
  "mixed_data": ["sensor", 42, true, null]
}- target_data_contract: "_raw" 
Output (Core Format with Array Conversion):
The processor creates 4 separate messages with arrays converted to strings:
- Topic: - umh.v1.factory._raw.sensors.measurements- Payload: - {"value": "[23.4 24.1 22.8]", "timestamp_ms": 1670001234567}
 
- Topic: - umh.v1.factory._raw.sensors.sensor_ids- Payload: - {"value": "[temp1 temp2 temp3]", "timestamp_ms": 1670001234567}
 
- Topic: - umh.v1.factory._raw.sensors.status_flags- Payload: - {"value": "[true false true]", "timestamp_ms": 1670001234567}
 
- Topic: - umh.v1.factory._raw.sensors.mixed_data- Payload: - {"value": "[sensor 42 true <nil>]", "timestamp_ms": 1670001234567}
 
Array Conversion Rules:
- Arrays are converted to space-separated string format: - [item1 item2 item3]
- Empty arrays become - []
- Single-item arrays become - [item]
- All array elements are converted using Go's - fmt.Sprintf("%v", item)format
- Null values in arrays appear as - <nil>
- Arrays within nested objects are also converted to strings 
Advanced Configuration
Using Input Schema
When target_data_contract is not specified, the processor uses the input's schema:
classic_to_core:
  # target_data_contract not specified - uses input's schemaInput:
- Topic: - umh.v1.acme._historian.weather
Output:
- Topic: - umh.v1.acme._historian.weather.pressure
- Maintains the original - _historianschema
Topic Transformation
The processor parses Classic topics and reconstructs them for Core format:
Prefix
umh.v1
umh.v1
Unchanged
Location Path
enterprise.plant1.area
enterprise.plant1.area
Unchanged
Schema
_historian
_raw (configurable)
Updated based on configuration
Context
weather
weather
Becomes virtual_path
Field Name
N/A
temperature
Added for each field
Example Transformation:
- Input Topic: - umh.v1.enterprise.plant1.area._historian.weather
- Output Topic: - umh.v1.enterprise.plant1.area._raw.weather.temperature
Metadata Handling
The processor sets the following metadata fields:
Generated Metadata Fields:
- topic: The new Core topic
- umh_topic: Same as topic (enables direct use with- uns_output)
- location_path: Extracted from original topic
- data_contract: The target data contract (or input's schema if not specified)
- tag_name: The field name
- virtual_path: Original context (if present)
Metadata Preservation: Original metadata is always preserved alongside new fields.
Performance & Reliability
Built-in Safeguards
The processor includes several safeguards for production use:
- Recursion Limit: Maximum recursion depth of 10 levels for flattening nested tag groups 
- Message Size Limit: Maximum of 1000 tags per message to prevent memory exhaustion 
- Topic Validation: Strict UMH v1 topic format validation with proper error handling 
- Comprehensive Metrics: Tracks processing counts, errors, and limit violations for monitoring 
Performance Considerations
- Message Expansion: Each input message creates N output messages (N = number of data fields) 
- Memory Usage: Metadata is copied for each output message 
- Processing Overhead: Minimal - efficient string parsing with optimized allocations 
Error Handling
The processor includes comprehensive error handling and logging for non-standard messages. Each error condition is logged with detailed information to help diagnose issues:
Invalid JSON
"failed to parse as structured data: ..."
Message skipped, error logged
messages_errored
Missing timestamp_ms
"timestamp field 'timestamp_ms' not found in payload"
Message skipped, error logged
messages_errored
Invalid timestamp format
"failed to parse timestamp: ..."
Message skipped, error logged
messages_errored
Missing topic metadata
"no topic found in message metadata"
Message skipped, error logged
messages_errored
Invalid topic format
"invalid topic structure, expected at least 4 parts: ..."
Message skipped, error logged
messages_errored
Invalid UMH prefix
"invalid UMH topic prefix, expected 'umh.v1': ..."
Message skipped, error logged
messages_errored
Missing data contract
"no data contract found in topic: ..."
Message skipped, error logged
messages_errored
Missing location path
"missing location path in topic: ..."
Message skipped, error logged
messages_errored
Recursion depth exceeded
"Maximum recursion depth of 10 reached, stopping flattening"
Flattening stopped at limit
recursion_limit_hit
Too many tags
"Message exceeds maximum tag limit of 1000, truncating"
Processing stopped at limit
tag_limit_exceeded
All error messages include the original message content and specific details about what went wrong, making it easy to identify and fix issues with non-standard messages.
Metrics
The processor exposes comprehensive metrics for monitoring:
messages_processed
Counter
Total input messages processed
messages_errored
Counter
Messages that failed processing
messages_expanded
Counter
Total output messages created
messages_dropped
Counter
Messages dropped due to configuration
recursion_limit_hit
Counter
Times recursion depth limit was reached
tag_limit_exceeded
Counter
Times tag limit was exceeded
These metrics can be used to monitor the health of the processor and identify patterns of non-standard messages that need attention.
Complete Integration Example
Here's a complete Benthos configuration for migrating from Classic to Core format:
input:
  kafka:
    addresses: ['kafka:9092']
    topics: ['umh.v1.+.+.+._historian.+']
    consumer_group: historian-to-core-migration
pipeline:
  processors:
    - classic_to_core: {}
output:
  # Use UNS output for seamless integration
  uns_output: {}This configuration:
- Consumes all Classic - _historiantopics
- Converts them to Core format using the - _rawdata contract
- Publishes to individual Core topics via - uns_output
- Uses - umh_topicmetadata for automatic topic routing
- Preserves all metadata for downstream processing 
Troubleshooting
Common Issues
No output messages
- Check that input has valid JSON with timestamp_ms field 
- Verify topic metadata is present 
- Ensure input topics follow Classic format: - umh.v1.<location>._historian.<context>
Missing fields
- Check logs for parsing errors 
- Ensure fields are not nested beyond maximum recursion depth (10 levels) 
Wrong topics
- Validate input topic format 
- Check - target_data_contractconfiguration
- Verify UMH v1 topic structure 
Timestamp errors
- Check that timestamp_ms field contains numeric values (int, float, or string numbers) 
- Ensure timestamp values are valid Unix timestamps 
Last updated

