UMH Topic Parser
The UMH Topic Parser is a high-performance Go package that provides parsing, validation, and construction utilities for UMH (Unified Manufacturing Hub) topics. It ensures that all topics conform to the UMH specification and can be safely used as Kafka message keys.
Overview
UMH topics follow a strict hierarchical structure that represents the physical and logical organization of manufacturing systems. This package centralizes topic handling across all UMH components, providing consistent validation and preventing bad data from entering the UNS.
Topic Structure
UMH topics follow this format:
umh.v1.<location_path>.<data_contract>[.<virtual_path>].<name>
Components:
location_path: 1-N hierarchical levels representing physical organization (enterprise.site.area.line...)
data_contract: Service/schema identifier starting with underscore (_historian, _analytics, etc.)
virtual_path: Optional logical grouping (axis.x.position, diagnostics.*, etc.)
name: Final identifier for the specific data point
Validation Rules
The package enforces these fixed rules:
Location path levels: MUST NOT start with underscore, at least 1 level required
Data contract: MUST start with underscore, cannot be just "_"
Virtual path: CAN start with underscore, optional
Name: CAN start with underscore, required
Kafka compatibility: Only
[a-zA-Z0-9._-]
characters allowedNo consecutive dots, leading dots, or trailing dots
Examples
Valid Topics
umh.v1.enterprise._historian.temperature
umh.v1.acme.berlin._historian.pressure
umh.v1.factory.line1.station2._raw.motor.diagnostics.vibration
umh.v1.plant._analytics.efficiency._kpi.oee
Invalid Topics
umh.v1._enterprise._historian.temp // location cannot start with _
umh.v1.factory.historian.temp // data contract must start with _
umh.v1.factory._._historian.temp // data contract cannot be just _
umh.v1.factory.._historian.temp // empty location level
umh.v1.factory._historian.temp@ture // invalid characters
Usage
Basic Parsing and Validation
import "github.com/united-manufacturing-hub/benthos-umh/pkg/umh/topic"
// Parse and validate a topic string
uns, err := topic.NewUnsTopic("umh.v1.enterprise.site._historian.temperature")
if err != nil {
log.Fatal(err)
}
// Use the topic as a Kafka key
kafkaKey := uns.AsKafkaKey()
// Access parsed components
info := uns.Info()
fmt.Println("Level0:", info.Level0) // "enterprise"
fmt.Println("Location Sublevels:", info.LocationSublevels) // ["site"]
fmt.Println("Data Contract:", info.DataContract) // "_historian"
fmt.Println("Name:", info.Name) // "temperature"
Programmatic Construction
// Build a simple topic
topic, err := topic.NewBuilder().
SetLevel0("enterprise").
SetDataContract("_historian").
SetName("temperature").
Build()
// Build a complex topic with all components
complexTopic, err := topic.NewBuilder().
SetLocationLevels("enterprise", "site", "area", "line").
SetDataContract("_historian").
SetVirtualPath("motor.axis.x").
SetName("position").
Build()
High-Performance Scenarios
For high-throughput scenarios, reuse builder instances:
builder := topic.NewBuilder()
for _, sensor := range sensors {
topic, err := builder.Reset().
SetLocationPath(sensor.Location).
SetDataContract("_historian").
SetName(sensor.Name).
Build()
if err != nil {
log.Printf("Invalid topic for sensor %s: %v", sensor.Name, err)
continue
}
// Process topic...
}
Location Path Handling
// Different ways to set location paths
builder := topic.NewBuilder()
// Method 1: Dot-separated string
builder.SetLocationPath("enterprise.site.area")
// Method 2: Individual levels
builder.SetLocationLevels("enterprise", "site", "area")
// Method 3: Incremental building
builder.SetLevel0("enterprise").
AddLocationLevel("site").
AddLocationLevel("area")
// Method 4: Get current path from builder
currentPath := builder.GetLocationPath() // Returns: "enterprise.site.area"
Performance Characteristics
The package is optimized for high-throughput scenarios:
Simple topic parsing: ~656ns per operation
Complex topic parsing: ~1322ns per operation
Simple topic construction: ~751ns per operation
Complex topic construction: ~1436ns per operation
Memory allocations: 3-5 allocations per topic creation
Read operations: <1ns (String(), Info(), AsKafkaKey() are essentially free)
Thread Safety
UnsTopic instances: Immutable and safe for concurrent use
Builder instances: NOT thread-safe (use separate builders per goroutine)
Package-level functions: Thread-safe
Integration Patterns
Message Processing
func processMessage(topicStr string, payload []byte) error {
topic, err := topic.NewUnsTopic(topicStr)
if err != nil {
return fmt.Errorf("invalid topic: %w", err)
}
// Use topic for routing, validation, etc.
return routeMessage(topic, payload)
}
Location-Based Routing
func routeByLocation(topic *topic.UnsTopic) string {
info := topic.Info()
totalLevels := 1 + len(info.LocationSublevels) // level0 + sublevels
switch totalLevels {
case 1:
return "enterprise-router"
case 2:
return "site-router"
default:
return "local-router"
}
}
Bulk Topic Generation
func createTopics(sensors []Sensor) ([]*topic.UnsTopic, error) {
builder := topic.NewBuilder()
topics := make([]*topic.UnsTopic, 0, len(sensors))
for _, sensor := range sensors {
topic, err := builder.Reset().
SetLocationPath(sensor.LocationPath).
SetDataContract("_historian").
SetName(sensor.Name).
Build()
if err != nil {
return nil, err
}
topics = append(topics, topic)
}
return topics, nil
}
Error Handling
The package provides detailed error messages for debugging:
topic, err := topic.NewUnsTopic("umh.v1._enterprise._historian.temp")
if err != nil {
fmt.Println(err) // "level0 cannot start with underscore"
}
Error messages include:
Specific validation rule that failed
Position information for parsing errors
Suggestions for common mistakes
Migration Guide
From Manual String Parsing
Before:
parts := strings.Split(topicStr, ".")
if len(parts) < 5 || !strings.HasPrefix(parts[0], "umh") {
return errors.New("invalid topic")
}
// ... manual validation logic
level0 := parts[2]
dataContract := parts[3] // This might be wrong!
After:
topic, err := topic.NewUnsTopic(topicStr)
if err != nil {
return err
}
info := topic.Info()
level0 := info.Level0
dataContract := info.DataContract // Correctly parsed
From Existing Validation Code
The topic parser replaces manual validation logic throughout the codebase:
topic_browser_plugin: Replace
topicToUNSInfo()
and manual parsinguns_output: Replace manual topic validation
tag_processor_plugin: Replace
constructUMHTopic()
with Builderclassic_to_core_plugin: Replace manual topic parsing
API Reference
Core Types
// UnsTopic represents a validated UMH topic
type UnsTopic struct { /* ... */ }
// TopicInfo contains parsed topic components (protobuf-generated)
// Located in pkg/umh/topic/proto package
type TopicInfo struct {
Level0 string // Enterprise level
LocationSublevels []string // Additional location levels
DataContract string // Data contract (_historian, etc.)
VirtualPath *string // Optional virtual path (pointer, can be nil)
Name string // Final name segment
Metadata map[string]string // Kafka header metadata
}
// Builder provides fluent topic construction
type Builder struct { /* ... */ }
Key Methods
// Parsing and validation
func NewUnsTopic(topic string) (*UnsTopic, error)
// Topic access
func (u *UnsTopic) String() string
func (u *UnsTopic) AsKafkaKey() string
func (u *UnsTopic) Info() *proto.TopicInfo
// Field access (direct field access, not methods)
info.Level0 // string
info.LocationSublevels // []string
info.DataContract // string
info.VirtualPath // *string (can be nil)
info.Name // string
info.Metadata // map[string]string
// Builder methods
func NewBuilder() *Builder
func (b *Builder) SetLevel0(level0 string) *Builder
func (b *Builder) SetLocationPath(path string) *Builder
func (b *Builder) SetLocationLevels(level0 string, additional ...string) *Builder
func (b *Builder) AddLocationLevel(level string) *Builder
func (b *Builder) SetDataContract(contract string) *Builder
func (b *Builder) SetVirtualPath(path string) *Builder
func (b *Builder) SetName(name string) *Builder
func (b *Builder) Build() (*UnsTopic, error)
func (b *Builder) BuildString() (string, error)
func (b *Builder) GetLocationPath() string
func (b *Builder) Reset() *Builder
Testing
The package includes comprehensive test coverage:
100+ test cases covering valid and invalid topics
Performance benchmarks for all operations
Concurrency tests for thread safety
Edge case validation for boundary conditions
Run tests:
cd pkg/umh/topic
go test -v
go test -bench=. -benchmem
Last updated