@@ -19,7 +19,9 @@ package discovery
 import (
 	"context"
 	"fmt"
+	"slices"
 	"sync"
+	"time"
 
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/protobuf/proto"
@@ -37,6 +39,11 @@ import (
 	vschemapb "vitess.io/vitess/go/vt/proto/vschema"
 )
 
+var (
+	// waitConsistentKeyspacesCheck is the amount of time to wait between checks to verify that the keyspace is consistent.
+	waitConsistentKeyspacesCheck = 100 * time.Millisecond
+)
+
4047// KeyspaceEventWatcher is an auxiliary watcher that watches all availability incidents
4148// for all keyspaces in a Vitess cell and notifies listeners when the events have been resolved.
4249// Right now this is capable of detecting the end of failovers, both planned and unplanned,
@@ -692,29 +699,53 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(ctx context.Context, tar
 	return ks.beingResharded(target.Shard)
 }
 
-// PrimaryIsNotServing checks if the reason why the given target is not accessible right now is
-// that the primary tablet for that shard is not serving. This is possible during a Planned
-// Reparent Shard operation. Just as the operation completes, a new primary will be elected, and
+// ShouldStartBufferingForTarget checks whether we should start buffering for the given target.
+// We check the following things before we start buffering -
+// 1. The shard must have a primary.
+// 2. The primary must be non-serving.
+// 3. The keyspace must be marked inconsistent.
+//
+// This buffering is meant to kick in during a Planned Reparent Shard operation.
+// As part of that operation the old primary will become non-serving. At that point
+// this code should return true to start buffering requests.
+// Just as the PRS operation completes, a new primary will be elected, and
 // it will send its own healthcheck stating that it is serving. We should buffer requests until
-// that point. There are use cases where people do not run with a Primary server at all, so we must
+// that point.
+//
+// There are use cases where people do not run with a Primary server at all, so we must
 // verify that we only start buffering when a primary was present, and it went not serving.
 // The shard state keeps track of the current primary and the last externally reparented time, which
 // we can use to determine that there was a serving primary which now became non serving. This is
 // only possible in a DemotePrimary RPC which are only called from ERS and PRS. So buffering will
-// stop when these operations succeed. We return the tablet alias of the primary if it is serving.
-func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) {
+// stop when these operations succeed. We also return the tablet alias of the primary if it is serving.
+func (kew *KeyspaceEventWatcher) ShouldStartBufferingForTarget(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) {
 	if target.TabletType != topodatapb.TabletType_PRIMARY {
+		// We don't support buffering for any target tablet type other than the primary.
 		return nil, false
 	}
 	ks := kew.getKeyspaceStatus(ctx, target.Keyspace)
 	if ks == nil {
+		// If the keyspace status is nil, then the keyspace must have been deleted.
+		// The user query is trying to access a keyspace that has been deleted.
+		// There is no reason to buffer this query.
 		return nil, false
 	}
 	ks.mu.Lock()
 	defer ks.mu.Unlock()
 	if state, ok := ks.shards[target.Shard]; ok {
-		// If the primary tablet was present then externallyReparented will be non-zero and
-		// currentPrimary will be not nil.
+		// As described in the function comment, we only want to start buffering when all of the following conditions are met -
+		// 1. The shard must have a primary. We check this by verifying that the currentPrimary and externallyReparented fields are non-empty.
+		// They are set the first time the shard registers an update from a serving primary and are never cleared out after that.
+		// If the user has configured vtgates to wait for the primary tablet healthchecks before starting query service, this condition
+		// will always be true.
+		// 2. The primary must be non-serving. We check the serving field in the shard state for this.
+		// When a primary becomes non-serving, it also marks the keyspace inconsistent, so the next check is only added
+		// to be defensive against any bugs.
+		// 3. The keyspace must be marked inconsistent. We check the consistent field in the keyspace state for this.
+		//
+		// The reason we need all three checks is that we want to be very defensive about when we start buffering.
+		// We don't want to start buffering when we aren't sure that the primary is not serving,
+		// only to receive an update soon after that stops the buffering.
 		return state.currentPrimary, !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil
 	}
 	return nil, false
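
For context on how the renamed check is meant to be consumed, here is a small hedged sketch of a caller deciding whether to begin buffering. It assumes it lives in the discovery package so the file's existing imports cover it; the maybeStartBuffering helper and its log message are illustrative assumptions and not part of this change, while ShouldStartBufferingForTarget and the proto types come from the diff itself.

// maybeStartBuffering is a hypothetical caller sketch, not part of this commit.
// It shows how the (primary alias, bool) return pair could be used: buffering
// begins only when a previously serving primary for the shard went non-serving.
func maybeStartBuffering(ctx context.Context, kew *KeyspaceEventWatcher, target *querypb.Target) bool {
	primaryAlias, shouldBuffer := kew.ShouldStartBufferingForTarget(ctx, target)
	if shouldBuffer {
		// primaryAlias identifies the last known serving primary for the shard,
		// which tells us which primary the buffering is waiting on.
		log.Infof("buffering queries for %s/%s; last serving primary: %v",
			target.Keyspace, target.Shard, primaryAlias)
	}
	return shouldBuffer
}
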
@@ -766,3 +797,46 @@ func (kew *KeyspaceEventWatcher) MarkShardNotServing(ctx context.Context, keyspa
 	}
 	return true
 }
+
+// WaitForConsistentKeyspaces waits for the given set of keyspaces to be marked consistent.
+func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, ksList []string) error {
+	// We don't want to change the original keyspace list that we receive, so we clone it
+	// before we empty its elements down below.
+	keyspaces := slices.Clone(ksList)
+	for {
+		// We empty keyspaces as we find them to be consistent.
+		allConsistent := true
+		for i, ks := range keyspaces {
+			if ks == "" {
+				continue
+			}
+
+			// Get the keyspace status and see whether it is consistent yet or not.
+			kss := kew.getKeyspaceStatus(ctx, ks)
+			// If kss is nil, then the keyspace must have been deleted. In that case too, it is fine
+			// for us to consider it consistent, since the keyspace no longer exists.
+			if kss == nil || kss.consistent {
+				keyspaces[i] = ""
+			} else {
+				allConsistent = false
+			}
+		}
+
+		if allConsistent {
+			// All the keyspaces are consistent.
+			return nil
+		}
+
+		// Unblock after the sleep or when the context has expired.
+		select {
+		case <-ctx.Done():
+			for _, ks := range keyspaces {
+				if ks != "" {
+					log.Infof("keyspace %v didn't become consistent", ks)
+				}
+			}
+			return ctx.Err()
+		case <-time.After(waitConsistentKeyspacesCheck):
+		}
+	}
+}
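
Similarly, a hedged sketch of how WaitForConsistentKeyspaces might be driven with a bounded context; the waitForKeyspacesAfterReparent name and the 30-second budget are assumptions for illustration, while the context and time packages are already imported by this file.

// waitForKeyspacesAfterReparent is a hypothetical usage sketch, not part of this commit.
// It bounds WaitForConsistentKeyspaces with a deadline so a caller never waits forever
// for the affected keyspaces to settle after a reparent.
func waitForKeyspacesAfterReparent(kew *KeyspaceEventWatcher, keyspaces []string) error {
	// Assumed 30s budget; a real caller should pick a deadline matching its retry policy.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	// Returns nil once every listed keyspace is consistent (or deleted),
	// and ctx.Err() if the deadline expires first.
	return kew.WaitForConsistentKeyspaces(ctx, keyspaces)
}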