Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* [BUGFIX] Scheduler: Fix memory leak by properly cleaning up query fragment registry. #7148
* [BUGFIX] Compactor: Add back deletion of partition group info file even if not complete. #7157
* [BUGFIX] Query Frontend: Add Native Histogram extraction logic in results cache. #7167
* [ENHANCEMENT] Distributor/Ingester: Add `type` label to `cortex_discarded_samples_total` metric to distinguish between float and native histogram samples. #6221

## 1.20.1 2025-12-03

Expand Down
59 changes: 47 additions & 12 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,12 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}

if errors.Is(err, ha.TooManyReplicaGroupsError{}) {
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numFloatSamples + numHistogramSamples))
if numFloatSamples > 0 {
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID, validation.SampleTypeFloat).Add(float64(numFloatSamples))
}
if numHistogramSamples > 0 {
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID, validation.SampleTypeHistogram).Add(float64(numHistogramSamples))
}
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}
return nil, err
Expand Down Expand Up @@ -805,7 +810,12 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
totalSamples := validatedFloatSamples + validatedHistogramSamples
totalN := totalSamples + validatedExemplars + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples))
if validatedFloatSamples > 0 {
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID, validation.SampleTypeFloat).Add(float64(validatedFloatSamples))
}
if validatedHistogramSamples > 0 {
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID, validation.SampleTypeHistogram).Add(float64(validatedHistogramSamples))
}
d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
// Return a 429 here to tell the client it is going too fast.
Expand All @@ -820,7 +830,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
var nativeHistogramErr error

if !d.nativeHistogramIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) {
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, userID).Add(float64(validatedHistogramSamples))
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, userID, validation.SampleTypeHistogram).Add(float64(validatedHistogramSamples))
nativeHistogramErr = httpgrpc.Errorf(http.StatusTooManyRequests, "native histogram ingestion rate limit (%v) exceeded while adding %d native histogram samples", d.nativeHistogramIngestionRateLimiter.Limit(now, userID), validatedHistogramSamples)
validatedHistogramSamples = 0
} else {
Expand Down Expand Up @@ -1043,7 +1053,12 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(len(ts.Samples) + len(ts.Histograms)))
}
if errors.Is(err, ha.TooManyReplicaGroupsError{}) {
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(len(ts.Samples) + len(ts.Histograms)))
if len(ts.Samples) > 0 {
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID, validation.SampleTypeFloat).Add(float64(len(ts.Samples)))
}
if len(ts.Histograms) > 0 {
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID, validation.SampleTypeHistogram).Add(float64(len(ts.Histograms)))
}
}

continue
Expand All @@ -1067,10 +1082,20 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
l, _ := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...)
if l.Len() == 0 {
// all labels are gone, samples will be discarded
d.validateMetrics.DiscardedSamples.WithLabelValues(
validation.DroppedByRelabelConfiguration,
userID,
).Add(float64(len(ts.Samples) + len(ts.Histograms)))
if len(ts.Samples) > 0 {
d.validateMetrics.DiscardedSamples.WithLabelValues(
validation.DroppedByRelabelConfiguration,
userID,
validation.SampleTypeFloat,
).Add(float64(len(ts.Samples)))
}
if len(ts.Histograms) > 0 {
d.validateMetrics.DiscardedSamples.WithLabelValues(
validation.DroppedByRelabelConfiguration,
userID,
validation.SampleTypeHistogram,
).Add(float64(len(ts.Histograms)))
}

// all labels are gone, exemplars will be discarded
d.validateMetrics.DiscardedExemplars.WithLabelValues(
Expand All @@ -1097,10 +1122,20 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
removeEmptyLabels(&ts.Labels)

if len(ts.Labels) == 0 {
d.validateMetrics.DiscardedSamples.WithLabelValues(
validation.DroppedByUserConfigurationOverride,
userID,
).Add(float64(len(ts.Samples) + len(ts.Histograms)))
if len(ts.Samples) > 0 {
d.validateMetrics.DiscardedSamples.WithLabelValues(
validation.DroppedByUserConfigurationOverride,
userID,
validation.SampleTypeFloat,
).Add(float64(len(ts.Samples)))
}
if len(ts.Histograms) > 0 {
d.validateMetrics.DiscardedSamples.WithLabelValues(
validation.DroppedByUserConfigurationOverride,
userID,
validation.SampleTypeHistogram,
).Add(float64(len(ts.Histograms)))
}

d.validateMetrics.DiscardedExemplars.WithLabelValues(
validation.DroppedByUserConfigurationOverride,
Expand Down
18 changes: 9 additions & 9 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1618,32 +1618,32 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
i.metrics.ingestedExemplarsFail.Add(float64(failedExemplarsCount))

if sampleOutOfBoundsCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(sampleOutOfBounds, userID).Add(float64(sampleOutOfBoundsCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(sampleOutOfBounds, userID, validation.SampleTypeFloat).Add(float64(sampleOutOfBoundsCount))
}
if sampleOutOfOrderCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID, validation.SampleTypeFloat).Add(float64(sampleOutOfOrderCount))
}
if sampleTooOldCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(sampleTooOld, userID).Add(float64(sampleTooOldCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(sampleTooOld, userID, validation.SampleTypeFloat).Add(float64(sampleTooOldCount))
}
if newValueForTimestampCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID, validation.SampleTypeFloat).Add(float64(newValueForTimestampCount))
}
if perUserSeriesLimitCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(perUserSeriesLimit, userID).Add(float64(perUserSeriesLimitCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(perUserSeriesLimit, userID, validation.SampleTypeFloat).Add(float64(perUserSeriesLimitCount))
}
if perUserNativeHistogramSeriesLimitCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(perUserNativeHistogramSeriesLimit, userID).Add(float64(perUserNativeHistogramSeriesLimitCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(perUserNativeHistogramSeriesLimit, userID, validation.SampleTypeHistogram).Add(float64(perUserNativeHistogramSeriesLimitCount))
}
if perMetricSeriesLimitCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID, validation.SampleTypeFloat).Add(float64(perMetricSeriesLimitCount))
}
if perLabelSetSeriesLimitCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(perLabelsetSeriesLimit, userID).Add(float64(perLabelSetSeriesLimitCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(perLabelsetSeriesLimit, userID, validation.SampleTypeFloat).Add(float64(perLabelSetSeriesLimitCount))
}

if !i.limits.EnableNativeHistograms(userID) && discardedNativeHistogramCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramSample, userID).Add(float64(discardedNativeHistogramCount))
i.validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramSample, userID, validation.SampleTypeHistogram).Add(float64(discardedNativeHistogramCount))
}

for h, counter := range reasonCounter.counters {
Expand Down
50 changes: 27 additions & 23 deletions pkg/util/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ const (
// The combined length of the label names and values of an Exemplar's LabelSet MUST NOT exceed 128 UTF-8 characters
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars
ExemplarMaxLabelSetLength = 128

// Sample type constants for discarded samples metric
SampleTypeFloat = "float"
SampleTypeHistogram = "histogram"
)

type ValidateMetrics struct {
Expand Down Expand Up @@ -109,7 +113,7 @@ func NewValidateMetrics(r prometheus.Registerer) *ValidateMetrics {
Name: "cortex_discarded_samples_total",
Help: "The total number of samples that were discarded.",
},
[]string{discardReasonLabel, "user"},
[]string{discardReasonLabel, "user", "type"},
)
registerCollector(r, discardedSamples)
discardedSamplesPerLabelSet := prometheus.NewCounterVec(
Expand Down Expand Up @@ -192,14 +196,14 @@ func NewValidateMetrics(r prometheus.Registerer) *ValidateMetrics {
// Used in test only for now.
func (m *ValidateMetrics) updateSamplesDiscardedForSeries(userID, reason string, labelSetLimits []LimitsPerLabelSet, lbls labels.Labels, count int) {
matchedLimits := LimitsPerLabelSetsForSeries(labelSetLimits, lbls)
m.updateSamplesDiscarded(userID, reason, matchedLimits, count)
m.updateSamplesDiscarded(userID, reason, SampleTypeFloat, matchedLimits, count)
}

// updateSamplesDiscarded updates discarded samples and discarded samples per labelset for the provided reason.
// The provided label set needs to be pre-filtered to match the series if applicable.
// Used in test only for now.
func (m *ValidateMetrics) updateSamplesDiscarded(userID, reason string, labelSetLimits []LimitsPerLabelSet, count int) {
m.DiscardedSamples.WithLabelValues(reason, userID).Add(float64(count))
func (m *ValidateMetrics) updateSamplesDiscarded(userID, reason, sampleType string, labelSetLimits []LimitsPerLabelSet, count int) {
m.DiscardedSamples.WithLabelValues(reason, userID, sampleType).Add(float64(count))
for _, limit := range labelSetLimits {
m.LabelSetTracker.Track(userID, limit.Hash, limit.LabelSet)
m.DiscardedSamplesPerLabelSet.WithLabelValues(reason, userID, limit.LabelSet.String()).Add(float64(count))
Expand Down Expand Up @@ -227,12 +231,12 @@ func ValidateSampleTimestamp(validateMetrics *ValidateMetrics, limits *Limits, u
unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls)

if limits.RejectOldSamples && model.Time(timestampMs) < model.Now().Add(-time.Duration(limits.RejectOldSamplesMaxAge)) {
validateMetrics.DiscardedSamples.WithLabelValues(greaterThanMaxSampleAge, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(greaterThanMaxSampleAge, userID, SampleTypeFloat).Inc()
return newSampleTimestampTooOldError(unsafeMetricName, timestampMs)
}

if model.Time(timestampMs) > model.Now().Add(time.Duration(limits.CreationGracePeriod)) {
validateMetrics.DiscardedSamples.WithLabelValues(tooFarInFuture, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(tooFarInFuture, userID, SampleTypeFloat).Inc()
return newSampleTimestampTooNewError(unsafeMetricName, timestampMs)
}

Expand Down Expand Up @@ -282,19 +286,19 @@ func ValidateLabels(validateMetrics *ValidateMetrics, limits *Limits, userID str
if limits.EnforceMetricName {
unsafeMetricName, err := extract.UnsafeMetricNameFromLabelAdapters(ls)
if err != nil {
validateMetrics.DiscardedSamples.WithLabelValues(missingMetricName, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(missingMetricName, userID, SampleTypeFloat).Inc()
return newNoMetricNameError()
}

if !nameValidationScheme.IsValidMetricName(unsafeMetricName) {
validateMetrics.DiscardedSamples.WithLabelValues(invalidMetricName, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(invalidMetricName, userID, SampleTypeFloat).Inc()
return newInvalidMetricNameError(unsafeMetricName)
}
}

numLabelNames := len(ls)
if numLabelNames > limits.MaxLabelNamesPerSeries {
validateMetrics.DiscardedSamples.WithLabelValues(maxLabelNamesPerSeries, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(maxLabelNamesPerSeries, userID, SampleTypeFloat).Inc()
return newTooManyLabelsError(ls, limits.MaxLabelNamesPerSeries)
}

Expand All @@ -306,21 +310,21 @@ func ValidateLabels(validateMetrics *ValidateMetrics, limits *Limits, userID str

for _, l := range ls {
if !skipLabelNameValidation && !nameValidationScheme.IsValidLabelName(l.Name) {
validateMetrics.DiscardedSamples.WithLabelValues(invalidLabel, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(invalidLabel, userID, SampleTypeFloat).Inc()
return newInvalidLabelError(ls, l.Name)
} else if len(l.Name) > maxLabelNameLength {
validateMetrics.DiscardedSamples.WithLabelValues(labelNameTooLong, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(labelNameTooLong, userID, SampleTypeFloat).Inc()
return newLabelNameTooLongError(ls, l.Name, maxLabelNameLength)
} else if len(l.Value) > maxLabelValueLength {
validateMetrics.DiscardedSamples.WithLabelValues(labelValueTooLong, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(labelValueTooLong, userID, SampleTypeFloat).Inc()
return newLabelValueTooLongError(ls, l.Name, l.Value, maxLabelValueLength)
} else if cmp := strings.Compare(lastLabelName, l.Name); cmp >= 0 {
if cmp == 0 {
validateMetrics.DiscardedSamples.WithLabelValues(duplicateLabelNames, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(duplicateLabelNames, userID, SampleTypeFloat).Inc()
return newDuplicatedLabelError(ls, l.Name)
}

validateMetrics.DiscardedSamples.WithLabelValues(labelsNotSorted, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(labelsNotSorted, userID, SampleTypeFloat).Inc()
return newLabelsNotSortedError(ls, l.Name)
}

Expand All @@ -329,7 +333,7 @@ func ValidateLabels(validateMetrics *ValidateMetrics, limits *Limits, userID str
}
validateMetrics.LabelSizeBytes.WithLabelValues(userID).Observe(float64(labelsSizeBytes))
if maxLabelsSizeBytes > 0 && labelsSizeBytes > maxLabelsSizeBytes {
validateMetrics.DiscardedSamples.WithLabelValues(labelsSizeBytesExceeded, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(labelsSizeBytesExceeded, userID, SampleTypeFloat).Inc()
return labelSizeBytesExceededError(ls, labelsSizeBytes, maxLabelsSizeBytes)
}
return nil
Expand Down Expand Up @@ -371,13 +375,13 @@ func ValidateMetadata(validateMetrics *ValidateMetrics, cfg *Limits, userID stri
func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, userID string, ls []cortexpb.LabelAdapter, histogramSample cortexpb.Histogram) (cortexpb.Histogram, error) {
// sample size validation for native histogram
if limits.MaxNativeHistogramSampleSizeBytes > 0 && histogramSample.Size() > limits.MaxNativeHistogramSampleSizeBytes {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramSampleSizeBytesExceeded, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramSampleSizeBytesExceeded, userID, SampleTypeHistogram).Inc()
return cortexpb.Histogram{}, newNativeHistogramSampleSizeBytesExceededError(ls, histogramSample.Size(), limits.MaxNativeHistogramSampleSizeBytes)
}

// schema validation for native histogram
if histogramSample.Schema < histogram.ExponentialSchemaMin || histogramSample.Schema > histogram.ExponentialSchemaMax {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramInvalidSchema, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramInvalidSchema, userID, SampleTypeHistogram).Inc()
return cortexpb.Histogram{}, newNativeHistogramSchemaInvalidError(ls, int(histogramSample.Schema))
}

Expand All @@ -388,7 +392,7 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u
if histogramSample.IsFloatHistogram() {
fh := cortexpb.FloatHistogramProtoToFloatHistogram(histogramSample)
if err := fh.Validate(); err != nil {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramInvalid, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramInvalid, userID, SampleTypeHistogram).Inc()
return cortexpb.Histogram{}, newNativeHistogramInvalidError(ls, err)
}

Expand All @@ -404,14 +408,14 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u

// Exceed limit.
if histogramSample.Schema <= histogram.ExponentialSchemaMin {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID, SampleTypeHistogram).Inc()
return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets)
}

oBuckets := len(fh.PositiveBuckets) + len(fh.NegativeBuckets)
for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > limits.MaxNativeHistogramBuckets {
if fh.Schema <= histogram.ExponentialSchemaMin {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID, SampleTypeHistogram).Inc()
return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets)
}
fh = fh.ReduceResolution(fh.Schema - 1)
Expand All @@ -425,7 +429,7 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u

h := cortexpb.HistogramProtoToHistogram(histogramSample)
if err := h.Validate(); err != nil {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramInvalid, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramInvalid, userID, SampleTypeHistogram).Inc()
return cortexpb.Histogram{}, newNativeHistogramInvalidError(ls, err)
}

Expand All @@ -440,13 +444,13 @@ func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, u
}
// Exceed limit.
if histogramSample.Schema <= histogram.ExponentialSchemaMin {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID, SampleTypeHistogram).Inc()
return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets)
}
oBuckets := len(h.PositiveBuckets) + len(h.NegativeBuckets)
for len(h.PositiveBuckets)+len(h.NegativeBuckets) > limits.MaxNativeHistogramBuckets {
if h.Schema <= histogram.ExponentialSchemaMin {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc()
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID, SampleTypeHistogram).Inc()
return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets)
}
h = h.ReduceResolution(h.Schema - 1)
Expand Down
Loading