@@ -21,19 +21,18 @@ import (
2121 "strconv"
2222 "sync/atomic"
2323
24- "google.golang.org/grpc"
2524 "google.golang.org/grpc/codes"
2625 "google.golang.org/grpc/status"
2726
2827 core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2928 discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
3029 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
3130 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
32- streamv3 "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
31+ "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
3332)
3433
3534type Server interface {
36- StreamHandler (stream Stream , typeURL string ) error
35+ StreamHandler (stream stream. Stream , typeURL string ) error
3736}
3837
3938type Callbacks interface {
@@ -63,104 +62,11 @@ type server struct {
6362 streamCount int64
6463}
6564
66- // Generic RPC stream.
67- type Stream interface {
68- grpc.ServerStream
69-
70- Send (* discovery.DiscoveryResponse ) error
71- Recv () (* discovery.DiscoveryRequest , error )
72- }
73-
74- // watches for all xDS resource types
75- type watches struct {
76- endpoints chan cache.Response
77- clusters chan cache.Response
78- routes chan cache.Response
79- scopedRoutes chan cache.Response
80- listeners chan cache.Response
81- secrets chan cache.Response
82- runtimes chan cache.Response
83- extensionConfigs chan cache.Response
84-
85- endpointCancel func ()
86- clusterCancel func ()
87- routeCancel func ()
88- scopedRouteCancel func ()
89- listenerCancel func ()
90- secretCancel func ()
91- runtimeCancel func ()
92- extensionConfigCancel func ()
93-
94- endpointNonce string
95- clusterNonce string
96- routeNonce string
97- scopedRouteNonce string
98- listenerNonce string
99- secretNonce string
100- runtimeNonce string
101- extensionConfigNonce string
102-
103- // Opaque resources share a muxed channel. Nonces and watch cancellations are indexed by type URL.
104- responses chan cache.Response
105- cancellations map [string ]func ()
106- nonces map [string ]string
107- }
108-
109- // Discovery response that is sent over GRPC stream
110- // We need to record what resource names are already sent to a client
111- // So if the client requests a new name we can respond back
112- // regardless current snapshot version (even if it is not changed yet)
113- type lastDiscoveryResponse struct {
114- nonce string
115- resources map [string ]struct {}
116- }
117-
118- // Initialize all watches
119- func (values * watches ) Init () {
120- // muxed channel needs a buffer to release go-routines populating it
121- values .responses = make (chan cache.Response , 5 )
122- values .cancellations = make (map [string ]func ())
123- values .nonces = make (map [string ]string )
124- }
125-
12665// Token response value used to signal a watch failure in muxed watches.
12766var errorResponse = & cache.RawResponse {}
12867
129- // Cancel all watches
130- func (values * watches ) Cancel () {
131- if values .endpointCancel != nil {
132- values .endpointCancel ()
133- }
134- if values .clusterCancel != nil {
135- values .clusterCancel ()
136- }
137- if values .routeCancel != nil {
138- values .routeCancel ()
139- }
140- if values .scopedRouteCancel != nil {
141- values .scopedRouteCancel ()
142- }
143- if values .listenerCancel != nil {
144- values .listenerCancel ()
145- }
146- if values .secretCancel != nil {
147- values .secretCancel ()
148- }
149- if values .runtimeCancel != nil {
150- values .runtimeCancel ()
151- }
152- if values .extensionConfigCancel != nil {
153- values .extensionConfigCancel ()
154- }
155- for _ , cancel := range values .cancellations {
156- if cancel != nil {
157- cancel ()
158- }
159- }
160- }
161-
16268// process handles a bi-di stream request
163- func (s * server ) process (stream Stream , reqCh <- chan * discovery.DiscoveryRequest , defaultTypeURL string ) error {
69+ func (s * server ) process (stream stream. Stream , reqCh <- chan * discovery.DiscoveryRequest , defaultTypeURL string ) error {
16470 // increment stream count
16571 streamID := atomic .AddInt64 (& s .streamCount , 1 )
16672
@@ -172,10 +78,10 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest
17278 lastDiscoveryResponses := map [string ]lastDiscoveryResponse {}
17379
17480 // a collection of stack allocated watches per request type
175- var values watches
176- values . Init ()
81+ watches := newWatches ()
82+
17783 defer func () {
178- values .Cancel ()
84+ watches .Cancel ()
17985 if s .callbacks != nil {
18086 s .callbacks .OnStreamClosed (streamID )
18187 }
@@ -225,66 +131,18 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest
225131 case <- s .ctx .Done ():
226132 return nil
227133 // config watcher can send the requested resources types in any order
228- case resp , more := <- values .endpoints :
229- if ! more {
230- return status .Errorf (codes .Unavailable , "endpoints watch failed" )
231- }
232- nonce , err := send (resp )
233- if err != nil {
234- return err
235- }
236- values .endpointNonce = nonce
237-
238- case resp , more := <- values .clusters :
239- if ! more {
240- return status .Errorf (codes .Unavailable , "clusters watch failed" )
241- }
242- nonce , err := send (resp )
243- if err != nil {
244- return err
245- }
246- values .clusterNonce = nonce
247-
248- case resp , more := <- values .routes :
134+ case resp , more := <- watches .muxedResponses :
249135 if ! more {
250- return status .Errorf (codes .Unavailable , "routes watch failed" )
251- }
252- nonce , err := send (resp )
253- if err != nil {
254- return err
255- }
256- values .routeNonce = nonce
257-
258- case resp , more := <- values .scopedRoutes :
259- if ! more {
260- return status .Errorf (codes .Unavailable , "scopedRoutes watch failed" )
261- }
262- nonce , err := send (resp )
263- if err != nil {
264- return err
136+ break
265137 }
266- values .scopedRouteNonce = nonce
267138
268- case resp , more := <- values . listeners :
269- if ! more {
270- return status .Errorf (codes .Unavailable , "listeners watch failed" )
139+ typ := resp . GetRequest (). GetTypeUrl ()
140+ if resp == errorResponse {
141+ return status .Errorf (codes .Unavailable , typ + " watch failed" )
271142 }
272- nonce , err := send (resp )
273- if err != nil {
274- return err
275- }
276- values .listenerNonce = nonce
277-
278- case resp , more := <- values .secrets :
279- if ! more {
280- return status .Errorf (codes .Unavailable , "secrets watch failed" )
281- }
282- nonce , err := send (resp )
283- if err != nil {
284- return err
285- }
286- values .secretNonce = nonce
287143
144+ << << << < HEAD
145+ << << << < HEAD
288146 case resp , more := <- values .runtimes :
289147 if ! more {
290148 return status .Errorf (codes .Unavailable , "runtimes watch failed" )
@@ -317,7 +175,19 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest
317175 }
318176 values .nonces [typeURL ] = nonce
319177 }
178+ == == == =
179+ nonce , err := send (resp , typ )
180+ == == == =
181+ nonce , err := send (resp )
182+ >> >> >> > a31cdfc0 (rebase and fix errors )
183+ if err != nil {
184+ return err
185+ }
186+ >> >> >> > c635a6b7 (dedupe and linearization )
320187
188+ watch := watches .watches [typ ]
189+ watch .nonce = nonce
190+ watches .watches [typ ] = watch
321191 case req , more := <- reqCh :
322192 // input stream ended or errored out
323193 if ! more {
@@ -334,9 +204,6 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest
334204 req .Node = node
335205 }
336206
337- // nonces can be reused across streams; we verify nonce only if nonce is not initialized
338- nonce := req .GetResponseNonce ()
339-
340207 // type URL is required for ADS but is implicit for xDS
341208 if defaultTypeURL == resource .AnyType {
342209 if req .TypeUrl == "" {
@@ -352,6 +219,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest
352219 }
353220 }
354221
222+ << << << < HEAD
355223 if lastResponse , ok := lastDiscoveryResponses [req .TypeUrl ]; ok {
356224 if lastResponse .nonce == "" || lastResponse .nonce == nonce {
357225 // Let's record Resource names that a client has received.
@@ -434,13 +302,36 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest
434302 }
435303 values .cancellations [typeURL ] = s .cache .CreateWatch (req , streamState , values .responses )
436304 }
305+ == == == =
306+ typeURL := req .GetTypeUrl ()
307+
308+ // cancel existing watches to (re-)request a newer version
309+ watch , ok := watches .watches [typeURL ]
310+ // nonces can be reused across streams; we verify nonce only if nonce is not initialized
311+ if ! ok || watch .nonce == req .GetResponseNonce () {
312+ watch .Cancel ()
313+
314+ watch .responses = make (chan cache.Response , 1 )
315+ watch .cancel = s .cache .CreateWatch (req , watch .responses )
316+
317+ watches .watches [typeURL ] = watch
318+ go func () {
319+ resp , more := <- watch .responses
320+ if ! more {
321+ watches .muxedResponses <- errorResponse
322+ return
323+ }
324+
325+ watches .muxedResponses <- resp
326+ }()
327+ >> >> >> > c635a6b7 (dedupe and linearization )
437328 }
438329 }
439330 }
440331}
441332
442333// StreamHandler converts a blocking read call to channels and initiates stream processing
443- func (s * server ) StreamHandler (stream Stream , typeURL string ) error {
334+ func (s * server ) StreamHandler (stream stream . Stream , typeURL string ) error {
444335 // a channel for receiving incoming requests
445336 reqCh := make (chan * discovery.DiscoveryRequest )
446337 go func () {
0 commit comments