A high-performance indexer service for the LFX V2 platform that processes resource transactions into OpenSearch, with NATS message processing and queue group load balancing.
The LFX V2 Indexer Service is responsible for:
- Message Processing: NATS stream processing with queue group load balancing across multiple instances
- Transaction Enrichment: JWT authentication, data validation, and principal parsing with delegation support
- Search Indexing: OpenSearch document indexing with optimistic concurrency control
- Data Consistency: Event-driven janitor service for conflict resolution
- Dual Format Support: Both LFX v2 (past-tense actions) and legacy v1 (present-tense actions) message formats
- Health Monitoring: Kubernetes-ready health check endpoints
- Go 1.24
- NATS Server (message streaming)
- OpenSearch/Elasticsearch (document indexing)
- Heimdall JWT Service (authentication)
# Core Services (Required)
export NATS_URL=nats://nats:4222
export OPENSEARCH_URL=http://localhost:9200
export JWKS_URL=http://localhost:4457/.well-known/jwks
# Message Processing
export NATS_QUEUE=lfx.indexer.queue
export OPENSEARCH_INDEX=resources
# Optional Configuration
export LOG_LEVEL=info
export PORT=8080
# Install dependencies
go mod download
# Development mode
make run
# Build and run
make build-local
./bin/lfx-indexer
# Direct Go commands
go run ./cmd/lfx-indexer
go build -o bin/lfx-indexer ./cmd/lfx-indexer
# Command-line options
./bin/lfx-indexer -help
./bin/lfx-indexer -check-config
# Service configuration
./bin/lfx-indexer -p 9090 # Custom health check port
./bin/lfx-indexer --bind 0.0.0.0 # Bind to all interfaces
./bin/lfx-indexer -d # Enable debug logging
# Feature toggles
./bin/lfx-indexer --nojanitor # Disable janitor service (overrides JANITOR_ENABLED)
./bin/lfx-indexer --simple-health # Use simple 'OK' health responses
# Utilities
./bin/lfx-indexer --help # Show help message
Note: Configuration is resolved in this order of precedence: CLI flags > environment variables > defaults.
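The precedence rule can be illustrated with a minimal Go sketch. This is not the service's actual configuration code: `resolvePort` is a hypothetical helper, and only the `-p` flag, the `PORT` variable, and the `8080` default are taken from this README.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// resolvePort applies the precedence CLI flag > environment variable > default.
// args are the command-line arguments without the program name; getenv is
// injected so the lookup can be exercised without touching the real environment.
func resolvePort(args []string, getenv func(string) string) (string, error) {
	fs := flag.NewFlagSet("lfx-indexer", flag.ContinueOnError)
	port := fs.String("p", "", "health check port")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	if *port != "" { // 1. an explicit CLI flag wins
		return *port, nil
	}
	if v := getenv("PORT"); v != "" { // 2. then the environment
		return v, nil
	}
	return "8080", nil // 3. finally the built-in default
}

func main() {
	port, err := resolvePort(os.Args[1:], os.Getenv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}
	fmt.Println("health check port:", port)
}
```

Passing the environment lookup as a function keeps the rule easy to unit test.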
# Kubernetes probes
curl http://localhost:8080/livez # Liveness probe
curl http://localhost:8080/readyz # Readiness probe
curl http://localhost:8080/health # General health
┌─────────────────────────────────────────────────────────────────┐
│                       LFX Indexer Service                       │
├─────────────────────────────────────────────────────────────────┤
│ Entry Points (cmd/ - Standard Go Layout)                        │
│ └─ cmd/lfx-indexer/main.go - Pure dependency injection          │
├─────────────────────────────────────────────────────────────────┤
│ Presentation Layer (NATS Protocol + Health Checks)              │
│ ├─ IndexingMessageHandler - Unified V2/V1 message processing    │
│ ├─ BaseMessageHandler - Shared NATS response logic              │
│ └─ HealthHandler - Kubernetes health probes                     │
├─────────────────────────────────────────────────────────────────┤
│ Application Layer (Orchestration)                               │
│ └─ MessageProcessor - Complete workflow coordination            │
├─────────────────────────────────────────────────────────────────┤
│ Domain Layer (Business Logic)                                   │
│ ├─ IndexerService - Consolidated transaction + health logic     │
│ ├─ Contracts Package - Pure domain interfaces & entities        │
│ └─ Repository Interfaces - Clean abstractions                   │
├─────────────────────────────────────────────────────────────────┤
│ Infrastructure Layer (External Services)                        │
│ ├─ MessagingRepository - NATS client with queue groups          │
│ ├─ StorageRepository - OpenSearch client                        │
│ ├─ AuthRepository - JWT validation (Heimdall integration)       │
│ ├─ CleanupRepository - Background cleanup operations            │
│ └─ Container - Pure dependency injection                        │
└─────────────────────────────────────────────────────────────────┘
                                 │
                  ┌──────────────┼──────────────┐
                  │    External Dependencies    │
                  │                             │
                  │  NATS ── OpenSearch ── JWT  │
                  │                             │
                  └─────────────────────────────┘
graph TB
%% External message sources
V2_APPS["V2 Applications<br/>(Projects API, Orgs API, etc.)"]
V1_APPS["V1 Legacy Applications<br/>(Platform DB Events)"]
%% NATS Infrastructure
NATS_SERVER["NATS Server<br/>nats://nats:4222"]
V2_SUBJECT["V2 Subject: lfx.index.*<br/>(lfx.index.project, lfx.index.organization)"]
V1_SUBJECT["V1 Subject: lfx.v1.index.*<br/>(lfx.v1.index.project)"]
QUEUE_GROUP["Queue Group: lfx.indexer.queue<br/>(Load Balancing)"]
%% Service Instances
INSTANCE1["LFX Indexer Instance 1"]
INSTANCE2["LFX Indexer Instance 2"]
INSTANCE3["LFX Indexer Instance N..."]
%% Clean Architecture Layers
MAIN_GO["cmd/lfx-indexer/main.go<br/>(Entry Point & DI)"]
CONTAINER["Container<br/>(Dependency Injection)"]
MSG_REPO["MessagingRepository<br/>(NATS Wrapper)"]
%% Presentation Layer
UNIFIED_HANDLER["IndexingMessageHandler<br/>(Unified V2 + V1 Messages)"]
HEALTH_HANDLER["HealthHandler<br/>(Kubernetes Probes)"]
%% Application Layer
MESSAGE_PROCESSOR["MessageProcessor<br/>(Workflow Coordination)"]
%% Domain Layer
INDEXER_SVC["IndexerService<br/>(Business Logic + Health + Action Validation)"]
CONTRACTS_PKG["Contracts Package<br/>(Pure Domain Interfaces & Entities)"]
TRANSACTION_ENTITY["LFXTransaction Entity<br/>(Pure Data Structure)"]
%% Infrastructure Layer
STORAGE_REPO["StorageRepository<br/>(OpenSearch Client)"]
AUTH_REPO["AuthRepository<br/>(JWT Validation)"]
OPENSEARCH["OpenSearch Cluster<br/>resources index"]
%% Cleanup System
CLEANUP_REPO["CleanupRepository<br/>(Background Cleanup)"]
%% External Services
JWT_SERVICE["Heimdall JWT Service<br/>(Token Validation)"]
%% Message Flow
V2_APPS -->|"Publish V2 Messages"| V2_SUBJECT
V1_APPS -->|"Publish V1 Messages"| V1_SUBJECT
V2_SUBJECT --> NATS_SERVER
V1_SUBJECT --> NATS_SERVER
NATS_SERVER -->|"Queue Group Distribution"| QUEUE_GROUP
QUEUE_GROUP --> INSTANCE1
QUEUE_GROUP --> INSTANCE2
QUEUE_GROUP --> INSTANCE3
%% Service Architecture Flow
INSTANCE1 --> MAIN_GO
MAIN_GO --> CONTAINER
CONTAINER --> MSG_REPO
CONTAINER --> HEALTH_HANDLER
MSG_REPO -->|"Both V2 + V1"| UNIFIED_HANDLER
%% Processing Flow
UNIFIED_HANDLER --> MESSAGE_PROCESSOR
MESSAGE_PROCESSOR --> INDEXER_SVC
MESSAGE_PROCESSOR --> CLEANUP_REPO
INDEXER_SVC --> CONTRACTS_PKG
INDEXER_SVC --> TRANSACTION_ENTITY
INDEXER_SVC --> AUTH_REPO
INDEXER_SVC --> STORAGE_REPO
CONTRACTS_PKG --> TRANSACTION_ENTITY
STORAGE_REPO --> OPENSEARCH
AUTH_REPO --> JWT_SERVICE
%% Styling
classDef application fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
classDef domain fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef infrastructure fill:#e8f5e8,stroke:#388e3c,stroke-width:2px
classDef external fill:#fff3e0,stroke:#f57c00,stroke-width:2px
class MESSAGE_PROCESSOR,UNIFIED_HANDLER application
class INDEXER_SVC,CONTRACTS_PKG,TRANSACTION_ENTITY domain
class CONTAINER,MSG_REPO,STORAGE_REPO,AUTH_REPO,CLEANUP_REPO infrastructure
class V2_APPS,V1_APPS,JWT_SERVICE,OPENSEARCH,NATS_SERVER external
sequenceDiagram
participant NATS as NATS Server
participant MR as MessagingRepository
participant UH as IndexingMessageHandler
participant MP as MessageProcessor
participant IS as IndexerService
participant AR as AuthRepository
participant JWT as JWT Service
participant TE as LFXTransaction
participant SR as StorageRepository
participant OS as OpenSearch
participant CR as CleanupRepository
Note over NATS,CR: Message Processing Flow
%% Message Arrival & Presentation Layer
NATS->>MR: NATS Message arrives<br/>Subject: lfx.index.project OR lfx.v1.index.project<br/>Queue: lfx.indexer.queue
MR->>UH: HandleWithReply(ctx, data, subject, reply)
Note over UH: Route based on subject prefix<br/>(V2 vs V1 format)
%% Application Layer Coordination
UH->>MP: ProcessIndexingMessage(ctx, data, subject)
Note over MP: Generate messageID<br/>Log reception metrics
%% Transaction Creation & Validation
MP->>TE: createTransaction(data, subject)
TE->>TE: Parse message format<br/>Extract object type from subject<br/>Validate required fields (pure data)
TE-->>MP: LFXTransaction entity (clean data structure)
%% Domain Layer Business Logic
MP->>IS: ProcessTransaction(ctx, transaction, index)
Note over IS: Consolidated processing:<br/>• EnrichTransaction() with action validation<br/>• GenerateTransactionBody()<br/>• Index document<br/>(Action helpers moved to service layer)
%% Authentication & Authorization
IS->>AR: ParsePrincipals(ctx, headers)
AR->>JWT: ValidateToken(ctx, token)
JWT-->>AR: Principal{Principal, Email}
AR-->>IS: []Principal with delegation support
%% Data Enrichment & Validation
IS->>IS: EnrichTransaction()<br/>ValidateObjectType()<br/>GenerateTransactionBody()
%% Document Indexing
IS->>SR: Index(ctx, index, docID, body)
SR->>OS: POST /resources/_doc/{docID}<br/>with optimistic concurrency
alt Successful Index
OS-->>SR: 201 Created
SR-->>IS: Success with DocumentID
%% Background Conflict Resolution
IS-->>MP: ProcessingResult{Success: true, DocumentID}
MP->>CR: CheckItem(documentID)
Note over CR: Background cleanup<br/>Event-driven processing
else Index Conflict/Error
OS-->>SR: 409 Conflict / 4xx Error
SR-->>IS: Error with details
IS-->>MP: ProcessingResult{Success: false, Error}
end
%% Response Handling
MP-->>UH: Processing result
alt Success
UH->>UH: reply([]byte("OK"))
Note over UH: Log success metrics
else Error
UH->>UH: reply([]byte("ERROR: details"))
Note over UH: Log error with context
end
UH-->>MR: Acknowledge message
MR-->>NATS: Message processed
# Required Services
NATS_URL=nats://nats:4222 # NATS server URL
OPENSEARCH_URL=http://localhost:9200 # OpenSearch endpoint
JWKS_URL=http://localhost:4457/.well-known/jwks # JWT validation endpoint
# Message Processing
NATS_INDEXING_SUBJECT=lfx.index.> # V2 subject pattern
NATS_V1_INDEXING_SUBJECT=lfx.v1.index.> # V1 subject pattern
NATS_QUEUE=lfx.indexer.queue # Queue group name
OPENSEARCH_INDEX=resources # OpenSearch index name
# NATS Connection Settings
NATS_MAX_RECONNECTS=10 # Max reconnection attempts
NATS_RECONNECT_WAIT=2s # Wait time between reconnects
NATS_CONNECTION_TIMEOUT=10s # Initial connection timeout
# JWT Configuration
JWT_ISSUER=heimdall # JWT issuer validation
JWT_AUDIENCES=["audience1","audience2"] # Allowed audiences (JSON array)
JWT_CLOCK_SKEW=6h # Clock skew tolerance
# Server Configuration
PORT=8080 # Health check server port
LOG_LEVEL=info # Logging level (debug,info,warn,error)
LOG_FORMAT=text # Log format (text,json)
# Cleanup Service (Background Cleanup)
JANITOR_ENABLED=true # Enable cleanup service (default: true)
Subject Pattern | Purpose | Example | Load Balancing |
---|---|---|---|
lfx.index.* | V2 resource indexing | lfx.index.project, lfx.index.organization | Queue group distribution |
lfx.v1.index.* | V1 legacy support | lfx.v1.index.project | Queue group distribution |
Queue Group: lfx.indexer.queue
- Load Balancing: Automatic distribution across service instances
- Single Delivery: Each message is delivered to only one instance in the queue group, not to every instance
- Fault Tolerance: When an instance fails, NATS routes new messages to the remaining instances in the group
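Because both subject families arrive at the same handler, the subject itself has to tell the two formats apart. A minimal sketch of prefix-based classification is shown below; `classifySubject` and the `"v1"`/`"v2"` labels are hypothetical, and only the two subject prefixes come from the table above.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	v2Prefix = "lfx.index."
	v1Prefix = "lfx.v1.index."
)

// classifySubject returns the message format ("v2" or "v1") and the object
// type, which is whatever follows the prefix in the subject.
func classifySubject(subject string) (format, objectType string, err error) {
	switch {
	case strings.HasPrefix(subject, v1Prefix):
		format, objectType = "v1", strings.TrimPrefix(subject, v1Prefix)
	case strings.HasPrefix(subject, v2Prefix):
		format, objectType = "v2", strings.TrimPrefix(subject, v2Prefix)
	default:
		return "", "", fmt.Errorf("unsupported subject %q", subject)
	}
	if objectType == "" {
		return "", "", fmt.Errorf("subject %q has no object type", subject)
	}
	return format, objectType, nil
}

func main() {
	for _, s := range []string{"lfx.index.project", "lfx.v1.index.project", "other.subject"} {
		format, objectType, err := classifySubject(s)
		fmt.Println(s, format, objectType, err)
	}
}
```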
├── cmd/  # Application entry points (Standard Go Layout)
│   └── lfx-indexer/  # Main indexer service
│       ├── main.go  # Service entry point & dependency injection
│       ├── cli.go  # CLI command handling
│       └── server.go  # HTTP server setup
├── internal/  # Private application code
│   ├── application/  # Application layer (use cases)
│   │   ├── message_processor.go  # Message processing coordination
│   │   └── message_processor_test.go  # Application layer tests
│   ├── domain/  # Domain layer (business logic)
│   │   ├── contracts/  # Domain contracts/interfaces & entities
│   │   │   ├── auth.go  # Auth repository interface
│   │   │   ├── cleanup.go  # Cleanup repository interface
│   │   │   ├── messaging.go  # Messaging repository interface
│   │   │   ├── storage.go  # Storage repository interface
│   │   │   └── transaction.go  # Transaction interfaces & entity types (pure data)
│   │   └── services/  # Domain services
│   │       ├── indexer_service.go  # Core indexer business logic + action validation
│   │       └── indexer_service_test.go  # Domain service tests
│   ├── infrastructure/  # Infrastructure layer
│   │   ├── auth/  # Authentication
│   │   │   ├── auth_repository.go  # JWT validation implementation
│   │   │   └── auth_repository_test.go  # Auth tests
│   │   ├── config/  # Configuration management
│   │   │   ├── app_config.go  # Application configuration
│   │   │   └── cli_config.go  # CLI configuration
│   │   ├── cleanup/  # Background cleanup operations
│   │   │   ├── cleanup_repository.go  # Background cleanup repository
│   │   │   └── cleanup_repository_test.go  # Cleanup tests
│   │   ├── messaging/  # NATS messaging
│   │   │   ├── messaging_repository.go  # NATS client wrapper
│   │   │   └── messaging_repository_test.go  # Messaging tests
│   │   └── storage/  # OpenSearch storage
│   │       ├── storage_repository.go  # OpenSearch client wrapper
│   │       └── storage_repository_test.go  # Storage tests
│   ├── presentation/  # Presentation layer
│   │   └── handlers/  # Message and HTTP handlers
│   │       ├── health_handler.go  # Kubernetes health check endpoints
│   │       ├── health_handler_test.go  # Health handler tests
│   │       ├── indexing_message_handler.go  # NATS message handler
│   │       └── indexing_message_handler_test.go  # Handler tests
│   ├── enrichers/  # Data enrichment utilities
│   │   ├── default_enricher.go  # Default enrichment with configurable options
│   │   ├── project_enricher.go  # Project-specific enrichment
│   │   ├── project_settings_enricher.go  # Project settings enrichment
│   │   ├── committee_enricher.go  # Committee-specific enrichment
│   │   ├── committee_settings_enricher.go  # Committee settings enrichment
│   │   └── registry.go  # Enricher registry
│   ├── container/  # Dependency injection
│   │   ├── container.go  # DI container implementation
│   │   └── container_test.go  # Container tests
│   └── mocks/  # Mock implementations
│       └── repositories.go  # Repository mocks for testing
├── pkg/  # Public packages (reusable)
│   ├── constants/  # Shared constants
│   │   ├── app.go  # Application constants
│   │   ├── auth.go  # Authentication constants
│   │   ├── errors.go  # Error constants
│   │   ├── health.go  # Health check constants
│   │   └── messaging.go  # Messaging constants
│   └── logging/  # Logging utilities
│       ├── logger.go  # Logger implementation
│       ├── logger_test.go  # Logger tests
│       └── testing.go  # Test logging utilities
├── deployment/  # Deployment configurations
│   └── deployment.yaml  # Kubernetes deployment
├── Dockerfile  # Container definition
├── Makefile  # Build automation
├── go.mod  # Go module definition
├── go.sum  # Go module checksums
├── run.sh  # Development run script
└── README.md  # This file
Layer | Components | Responsibilities |
---|---|---|
Entry Point | cmd/lfx-indexer/main.go | Pure application startup and dependency injection |
Presentation | IndexingMessageHandler, HealthHandler | NATS protocol concerns, message parsing, response handling, health checks |
Application | MessageProcessor | Workflow coordination, use case orchestration |
Domain | IndexerService, Contracts Package, LFXTransaction Entity | Business logic, action validation, pure domain data structures, repository interfaces |
Infrastructure | Container, MessagingRepository, StorageRepository, AuthRepository, CleanupRepository | External service integration, data persistence, event-driven processing |
The LFX V2 Indexer Service includes a powerful data enrichment system that transforms raw transaction data into search-optimized documents with access control, metadata, and full-text search capabilities.
The enricher system uses a configurable option pattern that allows for flexible customization while maintaining code reuse:
// Base enricher with configurable behavior
enricher := newDefaultEnricher(
	constants.ObjectTypeCommittee,
	WithAccessControl(customAccessControlFunction),
	WithNameAndAliases(customNameExtractionFunction),
	WithPublicFlag(customPublicFlagFunction),
	WithParentReferences(customParentReferenceFunction),
)
// Input data
{
  "uid": "committee-123",
  "name": "Technical Steering Committee"
}
// Enriched output
{
  "object_id": "committee-123",
  "object_type": "committee",
  "public": false,
  "access_check_object": "committee:committee-123",
  "access_check_relation": "auditor", // Committee-settings default
  "history_check_object": "committee:committee-123",
  "history_check_relation": "writer",
  "sort_name": "Technical Steering Committee",
  "name_and_aliases": ["Technical Steering Committee"],
  "fulltext": "Technical Steering Committee"
}
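To make the option pattern concrete, here is a self-contained sketch that produces a few of the enriched fields from the input above. It is a reduced illustration, not the code in `default_enricher.go`: the `body` and `enricher` types, the `withRelation` and `withPublicFlag` options, and the `"viewer"` default relation are all assumptions made for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// body is a reduced, hypothetical version of the enriched document.
type body struct {
	ObjectID            string
	ObjectType          string
	Public              bool
	AccessCheckObject   string
	AccessCheckRelation string
	SortName            string
}

// enricher holds the overridable steps; each one has a default.
type enricher struct {
	objectType string
	relation   func(data map[string]any) string
	public     func(data map[string]any) bool
}

// option mutates an enricher during construction.
type option func(*enricher)

func withRelation(f func(map[string]any) string) option {
	return func(e *enricher) { e.relation = f }
}

func withPublicFlag(f func(map[string]any) bool) option {
	return func(e *enricher) { e.public = f }
}

// newEnricher installs the defaults, then applies the options in order.
func newEnricher(objectType string, opts ...option) *enricher {
	e := &enricher{
		objectType: objectType,
		relation:   func(map[string]any) string { return "viewer" }, // assumed default
		public: func(d map[string]any) bool {
			p, _ := d["public"].(bool) // a missing flag means not public
			return p
		},
	}
	for _, opt := range opts {
		opt(e)
	}
	return e
}

// enrich builds the document from raw data using the configured steps.
func (e *enricher) enrich(data map[string]any) (body, error) {
	uid, _ := data["uid"].(string)
	if uid == "" {
		return body{}, errors.New("missing uid")
	}
	name, _ := data["name"].(string)
	return body{
		ObjectID:            uid,
		ObjectType:          e.objectType,
		Public:              e.public(data),
		AccessCheckObject:   fmt.Sprintf("%s:%s", e.objectType, uid),
		AccessCheckRelation: e.relation(data),
		SortName:            name,
	}, nil
}

func main() {
	committee := newEnricher("committee",
		withRelation(func(map[string]any) string { return "auditor" }),
	)
	doc, err := committee.enrich(map[string]any{
		"uid":  "committee-123",
		"name": "Technical Steering Committee",
	})
	fmt.Printf("%+v %v\n", doc, err)
}
```

Each option replaces one step and leaves the other defaults untouched, which is what lets specialized enrichers share most of their logic.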
To create a custom enricher, you define methods on your enricher struct and use the option pattern to override specific behaviors:
type CustomEnricher struct {
	defaultEnricher Enricher
}
// Implement the Enricher interface
func (e *CustomEnricher) ObjectType() string {
	return e.defaultEnricher.ObjectType()
}
func (e *CustomEnricher) EnrichData(body *contracts.TransactionBody, transaction *contracts.LFXTransaction) error {
	return e.defaultEnricher.EnrichData(body, transaction)
}
// Custom access control method
func (e *CustomEnricher) setAccessControl(body *contracts.TransactionBody, data map[string]any, objectType, objectID string) {
	// Custom logic - override specific defaults
	if accessCheckRelation, ok := data["accessCheckRelation"].(string); ok {
		body.AccessCheckRelation = accessCheckRelation
	} else if _, exists := data["accessCheckRelation"]; !exists {
		body.AccessCheckRelation = "custom-role" // Your custom default
	}
	// Keep standard logic for other fields or customize as needed
	if _, exists := data["accessCheckObject"]; !exists {
		body.AccessCheckObject = fmt.Sprintf("%s:%s", objectType, objectID)
	}
	// ... handle other access control fields
}
// Constructor using method reference and option pattern
func NewCustomEnricher() Enricher {
	enricher := &CustomEnricher{}
	enricher.defaultEnricher = newDefaultEnricher(
		constants.ObjectTypeCustom,
		WithAccessControl(enricher.setAccessControl), // Method reference
	)
	return enricher
}
Option | Function Signature | Purpose |
---|---|---|
WithAccessControl |
func(body, data, objectType, objectID) |
Override access control logic |
WithNameAndAliases |
func(data) []string |
Override name/alias extraction |
WithPublicFlag |
func(data) bool |
Override public flag logic |
WithParentReferences |
func(body, data, objectType) |
Override parent reference logic |
To add a new enricher:
- Create the enricher struct:
type MyCustomEnricher struct {
	defaultEnricher Enricher
}
- Implement the Enricher interface:
func (e *MyCustomEnricher) ObjectType() string {
	return e.defaultEnricher.ObjectType()
}
func (e *MyCustomEnricher) EnrichData(body *contracts.TransactionBody, transaction *contracts.LFXTransaction) error {
	return e.defaultEnricher.EnrichData(body, transaction)
}
- Add custom behavior (optional):
func (e *MyCustomEnricher) customMethod(body *contracts.TransactionBody, data map[string]any, objectType, objectID string) {
	// Your custom logic
}
- Create constructor with options:
func NewMyCustomEnricher() Enricher {
	enricher := &MyCustomEnricher{}
	enricher.defaultEnricher = newDefaultEnricher(
		constants.ObjectTypeCustom,
		WithAccessControl(enricher.customMethod),
	)
	return enricher
}
- Register in the enricher registry:
// In registry.go
registry.Register(NewMyCustomEnricher())
This pattern allows you to:
- Reuse default enrichment logic
- Override specific behaviors as needed
- Extend with custom methods and logic
- Maintain consistency across enrichers
- Test individual components easily
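The registration step relies on a registry that can find the right enricher for an object type. A minimal sketch of such a registry is shown below. It is an assumption about the general shape, not the contents of `registry.go`: the reduced `Enricher` interface, `staticEnricher`, and the `Get` method are hypothetical, and only `Register` appears in the steps above.

```go
package main

import "fmt"

// Enricher is reduced to the one method the registry needs for lookup.
type Enricher interface {
	ObjectType() string
}

// staticEnricher is a stand-in implementation for this example.
type staticEnricher struct{ objectType string }

func (s staticEnricher) ObjectType() string { return s.objectType }

// Registry maps an object type to the enricher responsible for it.
type Registry struct {
	enrichers map[string]Enricher
}

func NewRegistry() *Registry {
	return &Registry{enrichers: make(map[string]Enricher)}
}

// Register stores the enricher under its own object type; a later
// registration for the same type replaces the earlier one.
func (r *Registry) Register(e Enricher) {
	r.enrichers[e.ObjectType()] = e
}

// Get returns the enricher for objectType, or false if none is registered.
func (r *Registry) Get(objectType string) (Enricher, bool) {
	e, ok := r.enrichers[objectType]
	return e, ok
}

func main() {
	registry := NewRegistry()
	registry.Register(staticEnricher{objectType: "committee"})

	_, ok := registry.Get("committee")
	fmt.Println("committee registered:", ok)
	_, ok = registry.Get("meeting")
	fmt.Println("meeting registered:", ok)
}
```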
# Local development
make build-local # Build for current OS
make run # Run with environment setup
make fmt # Format code
make lint # Run linters
make test # Run tests
# Production build
make build # Build for Linux (deployment)
make docker-build # Build container image
make docker-push # Push to registry
# Testing by layer
make test-domain # Domain layer tests
make test-application # Application layer tests
make test-infrastructure # Infrastructure layer tests
make test-presentation # Presentation layer tests
make coverage # Generate coverage report
# Test V2 message format
nats pub lfx.index.project '{
"action": "created",
"headers": {"Authorization": "Bearer token"},
"data": {
"id": "test-project-123",
"uid": "test-project-123",
"name": "Test Project",
"slug": "test-project",
"public": true
}
}' --server nats://your-nats-server:4222
# Test V1 message format
nats pub lfx.v1.index.project '{
"action": "create",
"data": {
"id": "test-project-456",
"name": "Legacy Project"
},
"v1_data": {
"legacy_field": "legacy_value"
}
}' --server nats://your-nats-server:4222
# Monitor processing
nats sub "lfx.index.>" --server nats://your-nats-server:4222
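The two payloads differ mainly in the tense of the action and in the optional `headers` and `v1_data` fields. The sketch below decodes both shapes into one struct and rewrites V1 actions to their past-tense form. It rests on assumptions: the `message` struct and `decode` function are hypothetical, the `update` and `delete` mappings do not appear in this README, and the service may handle V1 actions differently.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// message covers the fields that appear in the two sample payloads.
type message struct {
	Action  string            `json:"action"`
	Headers map[string]string `json:"headers,omitempty"`
	Data    map[string]any    `json:"data"`
	V1Data  map[string]any    `json:"v1_data,omitempty"`
}

// v1Actions maps present-tense V1 actions to past-tense V2 equivalents.
// Only create/created is shown in the README; the other two are assumed.
var v1Actions = map[string]string{
	"create": "created",
	"update": "updated",
	"delete": "deleted",
}

// decode parses a payload and, for V1 messages, rewrites the action to its
// past-tense form so later steps see a single vocabulary.
func decode(payload []byte, isV1 bool) (message, error) {
	var m message
	if err := json.Unmarshal(payload, &m); err != nil {
		return m, fmt.Errorf("invalid JSON: %w", err)
	}
	if isV1 {
		action, ok := v1Actions[m.Action]
		if !ok {
			return m, fmt.Errorf("unknown V1 action %q", m.Action)
		}
		m.Action = action
	}
	if len(m.Data) == 0 {
		return m, errors.New("message has no data")
	}
	return m, nil
}

func main() {
	v1 := []byte(`{"action":"create","data":{"id":"test-project-456","name":"Legacy Project"}}`)
	m, err := decode(v1, true)
	fmt.Println(m.Action, m.Data["id"], err)
}
```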
1. NATS Connection Failures
# Check NATS connectivity
nats server check connection --server $NATS_URL
# Verify queue configuration
nats consumer info lfx.indexer.queue --server $NATS_URL
# Test basic pub/sub
nats pub test.subject "hello" --server $NATS_URL
nats sub test.subject --server $NATS_URL
2. OpenSearch Issues
# Check OpenSearch health
curl $OPENSEARCH_URL/_cluster/health
# Verify index exists
curl $OPENSEARCH_URL/resources/_mapping
# Check recent documents
curl "$OPENSEARCH_URL/resources/_search?size=5&sort=@timestamp:desc"
3. JWT Validation Failures
# Check Heimdall connectivity
curl $JWKS_URL
# Enable debug logging for auth details
LOG_LEVEL=debug ./bin/lfx-indexer
Health Endpoints:
curl http://localhost:8080/livez # Kubernetes liveness
curl http://localhost:8080/readyz # Kubernetes readiness
curl http://localhost:8080/health # General health status
Debug Configuration:
# Enable comprehensive debugging
export LOG_LEVEL=debug
export LOG_FORMAT=json
# Monitor message processing
LOG_LEVEL=debug make run
JWT Authentication:
- Heimdall Integration: All messages validated against JWT service
- Multiple Audiences: Configurable audience validation
- Principal Parsing: Authorization header and X-On-Behalf-Of delegation support
- Machine User Detection: clients@ prefix identification
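Machine user detection by prefix can be sketched as a single string check. `isMachineUser` is a hypothetical helper; the README only states that the `clients@` prefix identifies machine users, so the sample principals are made up for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// machineUserPrefix is the prefix named in the list above.
const machineUserPrefix = "clients@"

// isMachineUser reports whether a principal identifies a machine client.
func isMachineUser(principal string) bool {
	return strings.HasPrefix(principal, machineUserPrefix)
}

func main() {
	for _, p := range []string{"clients@indexer", "jane.doe"} {
		fmt.Printf("%s machine=%t\n", p, isMachineUser(p))
	}
}
```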
Input Validation:
- Domain Validation: LFXTransaction entity validates all inputs
- Subject Format Validation: String prefix validation with enhanced error messages
- Data Structure Validation: Comprehensive JSON schema validation
Error Handling:
- Safe Error Messages: No sensitive data leakage
- Structured Logging: Detailed error context for debugging
- Graceful Degradation: Continue processing on non-critical failures
# Install with Make
make helm-install
# Uninstall with Make
make helm-uninstall
# Manual installation with custom namespace/values
helm install lfx-indexer ./charts/lfx-v2-indexer-service \
--namespace lfx \
--create-namespace \
--values custom-values.yaml
# Manual upgrade
helm upgrade lfx-indexer ./charts/lfx-v2-indexer-service \
--namespace lfx
# Build container
make docker-build
# Run container
docker run -p 8080:8080 \
-e NATS_URL=nats://nats:4222 \
-e OPENSEARCH_URL=http://opensearch:9200 \
-e JWKS_URL=http://heimdall:4457/.well-known/jwks \
lfx-v2-indexer-service:latest
This project uses dual licensing to cover different types of content:
The source code in this project is licensed under the MIT License. See the LICENSE file for the complete license text.
Copyright: The Linux Foundation and each contributor to LFX.
The documentation in this project is licensed under the Creative Commons Attribution 4.0 International License. See the LICENSE-docs file for the complete license text.
For information about reporting security vulnerabilities, please see our SECURITY.md file.
Code review and maintenance responsibilities are defined in the CODEOWNERS file.