Skip to content

Commit af734f1

Browse files
authored
enhance: skip adding stopping node to resource group in handleNodeUp (#45969)
Related to #45960. Follow-up to #45961. After #45961 ensured that handleNodeUp is always called for nodes discovered during rewatchNodes (including stopping nodes), this change adds a safeguard in ResourceManager.handleNodeUp to skip adding stopping nodes to resource groups. 1. **resource_manager.go**: Add a check for IsStoppingState() in handleNodeUp to prevent stopping nodes from being added to the incomingNode set and assigned to resource groups. 2. **server.go**: (a) Delete processed nodes from sessionMap to avoid duplicate processing in the subsequent loop; (b) add warning logs for stopping-state transitions during rewatch. Signed-off-by: Congqi Xia <[email protected]>
1 parent 3901f11 commit af734f1

File tree

2 files changed

+15
-5
lines changed

2 files changed

+15
-5
lines changed

internal/querycoordv2/meta/resource_manager.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,12 @@ func (rm *ResourceManager) HandleNodeUp(ctx context.Context, node int64) {
532532
}
533533

534534
func (rm *ResourceManager) handleNodeUp(ctx context.Context, node int64) {
535-
if nodeInfo := rm.nodeMgr.Get(node); nodeInfo == nil || nodeInfo.IsEmbeddedQueryNodeInStreamingNode() {
535+
nodeInfo := rm.nodeMgr.Get(node)
536+
if nodeInfo == nil || nodeInfo.IsEmbeddedQueryNodeInStreamingNode() {
537+
return
538+
}
539+
if nodeInfo.IsStoppingState() {
540+
log.Warn("node is stopping, skip handle node up in resource manager", zap.Int64("node", node))
536541
return
537542
}
538543
rm.incomingNode.Insert(node)

internal/querycoordv2/server.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -715,10 +715,14 @@ func (s *Server) rewatchNodes(sessions map[string]*sessionutil.Session) error {
715715
// node in node manager but session not exist, means it's offline
716716
s.nodeMgr.Remove(node.ID())
717717
s.handleNodeDown(node.ID())
718-
} else if nodeSession.Stopping && !node.IsStoppingState() {
719-
// node in node manager but session is stopping, means it's stopping
720-
s.nodeMgr.Stopping(node.ID())
721-
s.handleNodeStopping(node.ID())
718+
} else {
719+
if nodeSession.Stopping && !node.IsStoppingState() {
720+
// node in node manager but session is stopping, means it's stopping
721+
log.Warn("rewatch found old querynode in stopping state", zap.Int64("nodeID", nodeSession.ServerID))
722+
s.nodeMgr.Stopping(node.ID())
723+
s.handleNodeStopping(node.ID())
724+
}
725+
delete(sessionMap, node.ID())
722726
}
723727
}
724728

@@ -739,6 +743,7 @@ func (s *Server) rewatchNodes(sessions map[string]*sessionutil.Session) error {
739743
s.handleNodeUp(nodeSession.GetServerID())
740744

741745
if nodeSession.Stopping {
746+
log.Warn("rewatch found new querynode in stopping state", zap.Int64("nodeID", nodeSession.ServerID))
742747
s.nodeMgr.Stopping(nodeSession.ServerID)
743748
s.handleNodeStopping(nodeSession.ServerID)
744749
}

0 commit comments

Comments
 (0)