Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,10 @@ type commonConfig struct {
TopicNames ParamItem `refreshable:"true"`
TimeTicker ParamItem `refreshable:"true"`

JSONMaxLength ParamItem `refreshable:"false"`
JSONMaxLength ParamItem `refreshable:"false"`
DynamicFieldAvgLength ParamItem `refreshable:"true"`
SparseFloatVectorEstimateSize ParamItem `refreshable:"true"`
VarCharEstimateLengthAvg ParamItem `refreshable:"true"`

MetricsPort ParamItem `refreshable:"false"`

Expand Down
88 changes: 88 additions & 0 deletions pkg/util/typeutil/field_length_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package typeutil

import "sync/atomic"

const (
defaultVarCharEstimateLength = 256
defaultDynamicFieldEstimateLength = 512
defaultSparseFloatEstimateLength = 1200
)

var (
varCharEstimateLength atomic.Int64
dynamicFieldEstimateLength atomic.Int64
sparseEstimateLength atomic.Int64
)

func init() {
SetVarCharEstimateLength(defaultVarCharEstimateLength)
SetDynamicFieldEstimateLength(defaultDynamicFieldEstimateLength)
SetSparseFloatVectorEstimateLength(defaultSparseFloatEstimateLength)
}

// SetVarCharEstimateLength updates the global cap applied when estimating record sizes for VarChar fields.
func SetVarCharEstimateLength(length int) {
if length <= 0 {
length = defaultVarCharEstimateLength
}
varCharEstimateLength.Store(int64(length))
}

// GetVarCharEstimateLength returns the current cap used when estimating VarChar field sizes.
func GetVarCharEstimateLength() int {
length := int(varCharEstimateLength.Load())
if length <= 0 {
return defaultVarCharEstimateLength
}
return length
}

// SetDynamicFieldEstimateLength updates the global cap used for dynamic fields (JSON/Array/Geometry).
func SetDynamicFieldEstimateLength(length int) {
if length <= 0 {
length = defaultDynamicFieldEstimateLength
}
dynamicFieldEstimateLength.Store(int64(length))
}

// GetDynamicFieldEstimateLength returns the current cap for dynamic fields.
func GetDynamicFieldEstimateLength() int {
length := int(dynamicFieldEstimateLength.Load())
if length <= 0 {
return defaultDynamicFieldEstimateLength
}
return length
}

// SetSparseFloatVectorEstimateLength updates the fallback size used when estimating sparse float vector fields.
func SetSparseFloatVectorEstimateLength(length int) {
if length <= 0 {
length = defaultSparseFloatEstimateLength
}
sparseEstimateLength.Store(int64(length))
}

// GetSparseFloatVectorEstimateLength returns the current fallback size used for sparse float vector fields.
func GetSparseFloatVectorEstimateLength() int {
length := int(sparseEstimateLength.Load())
if length <= 0 {
return defaultSparseFloatEstimateLength
}
return length
}
13 changes: 6 additions & 7 deletions pkg/util/typeutil/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
)

const DynamicFieldMaxLength = 512

type getVariableFieldLengthPolicy int

const (
Expand Down Expand Up @@ -75,16 +73,17 @@ func getVarFieldLength(fieldSchema *schemapb.FieldSchema, policy getVariableFiel
// TODO this is a hack and may not accurate, we should rely on estimate size per record
// However we should report size and datacoord calculate based on size
// https://github.com/milvus-io/milvus/issues/17687
if maxLength > 256 {
return 256, nil
estimateLimit := GetVarCharEstimateLength()
if maxLength > estimateLimit {
return estimateLimit, nil
}
return maxLength, nil
default:
return 0, fmt.Errorf("unrecognized getVariableFieldLengthPolicy %v", policy)
}
// geometry field max length now consider the same as json field, which is 512 bytes
// geometry field max length now consider the same as json field, which is 512 bytes
case schemapb.DataType_Array, schemapb.DataType_JSON, schemapb.DataType_Geometry:
return DynamicFieldMaxLength, nil
return GetDynamicFieldEstimateLength(), nil
default:
return 0, fmt.Errorf("field %s is not a variable-length type", fieldSchema.DataType.String())
}
Expand Down Expand Up @@ -160,7 +159,7 @@ func estimateSizeBy(schema *schemapb.CollectionSchema, policy getVariableFieldLe
// varies depending on the number of non-zeros. Using sparse vector
// generated by SPLADE as reference and returning size of a sparse
// vector with 150 non-zeros.
res += 1200
res += GetSparseFloatVectorEstimateLength()
}
}
return res, nil
Expand Down
72 changes: 71 additions & 1 deletion pkg/util/typeutil/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,81 @@ func TestSchema(t *testing.T) {
}

t.Run("EstimateSizePerRecord", func(t *testing.T) {
limit := GetDynamicFieldEstimateLength()
size, err := EstimateSizePerRecord(schema)
assert.Equal(t, 680+DynamicFieldMaxLength*4, size)
assert.Equal(t, 680+limit*4, size)
assert.NoError(t, err)
})

t.Run("VarCharEstimateLengthLimit", func(t *testing.T) {
originalLimit := GetVarCharEstimateLength()
t.Cleanup(func() { SetVarCharEstimateLength(originalLimit) })

field := &schemapb.FieldSchema{
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.MaxLengthKey,
Value: "1024",
},
},
}

SetVarCharEstimateLength(128)
length, err := getVarFieldLength(field, custom)
assert.NoError(t, err)
assert.Equal(t, 128, length)

SetVarCharEstimateLength(4096)
length, err = getVarFieldLength(field, custom)
assert.NoError(t, err)
assert.Equal(t, 1024, length)
})

t.Run("DynamicFieldMaxLengthLimit", func(t *testing.T) {
originalLimit := GetDynamicFieldEstimateLength()
t.Cleanup(func() { SetDynamicFieldEstimateLength(originalLimit) })

field := &schemapb.FieldSchema{
DataType: schemapb.DataType_JSON,
}

SetDynamicFieldEstimateLength(2048)
length, err := getVarFieldLength(field, custom)
assert.NoError(t, err)
assert.Equal(t, 2048, length)

SetDynamicFieldEstimateLength(128)
length, err = getVarFieldLength(field, custom)
assert.NoError(t, err)
assert.Equal(t, 128, length)
})

t.Run("SparseFloatVectorEstimateSize", func(t *testing.T) {
original := GetSparseFloatVectorEstimateLength()
t.Cleanup(func() { SetSparseFloatVectorEstimateLength(original) })

schemaWithSparse := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 1200,
Name: "sparse",
DataType: schemapb.DataType_SparseFloatVector,
},
},
}

SetSparseFloatVectorEstimateLength(2048)
size, err := EstimateSizePerRecord(schemaWithSparse)
assert.NoError(t, err)
assert.Equal(t, 2048, size)

SetSparseFloatVectorEstimateLength(64)
size, err = EstimateSizePerRecord(schemaWithSparse)
assert.NoError(t, err)
assert.Equal(t, 64, size)
})

t.Run("SchemaHelper", func(t *testing.T) {
_, err := CreateSchemaHelper(nil)
assert.Error(t, err)
Expand Down
Loading