Skip to content

Commit fa732d0

Browse files
committed
Implement flags to control retry delays
Amend the client with two new flags, "--proxy.retry.initial-wait" and "--proxy.retry.max-wait", controlling how long to sleep after encountering a proxy poll failure (i.e. network timeout). An exponential, randomized backoff algorithm is used to prevent thundering herds. The initial wait is raised from 50ms to 1s. In practice waiting less than a second after a failure is rarely useful and prevents the server from being hammered with up to three requests in the second after a failure. Discussed in #82. Signed-off-by: Michael Hanselmann <[email protected]>
1 parent a68733f commit fa732d0

File tree

4 files changed

+31
-49
lines changed

4 files changed

+31
-49
lines changed

cmd/client/main.go

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"crypto/x509"
2222
"fmt"
2323
"io/ioutil"
24-
"math/rand"
2524
"net"
2625
"net/http"
2726
"net/url"
@@ -32,14 +31,15 @@ import (
3231
kingpin "gopkg.in/alecthomas/kingpin.v2"
3332

3433
"github.com/ShowMax/go-fqdn"
34+
"github.com/cenkalti/backoff/v4"
3535
"github.com/go-kit/kit/log"
3636
"github.com/go-kit/kit/log/level"
3737
"github.com/pkg/errors"
38+
"github.com/prometheus-community/pushprox/util"
3839
"github.com/prometheus/client_golang/prometheus"
3940
"github.com/prometheus/client_golang/prometheus/promhttp"
4041
"github.com/prometheus/common/promlog"
4142
"github.com/prometheus/common/promlog/flag"
42-
"github.com/prometheus-community/pushprox/util"
4343
)
4444

4545
var (
@@ -49,6 +49,9 @@ var (
4949
tlsCert = kingpin.Flag("tls.cert", "<cert> Client certificate file").String()
5050
tlsKey = kingpin.Flag("tls.key", "<key> Private key file").String()
5151
metricsAddr = kingpin.Flag("metrics-addr", "Serve Prometheus metrics at this address").Default(":9369").String()
52+
53+
retryInitialWait = kingpin.Flag("proxy.retry.initial-wait", "Amount of time to wait after proxy failure").Default("1s").Duration()
54+
retryMaxWait = kingpin.Flag("proxy.retry.max-wait", "Maximum amount of time to wait between proxy poll retries").Default("5s").Duration()
5255
)
5356

5457
var (
@@ -76,6 +79,15 @@ func init() {
7679
prometheus.MustRegister(pushErrorCounter, pollErrorCounter, scrapeErrorCounter)
7780
}
7881

82+
func newBackOffFromFlags() backoff.BackOff {
83+
b := backoff.NewExponentialBackOff()
84+
b.InitialInterval = *retryInitialWait
85+
b.Multiplier = 1.5
86+
b.MaxInterval = *retryMaxWait
87+
b.MaxElapsedTime = time.Duration(0)
88+
return b
89+
}
90+
7991
// Coordinator for scrape requests and responses
8092
type Coordinator struct {
8193
logger log.Logger
@@ -168,7 +180,7 @@ func (c *Coordinator) doPush(resp *http.Response, origRequest *http.Request, cli
168180
return nil
169181
}
170182

171-
func loop(c Coordinator, client *http.Client) error {
183+
func (c *Coordinator) doPoll(client *http.Client) error {
172184
base, err := url.Parse(*proxyURL)
173185
if err != nil {
174186
level.Error(c.logger).Log("msg", "Error parsing url:", "err", err)
@@ -201,35 +213,18 @@ func loop(c Coordinator, client *http.Client) error {
201213
return nil
202214
}
203215

204-
// decorrelated Jitter increases the maximum jitter based on the last random value.
205-
type decorrelatedJitter struct {
206-
duration time.Duration // sleep time
207-
min time.Duration // min sleep time
208-
cap time.Duration // max sleep time
209-
}
210-
211-
func newJitter() decorrelatedJitter {
212-
rand.Seed(time.Now().UnixNano())
213-
return decorrelatedJitter{
214-
min: 50 * time.Millisecond,
215-
cap: 5 * time.Second,
216+
func (c *Coordinator) loop(bo backoff.BackOff, client *http.Client) {
217+
op := func() error {
218+
return c.doPoll(client)
216219
}
217-
}
218220

219-
func (d *decorrelatedJitter) calc() time.Duration {
220-
change := rand.Float64() * float64(d.duration*time.Duration(3)-d.min)
221-
d.duration = d.min + time.Duration(change)
222-
if d.duration > d.cap {
223-
d.duration = d.cap
224-
}
225-
if d.duration < d.min {
226-
d.duration = d.min
221+
for {
222+
if err := backoff.RetryNotify(op, bo, func(err error, _ time.Duration) {
223+
pollErrorCounter.Inc()
224+
}); err != nil {
225+
level.Error(c.logger).Log("err", err)
226+
}
227227
}
228-
return d.duration
229-
}
230-
231-
func (d *decorrelatedJitter) sleep() {
232-
time.Sleep(d.calc())
233228
}
234229

235230
func main() {
@@ -299,14 +294,7 @@ func main() {
299294
TLSClientConfig: tlsConfig,
300295
}
301296

302-
jitter := newJitter()
303297
client := &http.Client{Transport: transport}
304-
for {
305-
err := loop(coordinator, client)
306-
if err != nil {
307-
pollErrorCounter.Inc()
308-
jitter.sleep()
309-
continue
310-
}
311-
}
298+
299+
coordinator.loop(newBackOffFromFlags(), client)
312300
}

cmd/client/main_test.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,6 @@ import (
2222
"github.com/pkg/errors"
2323
)
2424

25-
func TestJitter(t *testing.T) {
26-
jitter := newJitter()
27-
for i := 0; i < 100000; i++ {
28-
duration := jitter.calc()
29-
if !(jitter.min <= duration || duration <= jitter.cap) {
30-
t.Fatal("invalid jitter value: ", duration)
31-
}
32-
}
33-
}
34-
3525
type TestLogger struct{}
3626

3727
func (tl *TestLogger) Log(vars ...interface{}) error {
@@ -76,7 +66,7 @@ func TestHandleErr(t *testing.T) {
7666
func TestLoop(t *testing.T) {
7767
ts, c := prepareTest()
7868
defer ts.Close()
79-
if err := loop(c, ts.Client()); err != nil {
69+
if err := c.doPoll(ts.Client()); err != nil {
8070
t.Fatal(err)
8171
}
8272
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.13
44

55
require (
66
github.com/ShowMax/go-fqdn v0.0.0-20180501083314-6f60894d629f
7+
github.com/cenkalti/backoff/v4 v4.1.0
78
github.com/go-kit/kit v0.10.0
89
github.com/google/uuid v1.1.1
910
github.com/pkg/errors v0.9.1

go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
3434
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
3535
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
3636
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
37+
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
3738
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
39+
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
40+
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
3841
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
3942
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
4043
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=

0 commit comments

Comments
 (0)