
Commit 6b07eeb

node: Track pods by UID instead of only namespace/name
1 parent 31ffa20 commit 6b07eeb

5 files changed: +130 additions, −113 deletions


node/pod.go

Lines changed: 35 additions & 43 deletions
@@ -17,9 +17,9 @@ package node
 import (
     "context"
     "fmt"
-    "strings"
     "time"
 
+    "github.com/virtual-kubelet/virtual-kubelet/errdefs"
     "github.com/virtual-kubelet/virtual-kubelet/internal/queue"
 
     "github.com/google/go-cmp/cmp"
@@ -30,6 +30,7 @@ import (
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/tools/cache"
 )
@@ -58,6 +59,19 @@ func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) con
     })
 }
 
+func (pc *PodController) getPodByUID(ctx context.Context, namespace, name string, uid types.UID) (*corev1.Pod, error) {
+    pod, err := pc.podsLister.Pods(namespace).Get(name)
+    switch {
+    case errors.IsNotFound(err):
+        return nil, errdefs.NotFoundf("pod %q not found: %w", objectName(namespace, name), err)
+    case err != nil:
+        return nil, err
+    case pod.UID != uid:
+        return nil, errdefs.NotFoundf("pod %q UID does not match (expected %q, got %q)", objectName(namespace, name), uid, pod.UID)
+    }
+    return pod, nil
+}
+
 func (pc *PodController) createOrUpdatePod(ctx context.Context, pod *corev1.Pod) error {
 
     ctx, span := trace.StartSpan(ctx, "createOrUpdatePod")
@@ -85,7 +99,7 @@ func (pc *PodController) createOrUpdatePod(ctx context.Context, pod *corev1.Pod)
     // Check if the pod is already known by the provider.
     // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't.
     // Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod).
-    if podFromProvider, _ := pc.provider.GetPod(ctx, pod.Namespace, pod.Name); podFromProvider != nil {
+    if podFromProvider, _ := pc.provider.GetPodByUID(ctx, pod.Namespace, pod.Name, pod.UID); podFromProvider != nil {
         if !podsEqual(podFromProvider, podForProvider) {
             log.G(ctx).Debugf("Pod %s exists, updating pod in provider", podFromProvider.Name)
             if origErr := pc.provider.UpdatePod(ctx, podForProvider); origErr != nil {
@@ -203,7 +217,7 @@ func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool {
         pod.Status.Phase == corev1.PodFailed
 }
 
-func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes *corev1.Pod, key string) error {
+func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes *corev1.Pod, key podUIDKey) error {
     if shouldSkipPodStatusUpdate(podFromKubernetes) {
         return nil
     }
@@ -277,16 +291,11 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, pod *corev1
 
     // TODO (Sargun): Make this asynchronousish. Right now, if we are not cache synced, and we receive notifications
     // from the provider for pods that do not exist yet in our known pods map, we can get into an awkward situation.
-    key, err := cache.MetaNamespaceKeyFunc(pod)
-    if err != nil {
-        log.G(ctx).WithError(err).Error("Error getting pod meta namespace key")
-        span.SetStatus(err)
-        return
-    }
+    key := newPodUIDKey(pod)
     ctx = span.WithField(ctx, "key", key)
 
     var obj interface{}
-    err = wait.PollUntilContextCancel(ctx, notificationRetryPeriod, true, func(ctx context.Context) (bool, error) {
+    err := wait.PollUntilContextCancel(ctx, notificationRetryPeriod, true, func(ctx context.Context) (bool, error) {
         var ok bool
         obj, ok = pc.knownPods.Load(key)
         if ok {
@@ -303,7 +312,7 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, pod *corev1
         // should happen, and the pod *actually* exists is the above -- where we haven't been able to finish sync
         // before context times out.
         // The other class of errors is non-transient
-        _, err = pc.podsLister.Pods(pod.Namespace).Get(pod.Name)
+        _, err := pc.getPodByUID(ctx, pod.Namespace, pod.Name, pod.UID)
         if err != nil {
             return false, err
         }
@@ -315,7 +324,7 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, pod *corev1
     })
 
     if err != nil {
-        if errors.IsNotFound(err) {
+        if errdefs.IsNotFound(err) {
             err = fmt.Errorf("pod %q not found in pod lister: %w", key, err)
             log.G(ctx).WithError(err).Debug("not enqueuing pod status update")
         } else {
@@ -338,7 +347,7 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, pod *corev1
     pc.syncPodStatusFromProvider.Enqueue(ctx, key)
 }
 
-func (pc *PodController) syncPodStatusFromProviderHandler(ctx context.Context, key string) (retErr error) {
+func (pc *PodController) syncPodStatusFromProviderHandler(ctx context.Context, key podUIDKey) (retErr error) {
     ctx, span := trace.StartSpan(ctx, "syncPodStatusFromProviderHandler")
     defer span.End()
 
@@ -351,14 +360,13 @@ func (pc *PodController) syncPodStatusFromProviderHandler(ctx context.Context, k
         }
     }()
 
-    namespace, name, err := cache.SplitMetaNamespaceKey(key)
-    if err != nil {
-        return pkgerrors.Wrap(err, "error splitting cache key")
-    }
+    namespace := key.Namespace
+    name := key.Name
+    uid := key.UID
 
-    pod, err := pc.podsLister.Pods(namespace).Get(name)
+    pod, err := pc.getPodByUID(ctx, namespace, name, uid)
     if err != nil {
-        if errors.IsNotFound(err) {
+        if errdefs.IsNotFound(err) {
             log.G(ctx).WithError(err).Debug("Skipping pod status update for pod missing in Kubernetes")
             return nil
         }
@@ -368,24 +376,19 @@ func (pc *PodController) syncPodStatusFromProviderHandler(ctx context.Context, k
     return pc.updatePodStatus(ctx, pod, key)
 }
 
-func (pc *PodController) deletePodsFromKubernetesHandler(ctx context.Context, key string) (retErr error) {
+func (pc *PodController) deletePodsFromKubernetesHandler(ctx context.Context, key podUIDKey) (retErr error) {
     ctx, span := trace.StartSpan(ctx, "deletePodsFromKubernetesHandler")
     defer span.End()
 
-    uid, metaKey := getUIDAndMetaNamespaceKey(key)
-    namespace, name, err := cache.SplitMetaNamespaceKey(metaKey)
+    namespace := key.Namespace
+    name := key.Name
+    uid := key.UID
+
     ctx = span.WithFields(ctx, log.Fields{
         "namespace": namespace,
         "name":      name,
     })
 
-    if err != nil {
-        // Log the error as a warning, but do not requeue the key as it is invalid.
-        log.G(ctx).Warn(pkgerrors.Wrapf(err, "invalid resource key: %q", key))
-        span.SetStatus(err)
-        return nil
-    }
-
     defer func() {
         if retErr == nil {
             if w, ok := pc.provider.(syncWrapper); ok {
@@ -395,26 +398,22 @@ func (pc *PodController) deletePodsFromKubernetesHandler(ctx context.Context, ke
     }()
 
     // If the pod has been deleted from API server, we don't need to do anything.
-    k8sPod, err := pc.podsLister.Pods(namespace).Get(name)
-    if errors.IsNotFound(err) {
+    k8sPod, err := pc.getPodByUID(ctx, namespace, name, uid)
+    if errdefs.IsNotFound(err) {
         return nil
     }
     if err != nil {
         span.SetStatus(err)
         return err
     }
-    if string(k8sPod.UID) != uid {
-        log.G(ctx).WithField("k8sPodUID", k8sPod.UID).WithField("uid", uid).Warn("Not deleting pod because remote pod has different UID")
-        return nil
-    }
     if running(&k8sPod.Status) {
         log.G(ctx).Error("Force deleting pod in running state")
     }
 
     // We don't check with the provider before doing this delete. At this point, even if an outstanding pod status update
     // was in progress,
     deleteOptions := metav1.NewDeleteOptions(0)
-    deleteOptions.Preconditions = metav1.NewUIDPreconditions(uid)
+    deleteOptions.Preconditions = &metav1.Preconditions{UID: &uid}
     err = pc.client.Pods(namespace).Delete(ctx, name, *deleteOptions)
     if errors.IsNotFound(err) {
         log.G(ctx).Warnf("Not deleting pod because %v", err)
@@ -430,10 +429,3 @@ func (pc *PodController) deletePodsFromKubernetesHandler(ctx context.Context, ke
     }
     return nil
 }
-
-func getUIDAndMetaNamespaceKey(key string) (string, string) {
-    idx := strings.LastIndex(key, "/")
-    uid := key[idx+1:]
-    metaKey := key[:idx]
-    return uid, metaKey
-}
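
Note: the podUIDKey type and the newPodUIDKey constructor used throughout this diff are defined in one of the changed files not shown in this excerpt. A minimal sketch of what that definition plausibly looks like, assuming only the fields the handlers actually read (Namespace, Name, UID) and the sync.Map usage visible above; the exact names and layout are assumptions:

    // Hypothetical reconstruction; the real definition lives in a file not shown here.
    package node

    import (
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/types"
    )

    // podUIDKey identifies a pod by namespace, name, and UID. Because it is a
    // comparable struct, it can be used directly as a map or sync.Map key,
    // replacing the old "namespace/name" and "namespace/name/uid" string keys.
    type podUIDKey struct {
        Namespace string
        Name      string
        UID       types.UID
    }

    // newPodUIDKey builds the key from a pod's object metadata.
    func newPodUIDKey(pod *corev1.Pod) podUIDKey {
        return podUIDKey{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID}
    }

A struct key removes both the string parsing that getUIDAndMetaNamespaceKey and cache.SplitMetaNamespaceKey used to do and any risk of delimiter collisions, which is why the handlers above can read key.Namespace, key.Name, and key.UID directly.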

node/pod_test.go

Lines changed: 2 additions & 2 deletions
@@ -318,7 +318,7 @@ func TestPodStatusDelete(t *testing.T) {
     podCopy := pod.DeepCopy()
     deleteTime := v1.Time{Time: time.Now().Add(30 * time.Second)}
     podCopy.DeletionTimestamp = &deleteTime
-    key := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
+    key := newPodUIDKey(pod)
     c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
 
     // test pod in provider delete
@@ -386,7 +386,7 @@ func TestReCreatePodRace(t *testing.T) {
     fk8s := &fake.Clientset{}
     c.client = fk8s
     c.PodController.client = fk8s.CoreV1()
-    key := fmt.Sprintf("%s/%s/%s", pod.Namespace, pod.Name, pod.UID)
+    key := newPodUIDKey(pod)
     c.knownPods.Store(key, &knownPod{lastPodStatusReceivedFromProvider: podCopy})
     c.deletePodsFromKubernetes.Enqueue(ctx, key)
     if err := c.podsInformer.Informer().GetStore().Add(pod); err != nil {
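
Worth noting: before this change the two tests built their keys by hand in two different string formats ("%s/%s" in TestPodStatusDelete, "%s/%s/%s" in TestReCreatePodRace). The comparable struct key makes that kind of drift impossible. A small illustration, assuming the podUIDKey sketch above; the pod values here are made up:

    // Keys derived from the same pod compare equal; a recreated pod with a
    // fresh UID yields a distinct key, which is the property
    // TestReCreatePodRace exercises.
    package node

    import (
        "fmt"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
    )

    func examplePodUIDKey() {
        pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
            Namespace: "default",
            Name:      "nginx",
            UID:       types.UID("uid-1"),
        }}
        fmt.Println(newPodUIDKey(pod) == newPodUIDKey(pod)) // true

        recreated := pod.DeepCopy()
        recreated.UID = types.UID("uid-2")
        fmt.Println(newPodUIDKey(pod) == newPodUIDKey(recreated)) // false
    }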
