diff --git a/events/kafka.go b/events/kafka.go index b61499c4..12d17091 100644 --- a/events/kafka.go +++ b/events/kafka.go @@ -3,9 +3,10 @@ package events type KafkaEvent struct { - EventSource string `json:"eventSource"` - EventSourceARN string `json:"eventSourceArn"` - Records map[string][]KafkaRecord `json:"records"` + EventSource string `json:"eventSource"` + EventSourceARN string `json:"eventSourceArn"` + Records map[string][]KafkaRecord `json:"records"` + BootstrapServers string `json:"bootstrapServers"` } type KafkaRecord struct { @@ -16,4 +17,5 @@ type KafkaRecord struct { TimestampType string `json:"timestampType"` Key string `json:"key,omitempty"` Value string `json:"value,omitempty"` + Headers []map[string][]byte `json:"headers"` } diff --git a/events/kafka_test.go b/events/kafka_test.go index bea50b32..27fdd91c 100644 --- a/events/kafka_test.go +++ b/events/kafka_test.go @@ -20,21 +20,25 @@ func TestKafkaEventMarshaling(t *testing.T) { t.Errorf("could not unmarshal event. details: %v", err) } + assert.Equal(t, inputEvent.BootstrapServers, "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092") + assert.Equal(t, inputEvent.EventSource, "aws:kafka") + assert.Equal(t, inputEvent.EventSourceARN, "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4") for _, records := range inputEvent.Records { for _, record := range records { utc := record.Timestamp.UTC() assert.Equal(t, 2020, utc.Year()) + assert.Equal(t, record.Key, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj") + assert.Equal(t, record.Value, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj") + + for _, header := range record.Headers { + for key, value := range header { + assert.Equal(t, key, "headerKey") + var headerValue string = string(value) + assert.Equal(t, headerValue, "headerValue") + } + } } } - - // 3. serialize to JSON - outputJson, err := json.Marshal(inputEvent) - if err != nil { - t.Errorf("could not marshal event. details: %v", err) - } - - // 4. check result - assert.JSONEq(t, string(inputJson), string(outputJson)) } func TestKafkaMarshalingMalformedJson(t *testing.T) { diff --git a/events/testdata/kafka-event.json b/events/testdata/kafka-event.json index d940e57c..beec3cd6 100644 --- a/events/testdata/kafka-event.json +++ b/events/testdata/kafka-event.json @@ -1,6 +1,7 @@ { "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4", + "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { "AWSKafkaTopic-0": [ { @@ -10,7 +11,24 @@ "timestamp": 1595035749700, "timestampType": "CREATE_TIME", "key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj", - "value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj" + "value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj", + "headers": [ + { + "headerKey": [ + 104, + 101, + 97, + 100, + 101, + 114, + 86, + 97, + 108, + 117, + 101 + ] + } + ] } ] }