10 changes: 9 additions & 1 deletion Makefile
@@ -155,14 +155,22 @@ show-coverage: coverage-html
##@ Build

.PHONY: build
-build: vet bin/vm-builder ## Build all neonvm binaries.
+build: desugar vet bin/vm-builder ## Build all neonvm binaries.
GOOS=linux go build -o bin/controller neonvm-controller/cmd/*.go
GOOS=linux go build -o bin/vxlan-controller neonvm-vxlan-controller/cmd/*.go
GOOS=linux go build -o bin/runner neonvm-runner/cmd/*.go
GOOS=linux go build -o bin/daemon neonvm-daemon/cmd/*.go
GOOS=linux go build -o bin/autoscaler-agent autoscaler-agent/cmd/*.go
GOOS=linux go build -o bin/scheduler autoscale-scheduler/cmd/*.go

+.PHONY: resugar
+resugar:
+	./scripts/resugar.sh
+
+.PHONY: desugar
+desugar:
+	./scripts/desugar.sh
+
.PHONY: bin/vm-builder
bin/vm-builder: ## Build vm-builder binary.
CGO_ENABLED=0 go build -o bin/vm-builder -ldflags "-X main.Version=${GIT_INFO} -X main.NeonvmDaemonImage=${IMG_DAEMON}" vm-builder/main.go
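Note for readers unfamiliar with the shorthand: the new desugar/resugar targets, together with the `handle err {` blocks throughout the rest of this diff, suggest a purely mechanical rewrite between a non-standard error-handling shorthand and ordinary Go. The scripts themselves are not part of this diff, so the exact behaviour is an assumption. A minimal sketch of the two forms (openConfig and the file name are illustrative; only os.Open and fmt.Errorf are real APIs):

package main

import (
	"fmt"
	"os"
)

// Sugared form as it appears in this PR (not valid standard Go):
//
//	file := os.Open(path) handle err {
//		return nil, fmt.Errorf("Error opening config file %q: %w", path, err)
//	}
//
// openConfig shows the desugared equivalent, matching the pairs of lines this
// PR deletes. The assumption is that `make desugar` restores this standard
// form before `go build` runs (hence `build: desugar ...` above), and that
// `make resugar` performs the reverse rewrite.
func openConfig(path string) (*os.File, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("Error opening config file %q: %w", path, err)
	}
	return file, nil
}

func main() {
	// "config.json" is only a placeholder for this sketch.
	f, err := openConfig("config.json")
	if err != nil {
		fmt.Println(err)
		return
	}
	f.Close()
}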
3 changes: 1 addition & 2 deletions pkg/agent/billing/billing.go
@@ -78,8 +78,7 @@ func NewMetricsCollector(
) (*MetricsCollector, error) {
logger := parentLogger.Named("billing")

-clients, err := createClients(ctx, logger, conf.Clients)
-if err != nil {
+clients := createClients(ctx, logger, conf.Clients) handle err {
return nil, err
}

6 changes: 2 additions & 4 deletions pkg/agent/billing/clients.go
@@ -59,8 +59,7 @@ func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) (
}
if c := cfg.AzureBlob; c != nil {
generateKey := newBlobStorageKeyGenerator(c.PrefixInContainer)
-client, err := reporting.NewAzureBlobStorageClient(c.AzureBlobStorageClientConfig, generateKey)
-if err != nil {
+client := reporting.NewAzureBlobStorageClient(c.AzureBlobStorageClientConfig, generateKey) handle err {
return nil, fmt.Errorf("error creating Azure Blob Storage client: %w", err)
}
logger.Info("Created Azure Blob Storage client for billing events", zap.Any("config", c))
@@ -74,8 +73,7 @@ func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) (
}
if c := cfg.S3; c != nil {
generateKey := newBlobStorageKeyGenerator(c.PrefixInBucket)
-client, err := reporting.NewS3Client(ctx, c.S3ClientConfig, generateKey)
-if err != nil {
+client := reporting.NewS3Client(ctx, c.S3ClientConfig, generateKey) handle err {
return nil, fmt.Errorf("error creating S3 client: %w", err)
}
logger.Info("Created S3 client for billing events", zap.Any("config", c))
3 changes: 1 addition & 2 deletions pkg/agent/config.go
@@ -143,8 +143,7 @@ type NeonVMConfig struct {
}

func ReadConfig(path string) (*Config, error) {
-file, err := os.Open(path)
-if err != nil {
+file := os.Open(path) handle err {
return nil, fmt.Errorf("Error opening config file %q: %w", path, err)
}

6 changes: 2 additions & 4 deletions pkg/agent/core/metrics.go
@@ -51,8 +51,7 @@ type FromPrometheus interface {
// FromPrometheus to populate it before returning.
func ParseMetrics(content io.Reader, metrics FromPrometheus) error {
var parser promfmt.TextParser
-mfs, err := parser.TextToMetricFamilies(content)
-if err != nil {
+mfs := parser.TextToMetricFamilies(content) handle err {
return fmt.Errorf("failed to parse content as prometheus text format: %w", err)
}

@@ -178,8 +177,7 @@ func extractWorkingSetSizeWindows(mfs map[string]*promtypes.MetricFamily) ([]flo
return nil, fmt.Errorf("metric missing label %q", durationLabel)
}

-durationSeconds, err := strconv.Atoi(m.Label[durationIndex].GetValue())
-if err != nil {
+durationSeconds := strconv.Atoi(m.Label[durationIndex].GetValue()) handle err {
return nil, fmt.Errorf("couldn't parse metric's %q label as int: %w", durationLabel, err)
}

3 changes: 1 addition & 2 deletions pkg/agent/core/state_test.go
@@ -399,8 +399,7 @@ func doInitialPluginRequest(

// helper function to parse a duration
func duration(s string) time.Duration {
-d, err := time.ParseDuration(s)
-if err != nil {
+d := time.ParseDuration(s) handle err {
panic(fmt.Errorf("failed to parse duration: %w", err))
}
return d
3 changes: 1 addition & 2 deletions pkg/agent/core/testhelpers/clock.go
@@ -18,8 +18,7 @@ type FakeClock struct {

// NewFakeClock creates a new fake clock, with the initial time set to an unspecified, round number.
func NewFakeClock(t *testing.T) *FakeClock {
-base, err := time.Parse(time.RFC3339, "2000-01-01T00:00:00Z") // a nice round number, to make things easier
-if err != nil {
+base := time.Parse(time.RFC3339, "2000-01-01T00:00:00Z") // a nice round number, to make things easier handle err {
panic(err)
}

15 changes: 5 additions & 10 deletions pkg/agent/dispatcher.go
@@ -96,8 +96,7 @@ func NewDispatcher(
}()

connectTimeout := time.Second * time.Duration(runner.global.config.Monitor.ConnectionTimeoutSeconds)
-conn, protoVersion, err := connectToMonitor(ctx, logger, addr, connectTimeout)
-if err != nil {
+conn, protoVersion := connectToMonitor(ctx, logger, addr, connectTimeout) handle err {
return nil, err
}

@@ -239,8 +238,7 @@ func connectToMonitor(

// We do not need to close the response body according to docs.
// Doing so causes memory bugs.
-c, _, err := websocket.Dial(ctx, addr, nil) //nolint:bodyclose // see comment above
-if err != nil {
+c, _ := websocket.Dial(ctx, addr, nil) //nolint:bodyclose // see comment above handle err {
return nil, nil, fmt.Errorf("error establishing websocket connection to %s: %w", addr, err)
}

@@ -321,8 +319,7 @@ func (disp *Dispatcher) lenWaiters() int {
// Send a message down the connection. Only call this method with types that
// SerializeMonitorMessage can handle.
func (disp *Dispatcher) send(ctx context.Context, logger *zap.Logger, id uint64, message any) error {
-data, err := api.SerializeMonitorMessage(message, id)
-if err != nil {
+data := api.SerializeMonitorMessage(message, id) handle err {
return fmt.Errorf("error serializing message: %w", err)
}
// wsjson.Write serializes whatever is passed in, and go serializes []byte
@@ -448,15 +445,13 @@ func (disp *Dispatcher) HandleMessage(
return fmt.Errorf("Error deserializing message: %q", string(message))
}

-typeStr, err := extractField[string](unstructured, "type")
-if err != nil {
+typeStr := extractField[string](unstructured, "type") handle err {
return fmt.Errorf("Error extracting 'type' field: %w", err)
}

// go thinks all json numbers are float64 so we first deserialize to that to
// avoid the type error, then cast to uint64
-f, err := extractField[float64](unstructured, "id")
-if err != nil {
+f := extractField[float64](unstructured, "id") handle err {
return fmt.Errorf("Error extracting 'id field: %w", err)
}
id := uint64(*f)
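The shorthand above also appears to cover calls with more than one non-error result (connectToMonitor, websocket.Dial, the extractField calls). Assuming the rewrite simply appends a trailing `err` result and expands the block into the usual check, a desugared sketch looks like this; `dial` and `connect` are illustrative stand-ins, not functions from this codebase:

package main

import (
	"errors"
	"fmt"
)

// dial is a stand-in for calls like connectToMonitor or websocket.Dial; it
// exists only so this sketch compiles.
func dial(addr string) (conn string, proto string, err error) {
	if addr == "" {
		return "", "", errors.New("empty address")
	}
	return "conn", "v1", nil
}

// connect shows the assumed desugaring of:
//
//	conn, proto := dial(addr) handle err {
//		return "", fmt.Errorf("error establishing websocket connection to %s: %w", addr, err)
//	}
//
// i.e. the rewrite introduces the trailing `err` and the usual `if err != nil` block.
func connect(addr string) (string, error) {
	conn, proto, err := dial(addr)
	if err != nil {
		return "", fmt.Errorf("error establishing websocket connection to %s: %w", addr, err)
	}
	return conn + "/" + proto, nil
}

func main() {
	// The address is only a placeholder for this sketch.
	fmt.Println(connect("ws://vm-monitor:10301"))
}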
6 changes: 2 additions & 4 deletions pkg/agent/dumpstate.go
@@ -27,8 +27,7 @@ type StateDump struct {
func (s *agentState) StartDumpStateServer(shutdownCtx context.Context, logger *zap.Logger, config *DumpStateConfig) error {
// Manually start the TCP listener so we can minimize errors in the background thread.
addr := net.TCPAddr{IP: net.IPv4zero, Port: int(config.Port)}
-listener, err := net.ListenTCP("tcp", &addr)
-if err != nil {
+listener := net.ListenTCP("tcp", &addr) handle err {
return fmt.Errorf("Error binding to %v", addr)
}

@@ -41,8 +40,7 @@ func (s *agentState) StartDumpStateServer(shutdownCtx context.Context, logger *z
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

-state, err := s.DumpState(ctx, shutdownCtx.Err() != nil)
-if err != nil {
+state := s.DumpState(ctx, shutdownCtx.Err() != nil) handle err {
if ctx.Err() != nil && errors.Is(ctx.Err(), context.DeadlineExceeded) {
totalDuration := time.Since(startTime)
return nil, 500, fmt.Errorf("timed out after %s while getting state", totalDuration)
15 changes: 5 additions & 10 deletions pkg/agent/entrypoint.go
@@ -40,22 +40,19 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
watchMetrics := watch.NewMetrics("autoscaling_agent_watchers", globalPromReg)

logger.Info("Starting VM watcher")
-vmWatchStore, err := startVMWatcher(ctx, logger, r.Config, r.VMClient, watchMetrics, perVMMetrics, r.EnvArgs.K8sNodeName, pushToQueue)
-if err != nil {
+vmWatchStore := startVMWatcher(ctx, logger, r.Config, r.VMClient, watchMetrics, perVMMetrics, r.EnvArgs.K8sNodeName, pushToQueue) handle err {
return fmt.Errorf("Error starting VM watcher: %w", err)
}
defer vmWatchStore.Stop()
logger.Info("VM watcher started")

-schedTracker, err := schedwatch.StartSchedulerWatcher(ctx, logger, r.KubeClient, watchMetrics, r.Config.Scheduler.SchedulerName)
-if err != nil {
+schedTracker := schedwatch.StartSchedulerWatcher(ctx, logger, r.KubeClient, watchMetrics, r.Config.Scheduler.SchedulerName) handle err {
return fmt.Errorf("Starting scheduler watch server: %w", err)
}
defer schedTracker.Stop()

scalingEventsMetrics := scalingevents.NewPromMetrics(globalPromReg)
-scalingReporter, err := scalingevents.NewReporter(ctx, logger, &r.Config.ScalingEvents, scalingEventsMetrics)
-if err != nil {
+scalingReporter := scalingevents.NewReporter(ctx, logger, &r.Config.ScalingEvents, scalingEventsMetrics) handle err {
return fmt.Errorf("Error creating scaling events reporter: %w", err)
}

@@ -88,8 +85,7 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
}
}

-mc, err := billing.NewMetricsCollector(ctx, logger, &r.Config.Billing, billingMetrics)
-if err != nil {
+mc := billing.NewMetricsCollector(ctx, logger, &r.Config.Billing, billingMetrics) handle err {
return fmt.Errorf("error creating billing metrics collector: %w", err)
}

@@ -103,8 +99,7 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
tg.Go("main-loop", func(logger *zap.Logger) error {
logger.Info("Entering main loop")
for {
-event, err := vmEventQueue.Wait(ctx)
-if err != nil {
+event := vmEventQueue.Wait(ctx) handle err {
if ctx.Err() != nil {
// treat context canceled as a "normal" exit (because it is)
return nil
21 changes: 7 additions & 14 deletions pkg/agent/runner.go
@@ -648,8 +648,7 @@ func (r *Runner) connectToMonitorLoop(
}

lastStart = time.Now()
-dispatcher, err := NewDispatcher(ctx, logger, addr, r, callbacks.upscaleRequested)
-if err != nil {
+dispatcher := NewDispatcher(ctx, logger, addr, r, callbacks.upscaleRequested) handle err {
logger.Error("Failed to connect to vm-monitor", zap.String("addr", addr), zap.Error(err))
continue
}
@@ -695,8 +694,7 @@ func doMetricsRequest(
reqCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

-req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, bytes.NewReader(nil))
-if err != nil {
+req := http.NewRequestWithContext(reqCtx, http.MethodGet, url, bytes.NewReader(nil)) handle err {
panic(fmt.Errorf("Error constructing metrics request to %q: %w", url, err))
}

@@ -740,8 +738,7 @@ func (r *Runner) doNeonVMRequest(
Value: targetRevision,
}}

-patchPayload, err := json.Marshal(patches)
-if err != nil {
+patchPayload := json.Marshal(patches) handle err {
panic(fmt.Errorf("Error marshalling JSON patch: %w", err))
}

@@ -868,8 +865,7 @@ func (r *Runner) DoSchedulerRequest(
return nil, err
}

-reqBody, err := json.Marshal(reqData)
-if err != nil {
+reqBody := json.Marshal(reqData) handle err {
return nil, fmt.Errorf("Error encoding request JSON: %w", err)
}

@@ -879,16 +875,14 @@

url := fmt.Sprintf("http://%s:%d/", sched.IP, r.global.config.Scheduler.RequestPort)

-request, err := http.NewRequestWithContext(reqCtx, http.MethodPost, url, bytes.NewReader(reqBody))
-if err != nil {
+request := http.NewRequestWithContext(reqCtx, http.MethodPost, url, bytes.NewReader(reqBody)) handle err {
return nil, fmt.Errorf("Error building request to %q: %w", url, err)
}
request.Header.Set("content-type", "application/json")

logger.Debug("Sending request to scheduler", zap.Any("request", reqData))

-response, err := http.DefaultClient.Do(request)
-if err != nil {
+response := http.DefaultClient.Do(request) handle err {
description := fmt.Sprintf("[error doing request: %s]", util.RootError(err))
r.global.metrics.schedulerRequests.WithLabelValues(description).Inc()
return nil, fmt.Errorf("Error doing request: %w", err)
@@ -897,8 +891,7 @@

r.global.metrics.schedulerRequests.WithLabelValues(strconv.Itoa(response.StatusCode)).Inc()

-respBody, err := io.ReadAll(response.Body)
-if err != nil {
+respBody := io.ReadAll(response.Body) handle err {
return nil, fmt.Errorf("Error reading body for response: %w", err)
}

6 changes: 2 additions & 4 deletions pkg/agent/scalingevents/clients.go
@@ -35,8 +35,7 @@ func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) (

if c := cfg.AzureBlob; c != nil {
generateKey := newBlobStorageKeyGenerator(c.PrefixInContainer)
-client, err := reporting.NewAzureBlobStorageClient(c.AzureBlobStorageClientConfig, generateKey)
-if err != nil {
+client := reporting.NewAzureBlobStorageClient(c.AzureBlobStorageClientConfig, generateKey) handle err {
return nil, fmt.Errorf("error creating Azure Blob Storage client: %w", err)
}
logger.Info("Created Azure Blob Storage client for scaling events", zap.Any("config", c))
@@ -50,8 +49,7 @@ func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) (
}
if c := cfg.S3; c != nil {
generateKey := newBlobStorageKeyGenerator(c.PrefixInBucket)
-client, err := reporting.NewS3Client(ctx, c.S3ClientConfig, generateKey)
-if err != nil {
+client := reporting.NewS3Client(ctx, c.S3ClientConfig, generateKey) handle err {
return nil, fmt.Errorf("error creating S3 client: %w", err)
}
logger.Info("Created S3 client for scaling events", zap.Any("config", c))
3 changes: 1 addition & 2 deletions pkg/agent/scalingevents/reporter.go
@@ -67,8 +67,7 @@ func NewReporter(
) (*Reporter, error) {
logger := parentLogger.Named("scalingevents")

-clients, err := createClients(ctx, logger, conf.Clients)
-if err != nil {
+clients := createClients(ctx, logger, conf.Clients) handle err {
return nil, err
}

12 changes: 4 additions & 8 deletions pkg/agent/watch.go
@@ -94,8 +94,7 @@ func startVMWatcher(
setVMMetrics(perVMMetrics, vm, nodeName)

if vmIsOurResponsibility(vm, config, nodeName) {
-event, err := makeVMEvent(logger, vm, vmEventAdded)
-if err != nil {
+event := makeVMEvent(logger, vm, vmEventAdded) handle err {
logger.Error(
"Failed to create vmEvent for added VM",
util.VMNameFields(vm), zap.Error(err),
@@ -128,8 +127,7 @@ func startVMWatcher(
eventKind = vmEventUpdated
}

-event, err := makeVMEvent(logger, vmForEvent, eventKind)
-if err != nil {
+event := makeVMEvent(logger, vmForEvent, eventKind) handle err {
logger.Error(
"Failed to create vmEvent for updated VM",
util.VMNameFields(vmForEvent), zap.Error(err),
@@ -143,8 +141,7 @@ func startVMWatcher(
deleteVMMetrics(perVMMetrics, vm, nodeName)

if vmIsOurResponsibility(vm, config, nodeName) {
-event, err := makeVMEvent(logger, vm, vmEventDeleted)
-if err != nil {
+event := makeVMEvent(logger, vm, vmEventDeleted) handle err {
logger.Error(
"Failed to create vmEvent for deleted VM",
util.VMNameFields(vm), zap.Error(err),
@@ -159,8 +156,7 @@
}

func makeVMEvent(logger *zap.Logger, vm *vmv1.VirtualMachine, kind vmEventKind) (vmEvent, error) {
-info, err := api.ExtractVmInfo(logger, vm)
-if err != nil {
+info := api.ExtractVmInfo(logger, vm) handle err {
return vmEvent{}, fmt.Errorf("Error extracting VM info: %w", err)
}

6 changes: 2 additions & 4 deletions pkg/api/vminfo.go
@@ -182,8 +182,7 @@ func (vm VmInfo) NamespacedName() util.NamespacedName {

func ExtractVmInfo(logger *zap.Logger, vm *vmv1.VirtualMachine) (*VmInfo, error) {
logger = logger.With(util.VMNameFields(vm))
-info, err := extractVmInfoGeneric(logger, vm.Name, vm, vm.Spec.Resources())
-if err != nil {
+info := extractVmInfoGeneric(logger, vm.Name, vm, vm.Spec.Resources()) handle err {
return nil, fmt.Errorf("error extracting VM info: %w", err)
}

@@ -194,8 +193,7 @@ func ExtractVmInfo(logger *zap.Logger, vm *vmv1.VirtualMachine) (*VmInfo, error)
func ExtractVmInfoFromPod(logger *zap.Logger, pod *corev1.Pod) (*VmInfo, error) {
logger = logger.With(util.PodNameFields(pod))

-resources, err := vmv1.VirtualMachineResourcesFromPod(pod)
-if err != nil {
+resources := vmv1.VirtualMachineResourcesFromPod(pod) handle err {
return nil, err
}
