UMH Topic Parser
The UMH Topic Parser is a high-performance Go package that provides parsing, validation, and construction utilities for UMH (Unified Manufacturing Hub) topics. It ensures that all topics conform to the UMH specification and can be safely used as Kafka message keys.
Overview
UMH topics follow a strict hierarchical structure that represents the physical and logical organization of manufacturing systems. This package centralizes topic handling across all UMH components, providing consistent validation and preventing malformed topics from entering the Unified Namespace (UNS).
Topic Structure
UMH topics follow this format:
umh.v1.<location_path>.<data_contract>[.<virtual_path>].<name>
Components:
- location_path: 1-N hierarchical levels representing physical organization (enterprise.site.area.line...)
- data_contract: Service/schema identifier starting with an underscore (_historian, _analytics, etc.)
- virtual_path: Optional logical grouping (axis.x.position, diagnostics.*, etc.)
- name: Final identifier for the specific data point
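Location levels must not start with an underscore and the data contract must, so the first underscore-prefixed segment after umh.v1 marks the boundary between location_path and data_contract. The last segment is the name, and anything in between is the virtual path. The sketch below illustrates that reading of the format with a hypothetical splitTopic helper and topicParts type (neither is part of the package). It performs only the structural split and does not apply the character rules.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// topicParts is an illustrative type for this sketch, not the package's TopicInfo.
type topicParts struct {
	Location     []string // location_path levels, level0 first
	DataContract string   // first "_"-prefixed segment after the location
	VirtualPath  string   // optional; empty when absent
	Name         string   // last segment
}

// splitTopic (hypothetical) treats the first "_"-prefixed segment after
// "umh.v1." as the data contract. It does not apply the character rules.
func splitTopic(t string) (topicParts, error) {
	const prefix = "umh.v1."
	if !strings.HasPrefix(t, prefix) {
		return topicParts{}, errors.New("missing umh.v1. prefix")
	}
	segs := strings.Split(strings.TrimPrefix(t, prefix), ".")
	for i, s := range segs {
		if !strings.HasPrefix(s, "_") {
			continue
		}
		// The location needs at least one level, and a name must follow.
		if i == 0 {
			return topicParts{}, errors.New("location level cannot start with _")
		}
		if i == len(segs)-1 {
			return topicParts{}, errors.New("missing name after data contract")
		}
		return topicParts{
			Location:     segs[:i],
			DataContract: s,
			VirtualPath:  strings.Join(segs[i+1:len(segs)-1], "."),
			Name:         segs[len(segs)-1],
		}, nil
	}
	return topicParts{}, errors.New("no data contract (segment starting with _)")
}

func main() {
	p, err := splitTopic("umh.v1.factory.line1.station2._raw.motor.diagnostics.vibration")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(p.Location, p.DataContract, p.VirtualPath, p.Name)
	// [factory line1 station2] _raw motor.diagnostics vibration
}
```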
Validation Rules
The package enforces these fixed rules:
- Location path levels: MUST NOT start with underscore, at least 1 level required 
- Data contract: MUST start with underscore, cannot be just "_" 
- Virtual path: CAN start with underscore, optional 
- Name: CAN start with underscore, required 
- Kafka compatibility: only characters in [a-zA-Z0-9._-] are allowed
- No consecutive dots, leading dots, or trailing dots 
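The character and dot rules can be checked in a single pass over the string. The sketch below is illustrative only: checkSyntax and isKafkaSafe are hypothetical names, the package's own implementation may differ, and the sketch deliberately ignores the structural rules (underscore placement, required components).

```go
package main

import (
	"fmt"
	"strings"
)

// isKafkaSafe reports whether every byte of s is in [a-zA-Z0-9._-].
func isKafkaSafe(s string) bool {
	for i := 0; i < len(s); i++ {
		c := s[i]
		switch {
		case c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z', c >= '0' && c <= '9':
			// letter or digit: allowed
		case c == '.', c == '_', c == '-':
			// separator characters: allowed
		default:
			return false
		}
	}
	return true
}

// checkSyntax (hypothetical) applies only the character and dot rules.
func checkSyntax(topic string) error {
	if !isKafkaSafe(topic) {
		return fmt.Errorf("invalid character in %q", topic)
	}
	if strings.HasPrefix(topic, ".") || strings.HasSuffix(topic, ".") {
		return fmt.Errorf("leading or trailing dot in %q", topic)
	}
	if strings.Contains(topic, "..") {
		return fmt.Errorf("consecutive dots in %q", topic)
	}
	return nil
}

func main() {
	for _, t := range []string{
		"umh.v1.enterprise._historian.temperature",
		"umh.v1.factory.._historian.temp",
		"umh.v1.factory._historian.temp@ture",
	} {
		fmt.Println(t, "=>", checkSyntax(t))
	}
}
```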
Examples
Valid Topics
umh.v1.enterprise._historian.temperature
umh.v1.acme.berlin._historian.pressure
umh.v1.factory.line1.station2._raw.motor.diagnostics.vibration
umh.v1.plant._analytics.efficiency._kpi.oee
Invalid Topics
umh.v1._enterprise._historian.temp     // location cannot start with _
umh.v1.factory.historian.temp          // data contract must start with _
umh.v1.factory._._historian.temp       // data contract cannot be just _
umh.v1.factory.._historian.temp        // empty location level
umh.v1.factory._historian.temp@ture    // invalid character (@)
Usage
Basic Parsing and Validation
package main
import (
    "fmt"
    "log"
    "github.com/united-manufacturing-hub/benthos-umh/pkg/umh/topic"
)
func main() {
    // Parse and validate a topic string
    uns, err := topic.NewUnsTopic("umh.v1.enterprise.site._historian.temperature")
    if err != nil {
        log.Fatal(err)
    }
    // Use the topic as a Kafka key
    kafkaKey := uns.AsKafkaKey()
    fmt.Println("Kafka key:", kafkaKey)
    // Access parsed components
    info := uns.Info()
    fmt.Println("Level0:", info.Level0)                        // "enterprise"
    fmt.Println("Location Sublevels:", info.LocationSublevels) // ["site"]
    fmt.Println("Data Contract:", info.DataContract)           // "_historian"
    fmt.Println("Name:", info.Name)                            // "temperature"
}
Programmatic Construction
// Build a simple topic (named simpleTopic so the topic package is not shadowed)
simpleTopic, err := topic.NewBuilder().
    SetLevel0("enterprise").
    SetDataContract("_historian").
    SetName("temperature").
    Build()
if err != nil {
    log.Fatal(err)
}
// Build a complex topic with all components
complexTopic, err := topic.NewBuilder().
    SetLocationLevels("enterprise", "site", "area", "line").
    SetDataContract("_historian").
    SetVirtualPath("motor.axis.x").
    SetName("position").
    Build()
if err != nil {
    log.Fatal(err)
}
High-Performance Scenarios
For high-throughput scenarios, reuse builder instances:
// sensors is application-defined; each element has LocationPath and Name fields
builder := topic.NewBuilder()
for _, sensor := range sensors {
    uns, err := builder.Reset().
        SetLocationPath(sensor.LocationPath).
        SetDataContract("_historian").
        SetName(sensor.Name).
        Build()
    if err != nil {
        log.Printf("Invalid topic for sensor %s: %v", sensor.Name, err)
        continue
    }
    // Process uns (for example, publish with uns.AsKafkaKey())...
}
Location Path Handling
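Whichever setter is used, a location ends up as a level0 plus an ordered list of sublevels, the same shape TopicInfo exposes as Level0 and LocationSublevels. The sketch below illustrates that mapping with a hypothetical splitLocationPath helper (not part of the package) that also applies the location rules listed under Validation Rules: no empty levels and no leading underscore.

```go
package main

import (
	"fmt"
	"strings"
)

// splitLocationPath (hypothetical) maps a dot-separated location path onto
// level0 plus ordered sublevels, rejecting empty and "_"-prefixed levels.
func splitLocationPath(path string) (string, []string, error) {
	levels := strings.Split(path, ".")
	for i, l := range levels {
		if l == "" {
			return "", nil, fmt.Errorf("empty location level at index %d", i)
		}
		if strings.HasPrefix(l, "_") {
			return "", nil, fmt.Errorf("location level %q must not start with _", l)
		}
	}
	return levels[0], levels[1:], nil
}

func main() {
	level0, sublevels, err := splitLocationPath("enterprise.site.area")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(level0, sublevels) // enterprise [site area]
	// Joining level0 and the sublevels gives back the dot-separated path.
	fmt.Println(strings.Join(append([]string{level0}, sublevels...), ".")) // enterprise.site.area
}
```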
// Different ways to set location paths
builder := topic.NewBuilder()
// Method 1: Dot-separated string
builder.SetLocationPath("enterprise.site.area")
// Method 2: Individual levels
builder.SetLocationLevels("enterprise", "site", "area")
// Method 3: Incremental building (Reset first so no levels from earlier calls remain)
builder.Reset().
    SetLevel0("enterprise").
    AddLocationLevel("site").
    AddLocationLevel("area")
// Reading the current path back from the builder
currentPath := builder.GetLocationPath() // Returns: "enterprise.site.area"
Performance Characteristics
The package is optimized for high-throughput scenarios. Benchmark results (approximate; absolute numbers vary with hardware):
- Simple topic parsing: ~656ns per operation 
- Complex topic parsing: ~1322ns per operation 
- Simple topic construction: ~751ns per operation 
- Complex topic construction: ~1436ns per operation 
- Memory allocations: 3-5 allocations per topic creation 
- Read operations: <1ns (String(), Info(), AsKafkaKey() are essentially free) 
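One plausible way a builder keeps per-topic allocations low is to let Reset clear its fields while keeping the capacity of its internal slices, and to defer all validation to Build so the setters stay cheap and chainable. The sketch below shows that pattern with a hypothetical sketchBuilder type; it is not topic.Builder, its internals are an assumption, and it checks only a subset of the validation rules.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// sketchBuilder is a hypothetical, simplified stand-in for topic.Builder.
// Setters only record values; validation happens in Build.
type sketchBuilder struct {
	location []string
	contract string
	virtual  string
	name     string
}

func (b *sketchBuilder) SetLocationLevels(level0 string, more ...string) *sketchBuilder {
	b.location = append(append(b.location[:0], level0), more...)
	return b
}
func (b *sketchBuilder) SetDataContract(c string) *sketchBuilder { b.contract = c; return b }
func (b *sketchBuilder) SetVirtualPath(p string) *sketchBuilder  { b.virtual = p; return b }
func (b *sketchBuilder) SetName(n string) *sketchBuilder         { b.name = n; return b }

// Reset clears all fields but keeps the location slice's capacity for reuse.
func (b *sketchBuilder) Reset() *sketchBuilder {
	b.location = b.location[:0]
	b.contract, b.virtual, b.name = "", "", ""
	return b
}

// Build checks a subset of the documented rules and joins the parts.
func (b *sketchBuilder) Build() (string, error) {
	if len(b.location) == 0 {
		return "", errors.New("at least one location level is required")
	}
	for _, l := range b.location {
		if l == "" || strings.HasPrefix(l, "_") {
			return "", fmt.Errorf("invalid location level %q", l)
		}
	}
	if len(b.contract) < 2 || b.contract[0] != '_' {
		return "", fmt.Errorf("invalid data contract %q", b.contract)
	}
	if b.name == "" {
		return "", errors.New("name is required")
	}
	parts := append([]string{"umh", "v1"}, b.location...)
	parts = append(parts, b.contract)
	if b.virtual != "" {
		parts = append(parts, b.virtual)
	}
	return strings.Join(append(parts, b.name), "."), nil
}

func main() {
	b := &sketchBuilder{}
	s, err := b.SetLocationLevels("enterprise", "site", "area", "line").
		SetDataContract("_historian").
		SetVirtualPath("motor.axis.x").
		SetName("position").
		Build()
	fmt.Println(s, err)
	// umh.v1.enterprise.site.area.line._historian.motor.axis.x.position <nil>

	// Reuse the same builder; Reset clears the fields from the previous topic.
	_, err = b.Reset().SetLocationLevels("enterprise").SetDataContract("_").SetName("temperature").Build()
	fmt.Println(err) // invalid data contract "_"
}
```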
Thread Safety
- UnsTopic instances: Immutable and safe for concurrent use 
- Builder instances: NOT thread-safe (use separate builders per goroutine) 
- Package-level functions: Thread-safe 
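Because a builder carries mutable state between calls, the safe pattern is one builder per goroutine. The sketch below uses a hypothetical miniBuilder (not topic.Builder) that reuses an internal slice, which is exactly the kind of state that makes sharing unsafe. Each worker owns its own instance and writes to distinct indices of the result slice, so no locking is needed.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// miniBuilder is hypothetical. build overwrites b.parts on every call, so
// sharing one instance across goroutines would be a data race.
type miniBuilder struct{ parts []string }

func (b *miniBuilder) build(location, contract, name string) string {
	b.parts = append(b.parts[:0], "umh", "v1", location, contract, name)
	return strings.Join(b.parts, ".") // Join returns a new string, safe to keep
}

// buildAll spreads names across workers; each goroutine owns one miniBuilder.
func buildAll(names []string, workers int) []string {
	results := make([]string, len(names))
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			var b miniBuilder // per-goroutine builder, never shared
			for i := w; i < len(names); i += workers {
				// Each index is written by exactly one goroutine.
				results[i] = b.build("enterprise.site", "_historian", names[i])
			}
		}(w)
	}
	wg.Wait()
	return results
}

func main() {
	for _, t := range buildAll([]string{"temperature", "pressure", "humidity"}, 2) {
		fmt.Println(t)
	}
}
```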
Integration Patterns
Message Processing
func processMessage(topicStr string, payload []byte) error {
    uns, err := topic.NewUnsTopic(topicStr)
    if err != nil {
        return fmt.Errorf("invalid topic: %w", err)
    }
    // Use the parsed topic for routing, validation, etc.
    // routeMessage is application-defined.
    return routeMessage(uns, payload)
}
Location-Based Routing
func routeByLocation(uns *topic.UnsTopic) string {
    info := uns.Info()
    totalLevels := 1 + len(info.LocationSublevels) // level0 + sublevels
    switch totalLevels {
    case 1:
        return "enterprise-router"
    case 2:
        return "site-router"
    default:
        return "local-router"
    }
}
Bulk Topic Generation
// Sensor is an application-defined type with LocationPath and Name fields
func createTopics(sensors []Sensor) ([]*topic.UnsTopic, error) {
    builder := topic.NewBuilder()
    topics := make([]*topic.UnsTopic, 0, len(sensors))
    for _, sensor := range sensors {
        uns, err := builder.Reset().
            SetLocationPath(sensor.LocationPath).
            SetDataContract("_historian").
            SetName(sensor.Name).
            Build()
        if err != nil {
            return nil, err
        }
        topics = append(topics, uns)
    }
    return topics, nil
}
Error Handling
The package provides detailed error messages for debugging:
_, err := topic.NewUnsTopic("umh.v1._enterprise._historian.temp")
if err != nil {
    fmt.Println(err) // "level0 cannot start with underscore"
}
Error messages include:
- Specific validation rule that failed 
- Position information for parsing errors 
- Suggestions for common mistakes 
Migration Guide
From Manual String Parsing
Before:
parts := strings.Split(topicStr, ".")
if len(parts) < 5 || !strings.HasPrefix(parts[0], "umh") {
    return errors.New("invalid topic")
}
// ... manual validation logic
level0 := parts[2]
dataContract := parts[3] // Only correct when the location has exactly one level
After:
uns, err := topic.NewUnsTopic(topicStr)
if err != nil {
    return err
}
info := uns.Info()
level0 := info.Level0
dataContract := info.DataContract // Correctly parsed
From Existing Validation Code
The topic parser replaces manual validation logic throughout the codebase:
- topic_browser_plugin: Replace topicToUNSInfo() and manual parsing
- uns_output: Replace manual topic validation
- tag_processor_plugin: Replace constructUMHTopic() with the Builder
- classic_to_core_plugin: Replace manual topic parsing
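In the manual-parsing "Before" example, the fixed index only works when the location has exactly one level, because every extra location level shifts the data contract one position to the right. The sketch below contrasts that fixed index with a lookup of the first underscore-prefixed segment, the boundary implied by the validation rules. fixedIndexContract and contractByPrefix are hypothetical helpers for illustration and perform no other validation.

```go
package main

import (
	"fmt"
	"strings"
)

// fixedIndexContract mimics the manual approach: it assumes the data
// contract is always the fourth dot-separated segment.
func fixedIndexContract(t string) string {
	parts := strings.Split(t, ".")
	if len(parts) < 4 {
		return ""
	}
	return parts[3]
}

// contractByPrefix returns the first "_"-prefixed segment after "umh.v1".
func contractByPrefix(t string) string {
	parts := strings.Split(t, ".")
	if len(parts) < 3 {
		return ""
	}
	for _, s := range parts[2:] {
		if strings.HasPrefix(s, "_") {
			return s
		}
	}
	return ""
}

func main() {
	for _, t := range []string{
		"umh.v1.enterprise._historian.temperature",      // one location level
		"umh.v1.enterprise.site._historian.temperature", // two location levels
	} {
		fmt.Printf("fixed index: %-11s prefix rule: %s\n", fixedIndexContract(t), contractByPrefix(t))
	}
}
```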
API Reference
Core Types
// UnsTopic represents a validated UMH topic
type UnsTopic struct { /* ... */ }
// TopicInfo contains parsed topic components (protobuf-generated)
// Located in pkg/umh/topic/proto package
type TopicInfo struct {
    Level0            string            // Enterprise level
    LocationSublevels []string          // Additional location levels
    DataContract      string            // Data contract (_historian, etc.)
    VirtualPath       *string           // Optional virtual path (pointer, can be nil)
    Name              string            // Final name segment
    Metadata          map[string]string // Kafka header metadata
}
// Builder provides fluent topic construction
type Builder struct { /* ... */ }
Key Methods
// Parsing and validation
func NewUnsTopic(topic string) (*UnsTopic, error)
// Topic access
func (u *UnsTopic) String() string
func (u *UnsTopic) AsKafkaKey() string  
func (u *UnsTopic) Info() *proto.TopicInfo
// Field access (direct field access, not methods)
info.Level0                // string
info.LocationSublevels     // []string
info.DataContract          // string
info.VirtualPath           // *string (can be nil)
info.Name                  // string
info.Metadata              // map[string]string
// Builder methods
func NewBuilder() *Builder
func (b *Builder) SetLevel0(level0 string) *Builder
func (b *Builder) SetLocationPath(path string) *Builder
func (b *Builder) SetLocationLevels(level0 string, additional ...string) *Builder
func (b *Builder) AddLocationLevel(level string) *Builder
func (b *Builder) SetDataContract(contract string) *Builder
func (b *Builder) SetVirtualPath(path string) *Builder
func (b *Builder) SetName(name string) *Builder
func (b *Builder) Build() (*UnsTopic, error)
func (b *Builder) BuildString() (string, error)
func (b *Builder) GetLocationPath() string
func (b *Builder) Reset() *Builder
Testing
The package includes comprehensive test coverage:
- 100+ test cases covering valid and invalid topics 
- Performance benchmarks for all operations 
- Concurrency tests for thread safety 
- Edge case validation for boundary conditions 
Run tests:
cd pkg/umh/topic
go test -v
go test -bench=. -benchmem