Skip to content

Commit 9c2fe03

Browse files
kakkoyunbwplotka
authored andcommitted
.*: Add new http-grace-period flag (#1680)
* Add new http-grace-period flag Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Update CHANGELOG Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Update docs Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Update pkg/server/http.go Co-Authored-By: Bartlomiej Plotka <bwplotka@gmail.com> Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com> * Rename initializer for HTTP server Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
1 parent 66b3d21 commit 9c2fe03

File tree

17 files changed

+226
-74
lines changed

17 files changed

+226
-74
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
1515

1616
- [#1660](https://github.com/thanos-io/thanos/pull/1660) Add a new `--prometheus.ready_timeout` CLI option to the sidecar to set how long to wait until Prometheus starts up.
1717
- [#1573](https://github.com/thanos-io/thanos/pull/1573) `AliYun OSS` object storage, see [documents](docs/storage.md#aliyun-oss-configuration) for further information.
18+
- [#1680](https://github.com/thanos-io/thanos/pull/1680) Add a new `--http-grace-period` CLI option to components which serve HTTP to set how long to wait until HTTP Server shuts down.
1819

1920
### Fixed
2021

cmd/thanos/compact.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/thanos-io/thanos/pkg/extflag"
14+
"github.com/thanos-io/thanos/pkg/server"
1415

1516
"github.com/go-kit/kit/log"
1617
"github.com/go-kit/kit/log/level"
@@ -79,6 +80,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
7980
Hidden().Default("false").Bool()
8081

8182
httpAddr := regHTTPAddrFlag(cmd)
83+
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
8284

8385
dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process compactions.").
8486
Default("./data").String()
@@ -116,6 +118,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
116118
m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
117119
return runCompact(g, logger, reg,
118120
*httpAddr,
121+
time.Duration(*httpGracePeriod),
119122
*dataDir,
120123
objStoreConfig,
121124
time.Duration(*consistencyDelay),
@@ -143,6 +146,7 @@ func runCompact(
143146
logger log.Logger,
144147
reg *prometheus.Registry,
145148
httpBindAddr string,
149+
httpGracePeriod time.Duration,
146150
dataDir string,
147151
objStoreConfig *extflag.PathOrContent,
148152
consistencyDelay time.Duration,
@@ -175,9 +179,12 @@ func runCompact(
175179

176180
statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
177181
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
178-
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, component); err != nil {
179-
return errors.Wrap(err, "schedule HTTP server with probes")
180-
}
182+
srv := server.NewHTTP(logger, reg, component, statusProber,
183+
server.WithListen(httpBindAddr),
184+
server.WithGracePeriod(httpGracePeriod),
185+
)
186+
187+
g.Add(srv.ListenAndServe, srv.Shutdown)
181188

182189
confContentYaml, err := objStoreConfig.Content()
183190
if err != nil {

cmd/thanos/downsample.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/thanos-io/thanos/pkg/extflag"
10+
"github.com/thanos-io/thanos/pkg/server"
1011

1112
"github.com/go-kit/kit/log"
1213
"github.com/go-kit/kit/log/level"
@@ -34,14 +35,15 @@ func registerDownsample(m map[string]setupFunc, app *kingpin.Application) {
3435
cmd := app.Command(comp.String(), "continuously downsamples blocks in an object store bucket")
3536

3637
httpAddr := regHTTPAddrFlag(cmd)
38+
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
3739

3840
dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
3941
Default("./data").String()
4042

4143
objStoreConfig := regCommonObjStoreFlags(cmd, "", true)
4244

4345
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
44-
return runDownsample(g, logger, reg, *httpAddr, *dataDir, objStoreConfig, comp)
46+
return runDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, comp)
4547
}
4648
}
4749

@@ -73,6 +75,7 @@ func runDownsample(
7375
logger log.Logger,
7476
reg *prometheus.Registry,
7577
httpBindAddr string,
78+
httpGracePeriod time.Duration,
7679
dataDir string,
7780
objStoreConfig *extflag.PathOrContent,
7881
comp component.Component,
@@ -123,9 +126,11 @@ func runDownsample(
123126
}
124127

125128
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
126-
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil {
127-
return errors.Wrap(err, "schedule HTTP server with probe")
128-
}
129+
srv := server.NewHTTP(logger, reg, comp, statusProber,
130+
server.WithListen(httpBindAddr),
131+
server.WithGracePeriod(httpGracePeriod),
132+
)
133+
g.Add(srv.ListenAndServe, srv.Shutdown)
129134

130135
level.Info(logger).Log("msg", "starting downsample node")
131136
return nil

cmd/thanos/flags.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@ import (
1010
"gopkg.in/alecthomas/kingpin.v2"
1111
)
1212

13+
func modelDuration(flags *kingpin.FlagClause) *model.Duration {
14+
value := new(model.Duration)
15+
flags.SetValue(value)
16+
17+
return value
18+
}
19+
1320
func regGRPCFlags(cmd *kingpin.CmdClause) (
1421
grpcBindAddr *string,
1522
grpcTLSSrvCert *string,
@@ -33,11 +40,8 @@ func regHTTPAddrFlag(cmd *kingpin.CmdClause) *string {
3340
return cmd.Flag("http-address", "Listen host:port for HTTP endpoints.").Default("0.0.0.0:10902").String()
3441
}
3542

36-
func modelDuration(flags *kingpin.FlagClause) *model.Duration {
37-
value := new(model.Duration)
38-
flags.SetValue(value)
39-
40-
return value
43+
func regHTTPGracePeriodFlag(cmd *kingpin.CmdClause) *model.Duration {
44+
return modelDuration(cmd.Flag("http-grace-period", "Time to wait after an interrupt received for HTTP Server.").Default("5s"))
4145
}
4246

4347
func regCommonObjStoreFlags(cmd *kingpin.CmdClause, suffix string, required bool, extraDesc ...string) *extflag.PathOrContent {

cmd/thanos/main.go

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ import (
88
"io"
99
"io/ioutil"
1010
"math"
11-
"net"
12-
"net/http"
13-
"net/http/pprof"
1411
"os"
1512
"os/signal"
1613
"path/filepath"
@@ -29,11 +26,7 @@ import (
2926
"github.com/opentracing/opentracing-go"
3027
"github.com/pkg/errors"
3128
"github.com/prometheus/client_golang/prometheus"
32-
"github.com/prometheus/client_golang/prometheus/promhttp"
3329
"github.com/prometheus/common/version"
34-
"github.com/thanos-io/thanos/pkg/component"
35-
"github.com/thanos-io/thanos/pkg/prober"
36-
"github.com/thanos-io/thanos/pkg/runutil"
3730
"github.com/thanos-io/thanos/pkg/store/storepb"
3831
"github.com/thanos-io/thanos/pkg/tracing"
3932
"github.com/thanos-io/thanos/pkg/tracing/client"
@@ -230,18 +223,6 @@ func interrupt(logger log.Logger, cancel <-chan struct{}) error {
230223
}
231224
}
232225

233-
func registerProfile(mux *http.ServeMux) {
234-
mux.HandleFunc("/debug/pprof/", pprof.Index)
235-
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
236-
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
237-
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
238-
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
239-
}
240-
241-
func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) {
242-
mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{}))
243-
}
244-
245226
func defaultGRPCTLSServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) {
246227
opts := []grpc.ServerOption{}
247228
tlsCfg, err := defaultTLSServerOpts(log.With(logger, "protocol", "gRPC"), cert, key, clientCA)
@@ -383,30 +364,3 @@ func newStoreGRPCServer(logger log.Logger, reg prometheus.Registerer, tracer ope
383364

384365
return s
385366
}
386-
387-
// scheduleHTTPServer starts a run.Group that servers HTTP endpoint with default endpoints providing Prometheus metrics,
388-
// profiling and liveness/readiness probes.
389-
func scheduleHTTPServer(g *run.Group, logger log.Logger, reg *prometheus.Registry, readinessProber *prober.Prober, httpBindAddr string, handler http.Handler, comp component.Component) error {
390-
mux := http.NewServeMux()
391-
registerMetrics(mux, reg)
392-
registerProfile(mux)
393-
readinessProber.RegisterInMux(mux)
394-
if handler != nil {
395-
mux.Handle("/", handler)
396-
}
397-
398-
l, err := net.Listen("tcp", httpBindAddr)
399-
if err != nil {
400-
return errors.Wrap(err, "listen metrics address")
401-
}
402-
403-
g.Add(func() error {
404-
level.Info(logger).Log("msg", "listening for requests and metrics", "component", comp.String(), "address", httpBindAddr)
405-
readinessProber.SetHealthy()
406-
return errors.Wrapf(http.Serve(l, mux), "serve %s and metrics", comp.String())
407-
}, func(err error) {
408-
readinessProber.SetNotHealthy(err)
409-
runutil.CloseWithLogOnErr(logger, l, "%s and metric listener", comp.String())
410-
})
411-
return nil
412-
}

cmd/thanos/query.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/thanos-io/thanos/pkg/query"
3232
v1 "github.com/thanos-io/thanos/pkg/query/api"
3333
"github.com/thanos-io/thanos/pkg/runutil"
34+
"github.com/thanos-io/thanos/pkg/server"
3435
"github.com/thanos-io/thanos/pkg/store"
3536
"github.com/thanos-io/thanos/pkg/tracing"
3637
"github.com/thanos-io/thanos/pkg/ui"
@@ -45,6 +46,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
4546
cmd := app.Command(comp.String(), "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")
4647

4748
httpBindAddr := regHTTPAddrFlag(cmd)
49+
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
4850
grpcBindAddr, srvCert, srvKey, srvClientCA := regGRPCFlags(cmd)
4951

5052
secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool()
@@ -140,6 +142,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
140142
*caCert,
141143
*serverName,
142144
*httpBindAddr,
145+
time.Duration(*httpGracePeriod),
143146
*webRoutePrefix,
144147
*webExternalPrefix,
145148
*webPrefixHeaderName,
@@ -222,6 +225,7 @@ func runQuery(
222225
caCert string,
223226
serverName string,
224227
httpBindAddr string,
228+
httpGracePeriod time.Duration,
225229
webRoutePrefix string,
226230
webExternalPrefix string,
227231
webPrefixHeaderName string,
@@ -376,9 +380,13 @@ func runQuery(
376380
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)
377381

378382
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
379-
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, router, comp); err != nil {
380-
return errors.Wrap(err, "schedule HTTP server with probes")
381-
}
383+
srv := server.NewHTTP(logger, reg, comp, statusProber,
384+
server.WithListen(httpBindAddr),
385+
server.WithGracePeriod(httpGracePeriod),
386+
)
387+
srv.Handle("/", router)
388+
389+
g.Add(srv.ListenAndServe, srv.Shutdown)
382390
}
383391
// Start query (proxy) gRPC StoreAPI.
384392
{

cmd/thanos/receive.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/thanos-io/thanos/pkg/extflag"
12+
"github.com/thanos-io/thanos/pkg/server"
1213

1314
"github.com/go-kit/kit/log"
1415
"github.com/go-kit/kit/log/level"
@@ -36,6 +37,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
3637
cmd := app.Command(comp.String(), "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)")
3738

3839
httpBindAddr := regHTTPAddrFlag(cmd)
40+
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
3941
grpcBindAddr, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd)
4042

4143
rwAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests.").
@@ -109,6 +111,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
109111
*grpcKey,
110112
*grpcClientCA,
111113
*httpBindAddr,
114+
time.Duration(*httpGracePeriod),
112115
*rwAddress,
113116
*rwServerCert,
114117
*rwServerKey,
@@ -142,6 +145,7 @@ func runReceive(
142145
grpcKey string,
143146
grpcClientCA string,
144147
httpBindAddr string,
148+
httpGracePeriod time.Duration,
145149
rwAddress string,
146150
rwServerCert string,
147151
rwServerKey string,
@@ -335,9 +339,11 @@ func runReceive(
335339

336340
level.Debug(logger).Log("msg", "setting up http server")
337341
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
338-
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil {
339-
return errors.Wrap(err, "schedule HTTP server with probes")
340-
}
342+
srv := server.NewHTTP(logger, reg, comp, statusProber,
343+
server.WithListen(httpBindAddr),
344+
server.WithGracePeriod(httpGracePeriod),
345+
)
346+
g.Add(srv.ListenAndServe, srv.Shutdown)
341347

342348
level.Debug(logger).Log("msg", "setting up grpc server")
343349
{

cmd/thanos/rule.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"time"
1919

2020
"github.com/thanos-io/thanos/pkg/extflag"
21+
"github.com/thanos-io/thanos/pkg/server"
2122

2223
"github.com/go-kit/kit/log"
2324
"github.com/go-kit/kit/log/level"
@@ -62,6 +63,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {
6263
cmd := app.Command(comp.String(), "ruler evaluating Prometheus rules against given Query nodes, exposing Store API and storing old blocks in bucket")
6364

6465
httpBindAddr := regHTTPAddrFlag(cmd)
66+
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
6567
grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd)
6668

6769
labelStrs := cmd.Flag("label", "Labels to be applied to all generated metrics (repeated). Similar to external labels for Prometheus, used to identify ruler and its blocks as unique source.").
@@ -161,6 +163,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {
161163
*key,
162164
*clientCA,
163165
*httpBindAddr,
166+
time.Duration(*httpGracePeriod),
164167
*webRoutePrefix,
165168
*webExternalPrefix,
166169
*webPrefixHeaderName,
@@ -196,6 +199,7 @@ func runRule(
196199
key string,
197200
clientCA string,
198201
httpBindAddr string,
202+
httpGracePeriod time.Duration,
199203
webRoutePrefix string,
200204
webExternalPrefix string,
201205
webPrefixHeaderName string,
@@ -548,9 +552,13 @@ func runRule(
548552
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)
549553

550554
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
551-
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, router, comp); err != nil {
552-
return errors.Wrap(err, "schedule HTTP server with probes")
553-
}
555+
srv := server.NewHTTP(logger, reg, comp, statusProber,
556+
server.WithListen(httpBindAddr),
557+
server.WithGracePeriod(httpGracePeriod),
558+
)
559+
srv.Handle("/", router)
560+
561+
g.Add(srv.ListenAndServe, srv.Shutdown)
554562
}
555563

556564
confContentYaml, err := objStoreConfig.Content()

cmd/thanos/sidecar.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/thanos-io/thanos/pkg/extflag"
12+
"github.com/thanos-io/thanos/pkg/server"
1213

1314
"github.com/go-kit/kit/log"
1415
"github.com/go-kit/kit/log/level"
@@ -36,6 +37,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
3637
cmd := app.Command(component.Sidecar.String(), "sidecar for Prometheus server")
3738

3839
httpBindAddr := regHTTPAddrFlag(cmd)
40+
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
3941
grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd)
4042

4143
promURL := cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network.").
@@ -81,6 +83,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
8183
*key,
8284
*clientCA,
8385
*httpBindAddr,
86+
time.Duration(*httpGracePeriod),
8487
*promURL,
8588
*promReadyTimeout,
8689
*dataDir,
@@ -103,6 +106,7 @@ func runSidecar(
103106
key string,
104107
clientCA string,
105108
httpBindAddr string,
109+
httpGracePeriod time.Duration,
106110
promURL *url.URL,
107111
promReadyTimeout time.Duration,
108112
dataDir string,
@@ -134,11 +138,14 @@ func runSidecar(
134138
uploads = false
135139
}
136140

137-
statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
138141
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
139-
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil {
140-
return errors.Wrap(err, "schedule HTTP server with probes")
141-
}
142+
statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
143+
srv := server.NewHTTP(logger, reg, comp, statusProber,
144+
server.WithListen(httpBindAddr),
145+
server.WithGracePeriod(httpGracePeriod),
146+
)
147+
148+
g.Add(srv.ListenAndServe, srv.Shutdown)
142149

143150
// Setup all the concurrent groups.
144151
{

0 commit comments

Comments
 (0)