@@ -18,6 +18,7 @@ package sotw
1818import (
1919 "context"
2020 "errors"
21+ "reflect"
2122 "strconv"
2223 "sync/atomic"
2324
@@ -62,19 +63,25 @@ type server struct {
6263 streamCount int64
6364}
6465
65- // Token response value used to signal a watch failure in muxed watches.
66- var errorResponse = & cache.RawResponse {}
66+ // Discovery response that is sent over GRPC stream
67+ // We need to record what resource names are already sent to a client
68+ // So if the client requests a new name we can respond back
69+ // regardless current snapshot version (even if it is not changed yet)
70+ type lastDiscoveryResponse struct {
71+ nonce string
72+ resources map [string ]struct {}
73+ }
6774
6875// process handles a bi-di stream request
69- func (s * server ) process (stream stream.Stream , reqCh <- chan * discovery.DiscoveryRequest , defaultTypeURL string ) error {
76+ func (s * server ) process (str stream.Stream , reqCh <- chan * discovery.DiscoveryRequest , defaultTypeURL string ) error {
7077 // increment stream count
7178 streamID := atomic .AddInt64 (& s .streamCount , 1 )
7279
7380 // unique nonce generator for req-resp pairs per xDS stream; the server
7481 // ignores stale nonces. nonce is only modified within send() function.
7582 var streamNonce int64
7683
77- streamState := streamv3 .NewStreamState (false , map [string ]string {})
84+ streamState := stream .NewStreamState (false , map [string ]string {})
7885 lastDiscoveryResponses := map [string ]lastDiscoveryResponse {}
7986
8087 // a collection of stack allocated watches per request type
@@ -114,85 +121,47 @@ func (s *server) process(stream stream.Stream, reqCh <-chan *discovery.Discovery
114121 if s .callbacks != nil {
115122 s .callbacks .OnStreamResponse (resp .GetContext (), streamID , resp .GetRequest (), out )
116123 }
117- return out .Nonce , stream .Send (out )
124+ return out .Nonce , str .Send (out )
125+ }
126+
127+ open := func (w * watch , req * discovery.DiscoveryRequest , responder chan cache.Response ) {
128+ w .cancel = s .cache .CreateWatch (req , streamState , responder )
129+ watches .cases [w .index ] = reflect.SelectCase {
130+ Dir : reflect .SelectRecv ,
131+ Chan : reflect .ValueOf (responder ),
132+ }
118133 }
119134
120135 if s .callbacks != nil {
121- if err := s .callbacks .OnStreamOpen (stream .Context (), streamID , defaultTypeURL ); err != nil {
136+ if err := s .callbacks .OnStreamOpen (str .Context (), streamID , defaultTypeURL ); err != nil {
122137 return err
123138 }
124139 }
125140
126141 // node may only be set on the first discovery request
127142 var node = & core.Node {}
128143
144+ // recompute dynamic channels for this stream
145+ watches .RecomputeWatches (s .ctx , reqCh )
146+
129147 for {
130- select {
131- case <- s .ctx .Done ():
148+ // The list of select cases looks like this:
149+ // 0: <- ctx.Done
150+ // 1: <- reqCh
151+ // 2...: per type watches
152+ index , value , ok := reflect .Select (watches .cases )
153+ switch index {
154+ // ctx.Done() -> if we receive a value here we return as no further computation is needed
155+ case 0 :
132156 return nil
133- // config watcher can send the requested resources types in any order
134- case resp , more := <- watches .muxedResponses :
135- if ! more {
136- break
137- }
138-
139- typ := resp .GetRequest ().GetTypeUrl ()
140- if resp == errorResponse {
141- return status .Errorf (codes .Unavailable , typ + " watch failed" )
142- }
143-
144- << << << < HEAD
145- << << << < HEAD
146- case resp , more := <- values .runtimes :
147- if ! more {
148- return status .Errorf (codes .Unavailable , "runtimes watch failed" )
149- }
150- nonce , err := send (resp )
151- if err != nil {
152- return err
153- }
154- values .runtimeNonce = nonce
155-
156- case resp , more := <- values .extensionConfigs :
157- if ! more {
158- return status .Errorf (codes .Unavailable , "extensionConfigs watch failed" )
159- }
160- nonce , err := send (resp )
161- if err != nil {
162- return err
163- }
164- values .extensionConfigNonce = nonce
165-
166- case resp , more := <- values .responses :
167- if more {
168- if resp == errorResponse {
169- return status .Errorf (codes .Unavailable , "resource watch failed" )
170- }
171- typeURL := resp .GetRequest ().TypeUrl
172- nonce , err := send (resp )
173- if err != nil {
174- return err
175- }
176- values .nonces [typeURL ] = nonce
177- }
178- == == == =
179- nonce , err := send (resp , typ )
180- == == == =
181- nonce , err := send (resp )
182- >> >> >> > a31cdfc0 (rebase and fix errors )
183- if err != nil {
184- return err
185- }
186- >> >> >> > c635a6b7 (dedupe and linearization )
187-
188- watch := watches .watches [typ ]
189- watch .nonce = nonce
190- watches .watches [typ ] = watch
191- case req , more := <- reqCh :
157+ // Case 1 handles any request inbound on the stream and handles all initialization as needed
158+ case 1 :
192159 // input stream ended or errored out
193- if ! more {
160+ if ! ok {
194161 return nil
195162 }
163+
164+ req := value .Interface ().(* discovery.DiscoveryRequest )
196165 if req == nil {
197166 return status .Errorf (codes .Unavailable , "empty request" )
198167 }
@@ -204,6 +173,9 @@ func (s *server) process(stream stream.Stream, reqCh <-chan *discovery.Discovery
204173 req .Node = node
205174 }
206175
176+ // nonces can be reused across streams; we verify nonce only if nonce is not initialized
177+ nonce := req .GetResponseNonce ()
178+
207179 // type URL is required for ADS but is implicit for xDS
208180 if defaultTypeURL == resource .AnyType {
209181 if req .TypeUrl == "" {
@@ -219,113 +191,45 @@ func (s *server) process(stream stream.Stream, reqCh <-chan *discovery.Discovery
219191 }
220192 }
221193
222- << << << < HEAD
223194 if lastResponse , ok := lastDiscoveryResponses [req .TypeUrl ]; ok {
224195 if lastResponse .nonce == "" || lastResponse .nonce == nonce {
225196 // Let's record Resource names that a client has received.
226197 streamState .SetKnownResourceNames (req .TypeUrl , lastResponse .resources )
227198 }
228199 }
229200
230- // cancel existing watches to (re-)request a newer version
231- switch {
232- case req .TypeUrl == resource .EndpointType :
233- if values .endpointNonce == "" || values .endpointNonce == nonce {
234- if values .endpointCancel != nil {
235- values .endpointCancel ()
236- }
237- values .endpoints = make (chan cache.Response , 1 )
238- values .endpointCancel = s .cache .CreateWatch (req , streamState , values .endpoints )
239- }
240- case req .TypeUrl == resource .ClusterType :
241- if values .clusterNonce == "" || values .clusterNonce == nonce {
242- if values .clusterCancel != nil {
243- values .clusterCancel ()
244- }
245- values .clusters = make (chan cache.Response , 1 )
246- values .clusterCancel = s .cache .CreateWatch (req , streamState , values .clusters )
247- }
248- case req .TypeUrl == resource .RouteType :
249- if values .routeNonce == "" || values .routeNonce == nonce {
250- if values .routeCancel != nil {
251- values .routeCancel ()
252- }
253- values .routes = make (chan cache.Response , 1 )
254- values .routeCancel = s .cache .CreateWatch (req , streamState , values .routes )
255- }
256- case req .TypeUrl == resource .ScopedRouteType :
257- if values .scopedRouteNonce == "" || values .scopedRouteNonce == nonce {
258- if values .scopedRouteCancel != nil {
259- values .scopedRouteCancel ()
260- }
261- values .scopedRoutes = make (chan cache.Response , 1 )
262- values .scopedRouteCancel = s .cache .CreateWatch (req , streamState , values .scopedRoutes )
263- }
264- case req .TypeUrl == resource .ListenerType :
265- if values .listenerNonce == "" || values .listenerNonce == nonce {
266- if values .listenerCancel != nil {
267- values .listenerCancel ()
268- }
269- values .listeners = make (chan cache.Response , 1 )
270- values .listenerCancel = s .cache .CreateWatch (req , streamState , values .listeners )
271- }
272- case req .TypeUrl == resource .SecretType :
273- if values .secretNonce == "" || values .secretNonce == nonce {
274- if values .secretCancel != nil {
275- values .secretCancel ()
276- }
277- values .secrets = make (chan cache.Response , 1 )
278- values .secretCancel = s .cache .CreateWatch (req , streamState , values .secrets )
279- }
280- case req .TypeUrl == resource .RuntimeType :
281- if values .runtimeNonce == "" || values .runtimeNonce == nonce {
282- if values .runtimeCancel != nil {
283- values .runtimeCancel ()
284- }
285- values .runtimes = make (chan cache.Response , 1 )
286- values .runtimeCancel = s .cache .CreateWatch (req , streamState , values .runtimes )
287- }
288- case req .TypeUrl == resource .ExtensionConfigType :
289- if values .extensionConfigNonce == "" || values .extensionConfigNonce == nonce {
290- if values .extensionConfigCancel != nil {
291- values .extensionConfigCancel ()
292- }
293- values .extensionConfigs = make (chan cache.Response , 1 )
294- values .extensionConfigCancel = s .cache .CreateWatch (req , streamState , values .extensionConfigs )
295- }
296- default :
297- typeURL := req .TypeUrl
298- responseNonce , seen := values .nonces [typeURL ]
299- if ! seen || responseNonce == nonce {
300- if cancel , seen := values .cancellations [typeURL ]; seen && cancel != nil {
301- cancel ()
302- }
303- values .cancellations [typeURL ] = s .cache .CreateWatch (req , streamState , values .responses )
304- }
305- == == == =
306201 typeURL := req .GetTypeUrl ()
202+ responder := make (chan cache.Response , 1 )
203+ if w , ok := watches .responders [typeURL ]; ok {
204+ // We've found a pre-existing watch, lets check and update if needed.
205+ // If these requirements aren't satisfied, leave an open watch.
206+ if w .nonce == "" || w .nonce == nonce {
207+ w .Cancel ()
208+
209+ open (w , req , responder )
210+ }
211+ } else {
212+ // No pre-existing watch exists, let's create one.
213+ // We need to precompute the watches first then open a watch in the cache.
214+ watches .responders [typeURL ] = & watch {}
215+ w = watches .responders [typeURL ]
216+ watches .RecomputeWatches (s .ctx , reqCh )
307217
308- // cancel existing watches to (re-)request a newer version
309- watch , ok := watches .watches [typeURL ]
310- // nonces can be reused across streams; we verify nonce only if nonce is not initialized
311- if ! ok || watch .nonce == req .GetResponseNonce () {
312- watch .Cancel ()
313-
314- watch .responses = make (chan cache.Response , 1 )
315- watch .cancel = s .cache .CreateWatch (req , watch .responses )
316-
317- watches .watches [typeURL ] = watch
318- go func () {
319- resp , more := <- watch .responses
320- if ! more {
321- watches .muxedResponses <- errorResponse
322- return
323- }
324-
325- watches .muxedResponses <- resp
326- }()
327- >> >> >> > c635a6b7 (dedupe and linearization )
218+ open (w , req , responder )
328219 }
220+ default :
221+ // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
222+ if ! ok {
223+ return status .Errorf (codes .Unavailable , "resource watch failed" )
224+ }
225+
226+ res := value .Interface ().(cache.Response )
227+ nonce , err := send (value .Interface ().(cache.Response ))
228+ if err != nil {
229+ return err
230+ }
231+
232+ watches .responders [res .GetRequest ().TypeUrl ].nonce = nonce
329233 }
330234 }
331235}
0 commit comments