Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion cmd/thanos/query_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package main

import (
"context"
"net"
"net/http"
"time"
Expand Down Expand Up @@ -34,6 +35,7 @@ import (
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/queryfrontend"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/server/http/middleware"
"github.com/thanos-io/thanos/pkg/tenancy"
Expand Down Expand Up @@ -395,8 +397,55 @@ func runQueryFrontend(
})
}

// Periodically check downstream URL to ensure it is reachable.
{
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {

var firstRun = true
for {
if !firstRun {
select {
case <-ctx.Done():
return nil
case <-time.After(10 * time.Second):
}
}

timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

readinessUrl := cfg.DownstreamURL + "/-/ready"
req, err := http.NewRequestWithContext(timeoutCtx, http.MethodGet, readinessUrl, nil)
if err != nil {
return errors.Wrap(err, "creating request to downstream URL")
}

resp, err := roundTripper.RoundTrip(req)
if err != nil {
level.Warn(logger).Log("msg", "failed to reach downstream URL", "err", err, "readiness_url", readinessUrl)
statusProber.NotReady(err)
firstRun = false
continue
}
runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "downstream health check response body")

if resp.StatusCode/100 == 4 || resp.StatusCode/100 == 5 {
level.Warn(logger).Log("msg", "downstream URL returned an error", "status_code", resp.StatusCode, "readiness_url", readinessUrl)
statusProber.NotReady(errors.Errorf("downstream URL %s returned an error: %d", readinessUrl, resp.StatusCode))
firstRun = false
continue
}

statusProber.Ready()
}
}, func(err error) {
cancel()
})
}

level.Info(logger).Log("msg", "starting query frontend")
statusProber.Ready()
return nil
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/server/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
type Server struct {
logger log.Logger
comp component.Component
prober *prober.HTTPProbe

mux *http.ServeMux
srv *http.Server
Expand Down Expand Up @@ -62,7 +61,6 @@ func New(logger log.Logger, reg *prometheus.Registry, comp component.Component,
return &Server{
logger: log.With(logger, "service", "http/server", "component", comp.String()),
comp: comp,
prober: prober,
mux: mux,
srv: &http.Server{Addr: options.listen, Handler: h},
opts: options,
Expand Down
157 changes: 51 additions & 106 deletions test/e2e/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ package e2e_test

import (
"context"
"crypto/sha256"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"reflect"
"regexp"
"sort"
Expand All @@ -21,9 +20,8 @@ import (
"github.com/efficientgo/e2e"
e2emon "github.com/efficientgo/e2e/monitoring"
"github.com/efficientgo/e2e/monitoring/matchers"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
Expand All @@ -33,6 +31,7 @@ import (
"github.com/thanos-io/thanos/pkg/cacheutil"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/queryfrontend"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)
Expand Down Expand Up @@ -925,108 +924,6 @@ func TestInstantQueryShardingWithRandomData(t *testing.T) {
}
}

func TestQueryFrontendTenantForward(t *testing.T) {
t.Parallel()

tests := []struct {
name string
customTenantHeaderName string
tenantName string
}{
{
name: "default tenant header name with a tenant name",
customTenantHeaderName: tenancy.DefaultTenantHeader,
tenantName: "test-tenant",
},
{
name: "default tenant header name without a tenant name",
customTenantHeaderName: tenancy.DefaultTenantHeader,
},
{
name: "custom tenant header name with a tenant name",
customTenantHeaderName: "X-Foobar-Tenant",
tenantName: "test-tenant",
},
{
name: "custom tenant header name without a tenant name",
customTenantHeaderName: "X-Foobar-Tenant",
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
if tc.tenantName == "" {
tc.tenantName = tenancy.DefaultTenant
}
// Use a shorthash of tc.name as e2e env name because the random name generator is having a collision for
// some reason.
e2ename := fmt.Sprintf("%x", sha256.Sum256([]byte(tc.name)))[:8]
e, err := e2e.New(e2e.WithName(e2ename))
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
// The tenant header present in the outgoing request should be the default tenant header.
testutil.Equals(t, tc.tenantName, r.Header.Get(tenancy.DefaultTenantHeader))

// In case the query frontend is configured with a custom tenant header name, verify such header
// is not present in the outgoing request.
if tc.customTenantHeaderName != tenancy.DefaultTenantHeader {
testutil.Equals(t, "", r.Header.Get(tc.customTenantHeaderName))
}

// Verify the outgoing request will keep the X-Scope-OrgID header for compatibility with Cortex.
testutil.Equals(t, tc.tenantName, r.Header.Get("X-Scope-OrgID"))
}))
t.Cleanup(ts.Close)
tsPort := urlParse(t, ts.URL).Port()

inMemoryCacheConfig := queryfrontend.CacheProviderConfig{
Type: queryfrontend.INMEMORY,
Config: queryfrontend.InMemoryResponseCacheConfig{
MaxSizeItems: 1000,
Validity: time.Hour,
},
}
queryFrontendConfig := queryfrontend.Config{
TenantHeader: tc.customTenantHeaderName,
}
queryFrontend := e2ethanos.NewQueryFrontend(
e,
"qfe",
fmt.Sprintf("http://%s:%s", e.HostAddr(), tsPort),
queryFrontendConfig,
inMemoryCacheConfig,
)
testutil.Ok(t, e2e.StartAndWaitReady(queryFrontend))

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)

promClient, err := api.NewClient(api.Config{
Address: "http://" + queryFrontend.Endpoint("http"),
RoundTripper: tenantRoundTripper{
tenant: tc.tenantName,
rt: http.DefaultTransport,
},
})
testutil.Ok(t, err)
v1api := v1.NewAPI(promClient)

r := v1.Range{
Start: time.Now().Add(-time.Hour),
End: time.Now(),
Step: time.Minute,
}

_, _, _ = v1api.QueryRange(ctx, "rate(prometheus_tsdb_head_samples_appended_total[5m])", r)
_, _, _ = v1api.Query(ctx, "rate(prometheus_tsdb_head_samples_appended_total[5m])", time.Now())
})
}
}

type tenantRoundTripper struct {
tenant string
tenantHeader string
Expand Down Expand Up @@ -1210,3 +1107,51 @@ func TestQueryFrontendAnalyze(t *testing.T) {

require.Equal(t, true, r.MatchString(strings.TrimSpace(string(body))))
}

func TestQueryFrontendReadyOnlyIfDownstreamIsAvailable(t *testing.T) {
t.Parallel()

e, err := e2e.NewDockerEnvironment("qfe-analyze")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

q := e2ethanos.NewQuerierBuilder(e, "1").Init()
qfe := e2ethanos.NewQueryFrontend(e, "1", "http://"+q.InternalEndpoint("http"), queryfrontend.Config{}, queryfrontend.CacheProviderConfig{
Type: queryfrontend.INMEMORY,
})
testutil.Ok(t, qfe.Start())

l := log.NewLogfmtLogger(os.Stdout)

timeoutCtx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
t.Cleanup(cancel)

require.NoError(t, runutil.RetryWithLog(l, 1*time.Second, timeoutCtx.Done(), func() error {
resp, err := http.Get(fmt.Sprintf("http://%s/-/ready", qfe.Endpoint("http")))
if err != nil {
return err
}
t.Cleanup(func() { require.NoError(t, resp.Body.Close()) })

if resp.StatusCode/100 == 2 {
return fmt.Errorf("expected query frontend to be not ready, but it is ready")
}
return nil
}))

testutil.Ok(t, e2e.StartAndWaitReady(q))

require.NoError(t, runutil.RetryWithLog(l, 1*time.Second, timeoutCtx.Done(), func() error {
resp, err := http.Get(fmt.Sprintf("http://%s/-/ready", qfe.Endpoint("http")))
if err != nil {
return err
}
t.Cleanup(func() { require.NoError(t, resp.Body.Close()) })

if resp.StatusCode/100 != 2 {
return fmt.Errorf("expected query frontend to be ready (%d), but it is not ready", resp.StatusCode)
}
return nil
}))

}
Loading