Skip to content

Commit eadf1ae

Browse files
committed
Fix some bugs
Signed-off-by: Mohamed Hamza <[email protected]>
1 parent cde392c commit eadf1ae

File tree

2 files changed

+14
-15
lines changed

2 files changed

+14
-15
lines changed

go/vt/vtgate/balancer/session.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"sync"
2727

2828
"vitess.io/vitess/go/vt/discovery"
29+
"vitess.io/vitess/go/vt/log"
2930
querypb "vitess.io/vitess/go/vt/proto/query"
3031
"vitess.io/vitess/go/vt/proto/topodata"
3132
"vitess.io/vitess/go/vt/srvtopo"
@@ -57,6 +58,8 @@ type SessionBalancer struct {
5758

5859
// NewSessionBalancer creates a new session balancer.
5960
func NewSessionBalancer(ctx context.Context, localCell string, topoServer srvtopo.Server, hc discovery.HealthCheck) (TabletBalancer, error) {
61+
log.Info("session balancer: creating new session balancer")
62+
6063
b := &SessionBalancer{
6164
localCell: localCell,
6265
hc: hc,
@@ -127,20 +130,19 @@ func (b *SessionBalancer) DebugHandler(w http.ResponseWriter, r *http.Request) {
127130

128131
// watchHealthCheck watches the health check channel for tablet health changes, and updates hash rings accordingly.
129132
func (b *SessionBalancer) watchHealthCheck(ctx context.Context, hcChan chan *discovery.TabletHealth) {
130-
// Start watching health check channel for future tablet health changes
131133
for {
132134
select {
133135
case <-ctx.Done():
134136
b.hc.Unsubscribe(hcChan)
135137
return
136138
case tablet := <-hcChan:
137139
if tablet == nil {
138-
return
140+
continue
139141
}
140142

141143
// Ignore tablets we aren't supposed to watch
142144
if _, ok := tabletTypesToWatch[tablet.Target.TabletType]; !ok {
143-
return
145+
continue
144146
}
145147

146148
b.onTabletHealthChange(tablet)
@@ -184,9 +186,6 @@ func getOrCreateRing(rings map[discovery.KeyspaceShardTabletType]*hashRing, tabl
184186

185187
// print returns a string representation of the session balancer state for debugging.
186188
func (b *SessionBalancer) print() string {
187-
b.mu.RLock()
188-
defer b.mu.RUnlock()
189-
190189
sb := strings.Builder{}
191190

192191
sb.WriteString("Local rings:\n")

go/vt/vttablet/queryservice/wrapped.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ type wrappedService struct {
117117
}
118118

119119
func (ws *wrappedService) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (state TransactionState, err error) {
120-
opts := WrapOpts{InTransaction: false}
120+
opts := WrapOpts{InTransaction: false, Options: options}
121121
err = ws.wrapper(ctx, target, ws.impl, "Begin", opts, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
122122
var innerErr error
123123
state, innerErr = conn.Begin(ctx, target, options)
@@ -240,7 +240,7 @@ func (ws *wrappedService) UnresolvedTransactions(ctx context.Context, target *qu
240240

241241
func (ws *wrappedService) Execute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, transactionID, reservedID int64, options *querypb.ExecuteOptions) (qr *sqltypes.Result, err error) {
242242
inDedicatedConn := transactionID != 0 || reservedID != 0
243-
opts := WrapOpts{InTransaction: inDedicatedConn}
243+
opts := WrapOpts{InTransaction: inDedicatedConn, Options: options}
244244
err = ws.wrapper(ctx, target, ws.impl, "Execute", opts, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
245245
var innerErr error
246246
qr, innerErr = conn.Execute(ctx, target, query, bindVars, transactionID, reservedID, options)
@@ -254,7 +254,7 @@ func (ws *wrappedService) Execute(ctx context.Context, target *querypb.Target, q
254254
// StreamExecute implements the QueryService interface
255255
func (ws *wrappedService) StreamExecute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, transactionID int64, reservedID int64, options *querypb.ExecuteOptions, callback func(*sqltypes.Result) error) error {
256256
inDedicatedConn := transactionID != 0 || reservedID != 0
257-
opts := WrapOpts{InTransaction: inDedicatedConn}
257+
opts := WrapOpts{InTransaction: inDedicatedConn, Options: options}
258258
err := ws.wrapper(ctx, target, ws.impl, "StreamExecute", opts, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
259259
streamingStarted := false
260260
innerErr := conn.StreamExecute(ctx, target, query, bindVars, transactionID, reservedID, options, func(qr *sqltypes.Result) error {
@@ -270,7 +270,7 @@ func (ws *wrappedService) StreamExecute(ctx context.Context, target *querypb.Tar
270270

271271
func (ws *wrappedService) BeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, query string, bindVars map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (state TransactionState, qr *sqltypes.Result, err error) {
272272
inDedicatedConn := reservedID != 0
273-
opts := WrapOpts{InTransaction: inDedicatedConn}
273+
opts := WrapOpts{InTransaction: inDedicatedConn, Options: options}
274274
err = ws.wrapper(ctx, target, ws.impl, "BeginExecute", opts, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
275275
var innerErr error
276276
state, qr, innerErr = conn.BeginExecute(ctx, target, preQueries, query, bindVars, reservedID, options)
@@ -282,7 +282,7 @@ func (ws *wrappedService) BeginExecute(ctx context.Context, target *querypb.Targ
282282
// BeginStreamExecute implements the QueryService interface
283283
func (ws *wrappedService) BeginStreamExecute(ctx context.Context, target *querypb.Target, preQueries []string, query string, bindVars map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions, callback func(*sqltypes.Result) error) (state TransactionState, err error) {
284284
inDedicatedConn := reservedID != 0
285-
opts := WrapOpts{InTransaction: inDedicatedConn}
285+
opts := WrapOpts{InTransaction: inDedicatedConn, Options: options}
286286
err = ws.wrapper(ctx, target, ws.impl, "BeginStreamExecute", opts, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
287287
var innerErr error
288288
state, innerErr = conn.BeginStreamExecute(ctx, target, preQueries, query, bindVars, reservedID, options, callback)
@@ -355,7 +355,7 @@ func (ws *wrappedService) HandlePanic(err *error) {
355355

356356
// ReserveBeginExecute implements the QueryService interface
357357
func (ws *wrappedService) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (state ReservedTransactionState, res *sqltypes.Result, err error) {
358-
opts := WrapOpts{InTransaction: false}
358+
opts := WrapOpts{InTransaction: false, Options: options}
359359
err = ws.wrapper(ctx, target, ws.impl, "ReserveBeginExecute", opts, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
360360
var err error
361361
state, res, err = conn.ReserveBeginExecute(ctx, target, preQueries, postBeginQueries, sql, bindVariables, options)
@@ -367,7 +367,7 @@ func (ws *wrappedService) ReserveBeginExecute(ctx context.Context, target *query
367367

368368
// ReserveBeginStreamExecute implements the QueryService interface
369369
func (ws *wrappedService) ReserveBeginStreamExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(*sqltypes.Result) error) (state ReservedTransactionState, err error) {
370-
opts := WrapOpts{InTransaction: false}
370+
opts := WrapOpts{InTransaction: false, Options: options}
371371
err = ws.wrapper(ctx, target, ws.impl, "ReserveBeginStreamExecute", opts, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
372372
var innerErr error
373373
state, innerErr = conn.ReserveBeginStreamExecute(ctx, target, preQueries, postBeginQueries, sql, bindVariables, options, callback)
@@ -379,7 +379,7 @@ func (ws *wrappedService) ReserveBeginStreamExecute(ctx context.Context, target
379379
// ReserveExecute implements the QueryService interface
380380
func (ws *wrappedService) ReserveExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions) (state ReservedState, res *sqltypes.Result, err error) {
381381
inDedicatedConn := transactionID != 0
382-
opts := WrapOpts{InTransaction: inDedicatedConn}
382+
opts := WrapOpts{InTransaction: inDedicatedConn, Options: options}
383383
err = ws.wrapper(ctx, target, ws.impl, "ReserveExecute", opts, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
384384
var err error
385385
state, res, err = conn.ReserveExecute(ctx, target, preQueries, sql, bindVariables, transactionID, options)
@@ -392,7 +392,7 @@ func (ws *wrappedService) ReserveExecute(ctx context.Context, target *querypb.Ta
392392
// ReserveStreamExecute implements the QueryService interface
393393
func (ws *wrappedService) ReserveStreamExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions, callback func(*sqltypes.Result) error) (state ReservedState, err error) {
394394
inDedicatedConn := transactionID != 0
395-
opts := WrapOpts{InTransaction: inDedicatedConn}
395+
opts := WrapOpts{InTransaction: inDedicatedConn, Options: options}
396396
err = ws.wrapper(ctx, target, ws.impl, "ReserveStreamExecute", opts, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
397397
var innerErr error
398398
state, innerErr = conn.ReserveStreamExecute(ctx, target, preQueries, sql, bindVariables, transactionID, options, callback)

0 commit comments

Comments
 (0)