Skip to content

Commit a7d65c6

Browse files
yuval-kCopilot
andauthored
feat: log+metric on envoy xDS errors (#13003)
Signed-off-by: Yuval Kohavi <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent c9d53c9 commit a7d65c6

File tree

6 files changed

+411
-2
lines changed

6 files changed

+411
-2
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ require (
611611
google.golang.org/api v0.246.0 // indirect
612612
google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect
613613
google.golang.org/genproto/googleapis/api v0.0.0-20251020155222-88f65dc88635 // indirect
614-
google.golang.org/genproto/googleapis/rpc v0.0.0-20251020155222-88f65dc88635 // indirect
614+
google.golang.org/genproto/googleapis/rpc v0.0.0-20251020155222-88f65dc88635
615615
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
616616
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
617617
gopkg.in/inf.v0 v0.9.1 // indirect

hack/utils/oss_compliance/osa_provided.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Name|Version|License
4040
[go.uber.org/zap](https://go.uber.org/zap)|v1.27.0|MIT License
4141
[x/exp](https://golang.org/x/exp)|v0.0.0-20251017212417-90e834f514db|BSD 3-clause "New" or "Revised" License
4242
[x/time](https://golang.org/x/time)|v0.14.0|BSD 3-clause "New" or "Revised" License
43+
[googleapis/rpc](https://google.golang.org/genproto/googleapis/rpc)|v0.0.0-20251020155222-88f65dc88635|Apache License 2.0
4344
[google.golang.org/grpc](https://google.golang.org/grpc)|v1.76.0|Apache License 2.0
4445
[google.golang.org/protobuf](https://google.golang.org/protobuf)|v1.36.10|BSD 3-clause "New" or "Revised" License
4546
[helm/v3](https://helm.sh/helm/v3)|v3.19.2|Apache License 2.0
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package setup
2+
3+
import (
4+
"context"
5+
6+
envoycorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
7+
discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
8+
xdsserver "github.com/envoyproxy/go-control-plane/pkg/server/v3"
9+
)
10+
11+
func chainCallbacks(cbs ...xdsserver.Callbacks) xdsserver.Callbacks {
12+
return &callbacksChain{Callbacks: cbs}
13+
}
14+
15+
type callbacksChain struct {
16+
Callbacks []xdsserver.Callbacks
17+
}
18+
19+
func (c *callbacksChain) OnDeltaStreamClosed(s int64, n *envoycorev3.Node) {
20+
for _, cb := range c.Callbacks {
21+
cb.OnDeltaStreamClosed(s, n)
22+
}
23+
}
24+
25+
func (c *callbacksChain) OnDeltaStreamOpen(ctx context.Context, streamID int64, typeURL string) error {
26+
for _, cb := range c.Callbacks {
27+
if err := cb.OnDeltaStreamOpen(ctx, streamID, typeURL); err != nil {
28+
return err
29+
}
30+
}
31+
return nil
32+
}
33+
34+
func (c *callbacksChain) OnFetchRequest(ctx context.Context, req *discoveryv3.DiscoveryRequest) error {
35+
for _, cb := range c.Callbacks {
36+
if err := cb.OnFetchRequest(ctx, req); err != nil {
37+
return err
38+
}
39+
}
40+
return nil
41+
}
42+
43+
func (c *callbacksChain) OnFetchResponse(req *discoveryv3.DiscoveryRequest, resp *discoveryv3.DiscoveryResponse) {
44+
for _, cb := range c.Callbacks {
45+
cb.OnFetchResponse(req, resp)
46+
}
47+
}
48+
49+
func (c *callbacksChain) OnStreamClosed(streamID int64, node *envoycorev3.Node) {
50+
for _, cb := range c.Callbacks {
51+
cb.OnStreamClosed(streamID, node)
52+
}
53+
}
54+
55+
func (c *callbacksChain) OnStreamDeltaRequest(streamID int64, deltaReq *discoveryv3.DeltaDiscoveryRequest) error {
56+
for _, cb := range c.Callbacks {
57+
if err := cb.OnStreamDeltaRequest(streamID, deltaReq); err != nil {
58+
return err
59+
}
60+
}
61+
return nil
62+
}
63+
64+
func (c *callbacksChain) OnStreamDeltaResponse(streamID int64, deltaReq *discoveryv3.DeltaDiscoveryRequest, deltaResp *discoveryv3.DeltaDiscoveryResponse) {
65+
for _, cb := range c.Callbacks {
66+
cb.OnStreamDeltaResponse(streamID, deltaReq, deltaResp)
67+
}
68+
}
69+
70+
func (c *callbacksChain) OnStreamOpen(ctx context.Context, streamID int64, typeURL string) error {
71+
for _, cb := range c.Callbacks {
72+
if err := cb.OnStreamOpen(ctx, streamID, typeURL); err != nil {
73+
return err
74+
}
75+
}
76+
return nil
77+
}
78+
79+
func (c *callbacksChain) OnStreamRequest(streamID int64, req *discoveryv3.DiscoveryRequest) error {
80+
for _, cb := range c.Callbacks {
81+
if err := cb.OnStreamRequest(streamID, req); err != nil {
82+
return err
83+
}
84+
}
85+
return nil
86+
}
87+
88+
func (c *callbacksChain) OnStreamResponse(ctx context.Context, streamID int64, req *discoveryv3.DiscoveryRequest, resp *discoveryv3.DiscoveryResponse) {
89+
for _, cb := range c.Callbacks {
90+
cb.OnStreamResponse(ctx, streamID, req, resp)
91+
}
92+
}

internal/kgateway/setup/controlplane.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,16 @@ func NewControlPlane(
100100
) envoycache.SnapshotCache {
101101
baseLogger := slog.Default().With("component", "envoy-controlplane")
102102
envoyLoggerAdapter := &slogAdapterForEnvoy{logger: baseLogger}
103+
lnc := newLogNackCallback()
104+
allCallbacks := chainCallbacks(callbacks, lnc)
103105

104106
// Create separate gRPC servers for each listener
105107
serverOpts := getGRPCServerOpts(authenticators, xdsAuth, certWatcher, baseLogger)
106108
kgwGRPCServer := grpc.NewServer(serverOpts...)
107109

108110
snapshotCache := envoycache.NewSnapshotCache(true, xds.NewNodeRoleHasher(), envoyLoggerAdapter)
109111

110-
xdsServer := xdsserver.NewServer(ctx, snapshotCache, callbacks)
112+
xdsServer := xdsserver.NewServer(ctx, snapshotCache, allCallbacks)
111113

112114
// Register reflection and services on both servers
113115
reflection.Register(kgwGRPCServer)
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package setup
2+
3+
import (
4+
"strings"
5+
"sync"
6+
7+
envoycorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
8+
discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
9+
xdsserver "github.com/envoyproxy/go-control-plane/pkg/server/v3"
10+
"google.golang.org/genproto/googleapis/rpc/status"
11+
12+
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/xds"
13+
"github.com/kgateway-dev/kgateway/v2/pkg/logging"
14+
"github.com/kgateway-dev/kgateway/v2/pkg/metrics"
15+
)
16+
17+
const (
18+
gwNameLabel = "gateway_name"
19+
gwNamespaceLabel = "gateway_namespace"
20+
typeURLLabel = "type_url"
21+
)
22+
23+
var (
24+
logger = logging.New("xds/envoy")
25+
envoyXdsSubsystem = "envoy_xds"
26+
xdsRejectsTotal = metrics.NewCounter(
27+
metrics.CounterOpts{
28+
Subsystem: envoyXdsSubsystem,
29+
Name: "rejects_total",
30+
Help: "Total number of xDS responses rejected by envoy proxy",
31+
}, []string{gwNamespaceLabel, gwNameLabel, typeURLLabel})
32+
xdsRejectsCurrent = metrics.NewGauge(
33+
metrics.GaugeOpts{
34+
Subsystem: envoyXdsSubsystem,
35+
Name: "rejects_active",
36+
Help: "Number of xDS responses currently rejected by envoy proxy",
37+
}, []string{gwNamespaceLabel, gwNameLabel, typeURLLabel})
38+
)
39+
40+
type resourceKey struct {
41+
Namespace string
42+
Name string
43+
ResourceTypeUrl string
44+
}
45+
46+
type resourceState struct {
47+
errors map[resourceKey]struct{}
48+
}
49+
50+
func newResourceState() resourceState {
51+
return resourceState{
52+
errors: make(map[resourceKey]struct{}),
53+
}
54+
}
55+
56+
type logNackCallback struct {
57+
xdsserver.CallbackFuncs
58+
streamState map[int64]resourceState
59+
60+
lock sync.Mutex
61+
}
62+
63+
var _ xdsserver.Callbacks = (*logNackCallback)(nil)
64+
65+
func newLogNackCallback() *logNackCallback {
66+
return &logNackCallback{
67+
streamState: make(map[int64]resourceState),
68+
}
69+
}
70+
71+
// OnStreamClosed implements server.Callbacks.
72+
func (l *logNackCallback) OnStreamClosed(streamID int64, node *envoycorev3.Node) {
73+
l.lock.Lock()
74+
streamState := l.streamState[streamID]
75+
delete(l.streamState, streamID)
76+
l.lock.Unlock()
77+
78+
for k := range streamState.errors {
79+
l.onErrorGone(k)
80+
}
81+
}
82+
83+
// OnStreamRequest implements server.Callbacks.
84+
func (l *logNackCallback) OnStreamRequest(streamID int64, req *discoveryv3.DiscoveryRequest) error {
85+
// get gateway and typeURL from request
86+
role := req.GetNode().GetMetadata().GetFields()[xds.RoleKey].GetStringValue()
87+
parts := strings.SplitN(role, xds.KeyDelimiter, 3)
88+
if len(parts) != 3 {
89+
return nil
90+
}
91+
namespace := parts[1]
92+
name := parts[2]
93+
94+
// note, with locality, name will include name~hash~ns
95+
if localityParts := strings.SplitN(name, xds.KeyDelimiter, 3); len(localityParts) == 3 {
96+
name = localityParts[0]
97+
}
98+
99+
typeUrl := req.GetTypeUrl()
100+
key := resourceKey{
101+
Namespace: namespace,
102+
Name: name,
103+
ResourceTypeUrl: strings.TrimPrefix(typeUrl, "type.googleapis.com/"),
104+
}
105+
106+
if req.ErrorDetail != nil {
107+
if !l.handleError(streamID, key) {
108+
// Log NACK only once per resource
109+
return nil
110+
}
111+
l.onNewError(key, req.ErrorDetail)
112+
} else {
113+
errorGone := l.handleNoError(streamID, key)
114+
if errorGone {
115+
l.onErrorGone(key)
116+
}
117+
}
118+
return nil
119+
}
120+
121+
func (l *logNackCallback) onNewError(key resourceKey, err *status.Status) {
122+
labels := toLabels(key)
123+
xdsRejectsTotal.Inc(labels...)
124+
xdsRejectsCurrent.Add(1, labels...)
125+
logger.Warn("xds error", "gateway_name", key.Name, "gateway_ns", key.Namespace, "resource", key.ResourceTypeUrl, "error", err.Message)
126+
}
127+
128+
func (l *logNackCallback) onErrorGone(key resourceKey) {
129+
xdsRejectsCurrent.Add(-1, toLabels(key)...)
130+
}
131+
132+
func (l *logNackCallback) handleNoError(streamID int64, key resourceKey) bool {
133+
l.lock.Lock()
134+
defer l.lock.Unlock()
135+
streamState := l.streamState[streamID]
136+
_, hadKey := streamState.errors[key]
137+
delete(streamState.errors, key)
138+
return hadKey
139+
}
140+
141+
func (l *logNackCallback) handleError(streamID int64, key resourceKey) bool {
142+
l.lock.Lock()
143+
defer l.lock.Unlock()
144+
streamState := l.streamState[streamID]
145+
if streamState.errors == nil {
146+
streamState = newResourceState()
147+
l.streamState[streamID] = streamState
148+
}
149+
if _, exists := streamState.errors[key]; exists {
150+
return false
151+
}
152+
streamState.errors[key] = struct{}{}
153+
return true
154+
}
155+
156+
func toLabels(key resourceKey) []metrics.Label {
157+
return []metrics.Label{
158+
{
159+
Name: gwNamespaceLabel,
160+
Value: key.Namespace,
161+
},
162+
{
163+
Name: gwNameLabel,
164+
Value: key.Name,
165+
},
166+
{
167+
Name: typeURLLabel,
168+
Value: key.ResourceTypeUrl,
169+
},
170+
}
171+
}

0 commit comments

Comments
 (0)