WIP on intake api v2 #930

Closed
wants to merge 6 commits into from
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -37,6 +37,7 @@ https://github.com/elastic/apm-server/compare/4daa36bd5c144cf9182afc62dc8042af66
- Fix issue preventing server from being stopped {pull}704[704].
- Limit the amount of concurrent requests being processed {pull}731[731].
- Return proper response code for request entity too large {pull}862[862].
- Make APM Server docker image listen on all interfaces by default https://github.com/elastic/apm-server-docker/pull/16[apm-server-docker#16].

==== Added

134 changes: 105 additions & 29 deletions beater/handlers.go
@@ -5,6 +5,7 @@ import (
"crypto/subtle"
"encoding/json"
"expvar"
"fmt"
"net/http"
"regexp"
"strings"
@@ -18,11 +19,12 @@ import (

conf "github.com/elastic/apm-server/config"
"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/processor"
perr "github.com/elastic/apm-server/processor/error"
"github.com/elastic/apm-server/processor/healthcheck"
"github.com/elastic/apm-server/processor/sourcemap"
"github.com/elastic/apm-server/processor/transaction"

"github.com/elastic/apm-server/utility"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
@@ -35,10 +37,13 @@ const (
FrontendErrorsURL = "/v1/client-side/errors"
HealthCheckURL = "/healthcheck"
SourcemapsURL = "/v1/client-side/sourcemaps"
StreamBackendURL = "/v2/intake"

rateLimitCacheSize = 1000
rateLimitBurstMultiplier = 2

v2TransformBatchSize = 100

supportedHeaders = "Content-Type, Content-Encoding, Accept"
supportedMethods = "POST, OPTIONS"
)
@@ -58,6 +63,10 @@ type serverResponse struct {
counter *monitoring.Int
}

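// IsError reports whether the response carries an error.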
func (s *serverResponse) IsError() bool {
return s.err != nil
}

var (
serverMetrics = monitoring.Default.NewRegistry("apm-server.server", monitoring.PublishExpvar)
counter = func(s string) *monitoring.Int {
@@ -105,25 +114,33 @@ var (
errors.New("timeout waiting to be processed"), http.StatusServiceUnavailable, counter("response.errors.concurrency"),
}
fullQueueCounter = counter("response.errors.queue")
fullQueueResponse = func(err error) serverResponse {
return serverResponse{
errors.New("queue is full"), http.StatusServiceUnavailable, fullQueueCounter,
}
fullQueueResponse = serverResponse{
errors.New("queue is full"), http.StatusServiceUnavailable, fullQueueCounter,
}
serverShuttingDownCounter = counter("response.errors.closed")
serverShuttingDownResponse = func(err error) serverResponse {
serverShuttingDownResponse = serverResponse{
errors.New("server is shutting down"), http.StatusServiceUnavailable, serverShuttingDownCounter,
}

invalidContentTypeCounter = counter("response.errors.contenttype")
invalidContentType = func(err error) serverResponse {
return serverResponse{
errors.New("server is shutting down"), http.StatusServiceUnavailable, serverShuttingDownCounter,
err, http.StatusBadRequest, invalidContentTypeCounter,
}
}

errHeaderMissing = errors.New("header must be first object in stream")

Routes = map[string]routeMapping{
BackendTransactionsURL: {backendHandler, transaction.NewProcessor},
FrontendTransactionsURL: {frontendHandler, transaction.NewProcessor},
BackendErrorsURL: {backendHandler, perr.NewProcessor},
FrontendErrorsURL: {frontendHandler, perr.NewProcessor},
HealthCheckURL: {healthCheckHandler, healthcheck.NewProcessor},
SourcemapsURL: {sourcemapHandler, sourcemap.NewProcessor},

StreamBackendURL: {streamBackendHandler, nil},

BackendErrorsURL: {backendHandler, perr.NewProcessor},
FrontendErrorsURL: {frontendHandler, perr.NewProcessor},
HealthCheckURL: {healthCheckHandler, nil},
SourcemapsURL: {sourcemapHandler, sourcemap.NewProcessor},
}
)

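For orientation, a minimal client-side sketch of the new endpoint: an ndjson body whose first object is the stream header (see errHeaderMissing above), posted with the Content-Type that streamBackendHandler requires. The JSON keys, the Bearer token, and the default port 8200 are illustrative assumptions, not the actual v2 schema.

func postExampleV2Stream() (*http.Response, error) {
	// Sketch only: the metadata/transaction keys and the token below are placeholders.
	body := strings.NewReader(
		`{"metadata": {"service": {"name": "example-app"}}}` + "\n" +
			`{"transaction": {"id": "abc123", "duration": 42.0}}` + "\n")
	req, err := http.NewRequest("POST", "http://localhost:8200"+StreamBackendURL, body)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/ndjson")
	req.Header.Set("Authorization", "Bearer <secret-token>")
	return http.DefaultClient.Do(req)
}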
@@ -163,19 +180,29 @@ func concurrencyLimitHandler(beaterConfig *Config, h http.Handler) http.Handler
}

func backendHandler(pf ProcessorFactory, beaterConfig *Config, report reporter) http.Handler {
extractors := []decoder.Extractor{}
if beaterConfig.AugmentEnabled {
extractors = append(extractors, decoder.SystemExtractor)
}

return logHandler(
concurrencyLimitHandler(beaterConfig,
authHandler(beaterConfig.SecretToken,
processRequestHandler(pf, conf.Config{}, report,
decoder.DecodeSystemData(decoder.DecodeLimitJSONData(beaterConfig.MaxUnzippedSize), beaterConfig.AugmentEnabled)))))
processRequestHandler(pf, conf.TransformConfig{}, report, extractors,
decoder.DecodeLimitJSONData(beaterConfig.MaxUnzippedSize)))))
}

func frontendHandler(pf ProcessorFactory, beaterConfig *Config, report reporter) http.Handler {
extractors := []decoder.Extractor{}
if beaterConfig.AugmentEnabled {
extractors = append(extractors, decoder.UserExtractor)
}

smapper, err := beaterConfig.Frontend.memoizedSmapMapper()
if err != nil {
logp.NewLogger("handler").Error(err.Error())
}
config := conf.Config{
config := conf.TransformConfig{
SmapMapper: smapper,
LibraryPattern: regexp.MustCompile(beaterConfig.Frontend.LibraryPattern),
ExcludeFromGrouping: regexp.MustCompile(beaterConfig.Frontend.ExcludeFromGrouping),
@@ -185,8 +212,23 @@ func frontendHandler(pf ProcessorFactory, beaterConfig *Config, report reporter)
concurrencyLimitHandler(beaterConfig,
ipRateLimitHandler(beaterConfig.Frontend.RateLimit,
corsHandler(beaterConfig.Frontend.AllowOrigins,
processRequestHandler(pf, config, report,
decoder.DecodeUserData(decoder.DecodeLimitJSONData(beaterConfig.MaxUnzippedSize), beaterConfig.AugmentEnabled)))))))
ensureContentTypeHandler("application/json",
processRequestHandler(pf, config, report, extractors,
decoder.DecodeLimitJSONData(beaterConfig.MaxUnzippedSize))))))))
}

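// streamBackendHandler serves the v2 intake endpoint: it logs the request, applies the
// concurrency limit, checks the secret token, requires an application/ndjson Content-Type,
// and hands the decoded stream to processStreamRequest with a batch size of v2TransformBatchSize.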
func streamBackendHandler(_ ProcessorFactory, beaterConfig *Config, report reporter) http.Handler {
extractors := []decoder.Extractor{
decoder.SystemExtractor,
}

requestDecoder := decoder.StreamDecodeLimitJSONData(beaterConfig.MaxUnzippedSize)

return logHandler(
concurrencyLimitHandler(beaterConfig,
authHandler(beaterConfig.SecretToken,
ensureContentTypeHandler("application/ndjson",
processStreamRequest(v2TransformBatchSize, conf.TransformConfig{}, report, extractors, requestDecoder)))))
}

func sourcemapHandler(pf ProcessorFactory, beaterConfig *Config, report reporter) http.Handler {
@@ -197,7 +239,7 @@ func sourcemapHandler(pf ProcessorFactory, beaterConfig *Config, report reporter
return logHandler(
killSwitchHandler(beaterConfig.Frontend.isEnabled(),
authHandler(beaterConfig.SecretToken,
processRequestHandler(pf, conf.Config{SmapMapper: smapper}, report, decoder.DecodeSourcemapFormData))))
processRequestHandler(pf, conf.TransformConfig{SmapMapper: smapper}, report, []decoder.Extractor{}, decoder.DecodeSourcemapFormData))))
}

func healthCheckHandler(_ ProcessorFactory, _ *Config, _ reporter) http.Handler {
@@ -346,14 +388,25 @@ func corsHandler(allowedOrigins []string, h http.Handler) http.Handler {
})
}

func processRequestHandler(pf ProcessorFactory, config conf.Config, report reporter, decode decoder.Decoder) http.Handler {
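// ensureContentTypeHandler rejects requests whose Content-Type header does not contain the
// expected type; strings.Contains also matches values carrying parameters such as "; charset=utf-8".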
func ensureContentTypeHandler(expectedContentType string, h http.Handler) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
res := processRequest(r, pf, config, report, decode)
actualContentType := r.Header.Get("Content-Type")
if !strings.Contains(actualContentType, expectedContentType) {
sendStatus(w, r, invalidContentType(fmt.Errorf("invalid content type: %s", actualContentType)))
return
}
h.ServeHTTP(w, r)
})
}

func processRequestHandler(pf ProcessorFactory, config conf.TransformConfig, report reporter, extractors []decoder.Extractor, decode func(req *http.Request) (map[string]interface{}, error)) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
res := processRequest(r, pf, config, report, extractors, decode)
sendStatus(w, r, res)
})
}

func processRequest(r *http.Request, pf ProcessorFactory, config conf.Config, report reporter, decode decoder.Decoder) serverResponse {
func processRequest(r *http.Request, pf ProcessorFactory, config conf.TransformConfig, report reporter, extractors []decoder.Extractor, decode func(req *http.Request) (map[string]interface{}, error)) serverResponse {
processor := pf()

if r.Method != "POST" {
@@ -369,25 +422,45 @@ func processRequest(r *http.Request, pf ProcessorFactory, config conf.Config, re

}

if err = processor.Validate(data); err != nil {
return cannotValidateResponse(err)
transformables, tctx, response := ProcessPayload(data, processor)
if response.IsError() {
return response
}

payload, err := processor.Decode(data)
if err != nil {
return cannotDecodeResponse(err)
req := pendingReq{
transformable: transformables,
config: config,
context: tctx,
}

if err = report(pendingReq{payload: payload, config: config}); err != nil {
if err = report(req); err != nil {
if strings.Contains(err.Error(), "publisher is being stopped") {
return serverShuttingDownResponse(err)
return serverShuttingDownResponse
}
return fullQueueResponse(err)
return fullQueueResponse
}

return acceptedResponse
}

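// ProcessPayload validates the raw payload against the processor's schema, decodes the
// shared transformation context, and decodes the payload into a batch of transformable events.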
func ProcessPayload(data map[string]interface{}, p processor.Processor) (model.TransformableBatch, *model.TransformContext, serverResponse) {
var err error
if err = p.Validate(data); err != nil {
return nil, nil, cannotValidateResponse(err)
}

transformationContext, err := model.DecodeContext(data, err)
if err != nil {
return nil, nil, cannotDecodeResponse(err)
}

transformables, err := p.Decode(data)
if err != nil {
return nil, nil, cannotDecodeResponse(err)
}
return transformables, transformationContext, okResponse
}

func sendStatus(w http.ResponseWriter, r *http.Request, res serverResponse) {
contentType := "text/plain; charset=utf-8"
if acceptsJSON(r) {
@@ -397,7 +470,10 @@ func sendStatus(w http.ResponseWriter, r *http.Request, res serverResponse) {
w.WriteHeader(res.code)

responseCounter.Inc()
res.counter.Inc()
if res.counter != nil {
res.counter.Inc()
}

if res.err == nil {
responseSuccesses.Inc()
return
12 changes: 8 additions & 4 deletions beater/pub.go
@@ -8,7 +8,7 @@ import (
"github.com/pkg/errors"

"github.com/elastic/apm-server/config"
pr "github.com/elastic/apm-server/processor"
"github.com/elastic/apm-server/model"
"github.com/elastic/beats/libbeat/beat"
)

@@ -27,8 +27,9 @@ type publisher struct {
}

type pendingReq struct {
payload pr.Payload
config config.Config
transformable []model.Transformable
config config.TransformConfig
context *model.TransformContext
}

var (
@@ -102,6 +103,9 @@ func (p *publisher) Send(req pendingReq) error {

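// run drains pendingRequests, transforming each event with the request's transform config
// and context before publishing it through the libbeat client.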
func (p *publisher) run() {
for req := range p.pendingRequests {
p.client.PublishAll(req.payload.Transform(req.config))
for _, transformable := range req.transformable {
// todo: is this threadsafe?
p.client.Publish(transformable.Transform(req.config, req.context))
}
}
}