Skip to content

Commit 3946fb8

Browse files
author
Alec Holmes
committed
handle creation of new watche more explicitly
Signed-off-by: Alec Holmes <[email protected]>
1 parent 8784bbf commit 3946fb8

File tree

1 file changed

+21
-15
lines changed

1 file changed

+21
-15
lines changed

pkg/server/sotw/v3/server.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,14 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
124124
return out.Nonce, str.Send(out)
125125
}
126126

127+
open := func(w *watch, req *discovery.DiscoveryRequest, responder chan cache.Response) {
128+
w.cancel = s.cache.CreateWatch(req, streamState, responder)
129+
watches.cases[w.index] = reflect.SelectCase{
130+
Dir: reflect.SelectRecv,
131+
Chan: reflect.ValueOf(responder),
132+
}
133+
}
134+
127135
if s.callbacks != nil {
128136
if err := s.callbacks.OnStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil {
129137
return err
@@ -191,25 +199,23 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
191199
}
192200

193201
typeURL := req.GetTypeUrl()
194-
195-
w, ok := watches.responders[typeURL]
196-
if !ok {
202+
responder := make(chan cache.Response, 1)
203+
if w, ok := watches.responders[typeURL]; ok {
204+
// We've found a pre-existing watch, lets check and update if needed.
205+
// If these requirements aren't satisfied, leave an open watch.
206+
if w.nonce == "" || w.nonce == nonce {
207+
w.Cancel()
208+
209+
open(w, req, responder)
210+
}
211+
} else {
212+
// No pre-existing watch exists, let's create one.
213+
// We need to precompute the watches first then open a watch in the cache.
197214
watches.responders[typeURL] = &watch{}
198-
199215
w = watches.responders[typeURL]
200216
watches.RecomputeWatches(s.ctx, reqCh)
201-
}
202-
if w.nonce == "" || w.nonce == nonce {
203-
if w.cancel != nil {
204-
w.cancel()
205-
}
206217

207-
responder := make(chan cache.Response, 1)
208-
w.cancel = s.cache.CreateWatch(req, streamState, responder)
209-
watches.cases[w.index] = reflect.SelectCase{
210-
Dir: reflect.SelectRecv,
211-
Chan: reflect.ValueOf(responder),
212-
}
218+
open(w, req, responder)
213219
}
214220
default:
215221
// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL

0 commit comments

Comments
 (0)