Skip to content

Commit 963901b

Browse files
feat: Add download all and all namespaces flags (#1917)
# Description

Allow users to download all captures from a specific namespace or from all namespaces.

## Related Issue

#1836

## Checklist

- [x] I have read the [contributing documentation](https://retina.sh/docs/Contributing/overview).
- [x] I signed and signed-off the commits (`git commit -S -s ...`). See [this documentation](https://docs.github.com/en/authentication/managing-commit-signature-verification/about-commit-signature-verification) on signing commits.
- [ ] I have correctly attributed the author(s) of the code.
- [x] I have tested the changes locally.
- [ ] I have followed the project's style guidelines.
- [x] I have updated the documentation, if necessary.
- [x] I have added tests, if applicable.

## Screenshots (if applicable) or Testing Completed

Please add any relevant screenshots or GIFs to showcase the changes made.

## Additional Notes

Add any additional notes or context about the pull request here.

---

Please refer to the [CONTRIBUTING.md](../CONTRIBUTING.md) file for more information on how to contribute to this project.
1 parent a33f34b commit 963901b

File tree

3 files changed

+549
-27
lines changed

3 files changed

+549
-27
lines changed

cli/cmd/capture/download.go

Lines changed: 258 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
package capture
55

66
import (
7+
"archive/tar"
78
"bytes"
9+
"compress/gzip"
810
"context"
911
"errors"
1012
"fmt"
@@ -20,11 +22,13 @@ import (
2022
"github.com/Azure/azure-sdk-for-go/storage"
2123
retinacmd "github.com/microsoft/retina/cli/cmd"
2224
captureConstants "github.com/microsoft/retina/pkg/capture/constants"
25+
captureFile "github.com/microsoft/retina/pkg/capture/file"
2326
captureUtils "github.com/microsoft/retina/pkg/capture/utils"
2427
captureLabels "github.com/microsoft/retina/pkg/label"
2528
"github.com/spf13/cobra"
2629
"github.com/spf13/viper"
2730
"go.uber.org/zap"
31+
batchv1 "k8s.io/api/batch/v1"
2832
corev1 "k8s.io/api/core/v1"
2933
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3034
"k8s.io/apimachinery/pkg/util/rand"
@@ -69,6 +73,8 @@ var (
6973
ErrNoBlobsFound = errors.New("no blobs found with prefix")
7074
captureName string
7175
outputPath string
76+
downloadAll bool
77+
downloadAllNamespaces bool
7278
)
7379

7480
var (
@@ -79,6 +85,8 @@ var (
7985
ErrEmptyDownloadOutput = errors.New("download command produced no output")
8086
ErrFailedToCreateDownloadPod = errors.New("failed to create download pod")
8187
ErrUnsupportedNodeOS = errors.New("unsupported node operating system")
88+
ErrMissingRequiredFlags = errors.New("either --name, --blob-url, or --all must be specified")
89+
ErrAllNamespacesRequiresAll = errors.New("--all-namespaces flag can only be used with --all flag")
8290
)
8391

8492
// DownloadCmd holds all OS-specific commands and configurations
@@ -98,6 +106,12 @@ type DownloadService struct {
98106
namespace string
99107
}
100108

109+
// Key identifies a single capture by the pair (capture name, namespace).
// It contains only string fields, so it is comparable and can be used
// directly as a map key when grouping capture jobs.
type Key struct {
	// Name is the capture name.
	Name string
	// Namespace is the Kubernetes namespace the capture belongs to.
	Namespace string
}
114+
101115
// NewDownloadService creates a new download service with shared dependencies
102116
func NewDownloadService(kubeClient kubernetes.Interface, config *rest.Config, namespace string) *DownloadService {
103117
return &DownloadService{
@@ -198,6 +212,12 @@ var downloadExample = templates.Examples(i18n.T(`
198212
# Download the capture file(s) created using the capture name and define output location
199213
kubectl retina capture download --name <capture-name> -o <output-location>
200214
215+
# Download all available captures
216+
kubectl retina capture download --all
217+
218+
# Download all available captures from all namespaces
219+
kubectl retina capture download --all --all-namespaces
220+
201221
# Download capture file(s) from Blob Storage via Blob URL (Blob URL requires Read/List permissions)
202222
kubectl retina capture download --blob-url "<blob-url>"
203223
`))
@@ -248,49 +268,61 @@ func downloadFromCluster(ctx context.Context, config *rest.Config, namespace str
248268

249269
// DownloadFile downloads a capture file from a specific node
250270
func (ds *DownloadService) DownloadFile(ctx context.Context, nodeName, hostPath, fileName, captureName string) error {
271+
content, err := ds.DownloadFileContent(ctx, nodeName, hostPath, fileName, captureName)
272+
if err != nil {
273+
return err
274+
}
275+
276+
outputFile := filepath.Join(outputPath, captureName, fileName+".tar.gz")
277+
fmt.Printf("Bytes retrieved: %d\n", len(content))
278+
279+
err = os.WriteFile(outputFile, content, 0o600)
280+
if err != nil {
281+
return errors.Join(ErrWriteFileToHost, err)
282+
}
283+
284+
fmt.Printf("File written to: %s\n", outputFile)
285+
return nil
286+
}
287+
288+
// DownloadFileContent downloads a capture file from a specific node and returns the content
289+
func (ds *DownloadService) DownloadFileContent(ctx context.Context, nodeName, hostPath, fileName, captureName string) ([]byte, error) {
251290
node, err := ds.kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
252291
if err != nil {
253-
return errors.Join(ErrGetNodeInfo, err)
292+
return nil, errors.Join(ErrGetNodeInfo, err)
254293
}
255294

256295
downloadCmd, err := getDownloadCmd(node, hostPath, fileName)
257296
if err != nil {
258-
return err
297+
return nil, err
259298
}
260299

261300
fmt.Println("File to be downloaded: ", downloadCmd.SrcFilePath)
262301
downloadPod, err := ds.createDownloadPod(ctx, nodeName, hostPath, captureName, downloadCmd)
263302
if err != nil {
264-
return err
303+
return nil, err
265304
}
266305

306+
// Ensure cleanup
307+
defer func() {
308+
cleanupErr := ds.kubeClient.CoreV1().Pods(ds.namespace).Delete(ctx, downloadPod.Name, metav1.DeleteOptions{})
309+
if cleanupErr != nil {
310+
retinacmd.Logger.Warn("Failed to clean up debug pod", zap.String("name", downloadPod.Name), zap.Error(cleanupErr))
311+
}
312+
}()
313+
267314
fileExists, err := ds.verifyFileExists(ctx, downloadPod, downloadCmd)
268315
if err != nil || !fileExists {
269-
return err
316+
return nil, err
270317
}
271318

272319
fmt.Println("Obtaining file...")
273320
fileContent, err := ds.executeFileDownload(ctx, downloadPod, downloadCmd)
274321
if err != nil {
275-
return err
276-
}
277-
278-
outputFile := filepath.Join(outputPath, captureName, fileName+".tar.gz")
279-
fmt.Printf("Bytes retrieved: %d\n", len(fileContent))
280-
281-
err = os.WriteFile(outputFile, fileContent, 0o600)
282-
if err != nil {
283-
return errors.Join(ErrWriteFileToHost, err)
322+
return nil, err
284323
}
285324

286-
fmt.Printf("File written to: %s\n", outputFile)
287-
288-
// Ensure cleanup
289-
err = ds.kubeClient.CoreV1().Pods(ds.namespace).Delete(ctx, downloadPod.Name, metav1.DeleteOptions{})
290-
if err != nil {
291-
retinacmd.Logger.Warn("Failed to clean up debug pod", zap.String("name", downloadPod.Name), zap.Error(err))
292-
}
293-
return nil
325+
return fileContent, nil
294326
}
295327

296328
func getCapturePods(ctx context.Context, kubeClient kubernetes.Interface, captureName, namespace string) (*corev1.PodList, error) {
@@ -513,6 +545,195 @@ func downloadFromBlob() error {
513545
return nil
514546
}
515547

548+
func downloadAllCaptures(ctx context.Context, config *rest.Config, namespace string) error {
549+
if downloadAllNamespaces {
550+
fmt.Println("Downloading all captures from all namespaces...")
551+
} else {
552+
fmt.Printf("Downloading all captures for namespace %s...\n", namespace)
553+
}
554+
kubeClient, err := kubernetes.NewForConfig(config)
555+
if err != nil {
556+
return fmt.Errorf("failed to initialize k8s client: %w", err)
557+
}
558+
559+
// List all capture jobs with the capture app label
560+
captureJobSelector := &metav1.LabelSelector{
561+
MatchLabels: map[string]string{
562+
captureLabels.AppLabel: captureConstants.CaptureAppname,
563+
},
564+
}
565+
labelSelector, err := metav1.LabelSelectorAsSelector(captureJobSelector)
566+
if err != nil {
567+
return fmt.Errorf("failed to parse label selector: %w", err)
568+
}
569+
570+
var jobList *batchv1.JobList
571+
if downloadAllNamespaces {
572+
// Search across all namespaces
573+
jobList, err = kubeClient.BatchV1().Jobs("").List(ctx, metav1.ListOptions{
574+
LabelSelector: labelSelector.String(),
575+
})
576+
if err != nil {
577+
return fmt.Errorf("failed to list capture jobs across all namespaces: %w", err)
578+
}
579+
} else {
580+
// Search in specified namespace only
581+
jobList, err = kubeClient.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{
582+
LabelSelector: labelSelector.String(),
583+
})
584+
if err != nil {
585+
return fmt.Errorf("failed to list capture jobs: %w", err)
586+
}
587+
}
588+
589+
if len(jobList.Items) == 0 {
590+
if downloadAllNamespaces {
591+
fmt.Printf("No captures found across all namespaces\n")
592+
} else {
593+
fmt.Printf("No captures found in namespace %s\n", namespace)
594+
}
595+
return nil
596+
}
597+
598+
// Group jobs by capture name and namespace
599+
captureToJobs := make(map[Key][]batchv1.Job)
600+
for i := range jobList.Items {
601+
job := &jobList.Items[i]
602+
captureNameFromLabel, ok := job.Labels[captureLabels.CaptureNameLabel]
603+
if !ok {
604+
continue
605+
}
606+
key := Key{Name: captureNameFromLabel, Namespace: job.Namespace}
607+
captureToJobs[key] = append(captureToJobs[key], *job)
608+
}
609+
610+
fmt.Printf("Found %d capture(s) to download\n", len(captureToJobs))
611+
612+
// Create the final archive using streaming approach to avoid memory issues
613+
timestamp := captureFile.TimeToString(captureFile.Now())
614+
finalArchivePath := filepath.Join(outputPath, fmt.Sprintf("all-captures-%s.tar.gz", timestamp))
615+
616+
fmt.Printf("Creating final archive: %s\n", finalArchivePath)
617+
err = createStreamingTarGzArchive(ctx, finalArchivePath, captureToJobs, kubeClient, config)
618+
if err != nil {
619+
return fmt.Errorf("failed to create final archive: %w", err)
620+
}
621+
622+
fmt.Printf("Successfully created archive: %s\n", finalArchivePath)
623+
return nil
624+
}
625+
626+
// createStreamingTarGzArchive creates a tar.gz archive by streaming files one at a time to avoid memory issues
627+
func createStreamingTarGzArchive(ctx context.Context, outputPath string, captureToJobs map[Key][]batchv1.Job, kubeClient kubernetes.Interface, config *rest.Config) error {
628+
// Create the output file
629+
outFile, err := os.Create(outputPath)
630+
if err != nil {
631+
return fmt.Errorf("failed to create archive file: %w", err)
632+
}
633+
defer outFile.Close()
634+
635+
// Create gzip writer
636+
gzipWriter := gzip.NewWriter(outFile)
637+
defer gzipWriter.Close()
638+
639+
// Create tar writer
640+
tarWriter := tar.NewWriter(gzipWriter)
641+
defer tarWriter.Close()
642+
643+
// We'll create download services per namespace as needed
644+
downloadServices := make(map[string]*DownloadService)
645+
fileCount := 0
646+
647+
// Process each capture and stream files directly to archive
648+
for captureKey := range captureToJobs {
649+
currentCaptureName := captureKey.Name
650+
currentNamespace := captureKey.Namespace
651+
fmt.Printf("Processing capture: %s in namespace: %s\n", currentCaptureName, currentNamespace)
652+
653+
// Get or create download service for this namespace
654+
downloadService, exists := downloadServices[currentNamespace]
655+
if !exists {
656+
downloadService = NewDownloadService(kubeClient, config, currentNamespace)
657+
downloadServices[currentNamespace] = downloadService
658+
}
659+
660+
// Get pods for this capture and download files
661+
pods, podsErr := getCapturePods(ctx, kubeClient, currentCaptureName, currentNamespace)
662+
if podsErr != nil {
663+
fmt.Printf("Warning: Failed to get pods for capture %s in namespace %s: %v\n", currentCaptureName, currentNamespace, podsErr)
664+
continue
665+
}
666+
667+
for i := range pods.Items {
668+
pod := pods.Items[i]
669+
if pod.Status.Phase != corev1.PodSucceeded {
670+
fmt.Printf("Warning: Pod %s is not in Succeeded phase (status: %s), skipping\n", pod.Name, pod.Status.Phase)
671+
continue
672+
}
673+
674+
nodeName := pod.Spec.NodeName
675+
hostPath, ok := pod.Annotations[captureConstants.CaptureHostPathAnnotationKey]
676+
if !ok {
677+
fmt.Printf("Warning: Cannot obtain host path from pod annotations for %s\n", pod.Name)
678+
continue
679+
}
680+
fileName, ok := pod.Annotations[captureConstants.CaptureFilenameAnnotationKey]
681+
if !ok {
682+
fmt.Printf("Warning: Cannot obtain capture file name from pod annotations for %s\n", pod.Name)
683+
continue
684+
}
685+
686+
// Download file content (this is still done in memory per file, but not all files at once)
687+
content, err := downloadService.DownloadFileContent(ctx, nodeName, hostPath, fileName, currentCaptureName)
688+
if err != nil {
689+
fmt.Printf("Warning: Failed to download file from pod %s: %v\n", pod.Name, err)
690+
continue
691+
}
692+
693+
// Determine archive path based on whether we're using all namespaces
694+
var archivePath string
695+
if downloadAllNamespaces {
696+
// Include namespace in path: namespace/captureName/fileName.tar.gz
697+
archivePath = filepath.Join(currentNamespace, currentCaptureName, fileName+".tar.gz")
698+
} else {
699+
// Original path: captureName/fileName.tar.gz
700+
archivePath = filepath.Join(currentCaptureName, fileName+".tar.gz")
701+
}
702+
703+
// Stream file directly to archive
704+
header := &tar.Header{
705+
Name: archivePath,
706+
Mode: 0o600,
707+
Size: int64(len(content)),
708+
}
709+
710+
// Write header
711+
if err := tarWriter.WriteHeader(header); err != nil {
712+
return fmt.Errorf("failed to write header for %s: %w", archivePath, err)
713+
}
714+
715+
// Write content
716+
if _, err := tarWriter.Write(content); err != nil {
717+
return fmt.Errorf("failed to write content for %s: %w", archivePath, err)
718+
}
719+
720+
fileCount++
721+
fmt.Printf("Added %s (%d bytes) to archive\n", archivePath, len(content))
722+
}
723+
}
724+
725+
if fileCount == 0 {
726+
// Remove the empty archive file
727+
outFile.Close()
728+
os.Remove(outputPath)
729+
fmt.Println("No capture files were successfully downloaded")
730+
return nil
731+
}
732+
733+
fmt.Printf("Successfully added %d files to archive\n", fileCount)
734+
return nil
735+
}
736+
516737
func NewDownloadSubCommand() *cobra.Command {
517738
downloadCapture := &cobra.Command{
518739
Use: "download",
@@ -534,8 +755,13 @@ func NewDownloadSubCommand() *cobra.Command {
534755
captureNamespace = "default"
535756
}
536757

537-
if captureName == "" && blobURL == "" {
538-
return errors.New("either --name or --blob-url must be specified")
758+
if captureName == "" && blobURL == "" && !downloadAll {
759+
return ErrMissingRequiredFlags
760+
}
761+
762+
// Validate all-namespaces flag usage
763+
if downloadAllNamespaces && !downloadAll {
764+
return ErrAllNamespacesRequiresAll
539765
}
540766

541767
if captureName != "" {
@@ -552,12 +778,21 @@ func NewDownloadSubCommand() *cobra.Command {
552778
}
553779
}
554780

781+
if downloadAll {
782+
err = downloadAllCaptures(ctx, kubeConfig, captureNamespace)
783+
if err != nil {
784+
return err
785+
}
786+
}
787+
555788
return nil
556789
},
557790
}
558791

559792
downloadCapture.Flags().StringVar(&blobURL, "blob-url", "", "Blob URL from which to download")
560793
downloadCapture.Flags().StringVar(&captureName, "name", "", "The name of a the capture")
794+
downloadCapture.Flags().BoolVar(&downloadAll, "all", false, "Download all available captures for the specified namespace (or all namespaces if --all-namespaces flag is set)")
795+
downloadCapture.Flags().BoolVar(&downloadAllNamespaces, "all-namespaces", false, "Download captures from all namespaces (only works with --all flag)")
561796
downloadCapture.Flags().StringVarP(&outputPath, "output", "o", DefaultOutputPath, "Path to save the downloaded capture")
562797

563798
return downloadCapture

0 commit comments

Comments
 (0)