Skip to content

Commit 43e8106

Browse files
committed
Add nullable vector support for proxy and querynode layer
This commit extends nullable vector support to the proxy layer and the querynode layer, and adds comprehensive validation, search reduce, and field data handling for nullable vectors with sparse storage.

Proxy layer changes:
- Update validate_util.go checkAligned() with a getExpectedVectorRows() helper to validate nullable vector field alignment using the valid data count
- Update checkFloatVectorFieldData/checkSparseFloatVectorFieldData for nullable vector validation with proper row count expectations
- Add FieldDataIdxComputer in typeutil/schema.go for logical-to-physical index translation during search reduce operations
- Update search_reduce_util.go reduceSearchResultData to use idxComputers for correct field data indexing with nullable vectors
- Update task.go, task_query.go, task_upsert.go for nullable vector handling
- Update msg_pack.go with nullable vector field data processing

QueryNode layer changes:
- Update segments/result.go for nullable vector result handling
- Update segments/search_reduce.go with nullable vector offset translation

Storage and index changes:
- Update data_codec.go and utils.go for nullable vector serialization
- Update indexcgowrapper/dataset.go and index.go for nullable vector indexing

Utility changes:
- Add a FieldDataIdxComputer struct with a Compute() method for efficient logical-to-physical index mapping across multiple field data
- Update EstimateEntitySize() and AppendFieldData() with a fieldIdxs parameter
- Update funcutil.go with nullable vector support functions

Signed-off-by: marcelo-cjl <[email protected]>
1 parent d9ea4a0 commit 43e8106

File tree

27 files changed

+3756
-243
lines changed

27 files changed

+3756
-243
lines changed

client/column/columns.go

Lines changed: 190 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type Column interface {
4646
SetNullable(bool)
4747
ValidateNullable() error
4848
CompactNullableValues()
49+
ValidCount() int
4950
}
5051

5152
var errFieldDataTypeNotMatch = errors.New("FieldData type not matched")
@@ -239,10 +240,39 @@ func FieldDataColumn(fd *schemapb.FieldData, begin, end int) (Column, error) {
239240
}
240241
data := x.FloatVector.GetData()
241242
dim := int(vectors.GetDim())
243+
244+
if len(validData) > 0 {
245+
if end < 0 {
246+
end = len(validData)
247+
}
248+
vector := make([][]float32, 0, end-begin)
249+
dataIdx := 0
250+
for i := 0; i < begin; i++ {
251+
if validData[i] {
252+
dataIdx++
253+
}
254+
}
255+
for i := begin; i < end; i++ {
256+
if validData[i] {
257+
v := make([]float32, dim)
258+
copy(v, data[dataIdx*dim:(dataIdx+1)*dim])
259+
vector = append(vector, v)
260+
dataIdx++
261+
} else {
262+
vector = append(vector, nil)
263+
}
264+
}
265+
col := NewColumnFloatVector(fd.GetFieldName(), dim, vector)
266+
col.withValidData(validData[begin:end])
267+
col.nullable = true
268+
col.sparseMode = true
269+
return col, nil
270+
}
271+
242272
if end < 0 {
243273
end = len(data) / dim
244274
}
245-
vector := make([][]float32, 0, end-begin) // shall not have remanunt
275+
vector := make([][]float32, 0, end-begin)
246276
for i := begin; i < end; i++ {
247277
v := make([]float32, dim)
248278
copy(v, data[i*dim:(i+1)*dim])
@@ -262,6 +292,35 @@ func FieldDataColumn(fd *schemapb.FieldData, begin, end int) (Column, error) {
262292
}
263293
dim := int(vectors.GetDim())
264294
blen := dim / 8
295+
296+
if len(validData) > 0 {
297+
if end < 0 {
298+
end = len(validData)
299+
}
300+
vector := make([][]byte, 0, end-begin)
301+
dataIdx := 0
302+
for i := 0; i < begin; i++ {
303+
if validData[i] {
304+
dataIdx++
305+
}
306+
}
307+
for i := begin; i < end; i++ {
308+
if validData[i] {
309+
v := make([]byte, blen)
310+
copy(v, data[dataIdx*blen:(dataIdx+1)*blen])
311+
vector = append(vector, v)
312+
dataIdx++
313+
} else {
314+
vector = append(vector, nil)
315+
}
316+
}
317+
col := NewColumnBinaryVector(fd.GetFieldName(), dim, vector)
318+
col.withValidData(validData[begin:end])
319+
col.nullable = true
320+
col.sparseMode = true
321+
return col, nil
322+
}
323+
265324
if end < 0 {
266325
end = len(data) / blen
267326
}
@@ -281,13 +340,43 @@ func FieldDataColumn(fd *schemapb.FieldData, begin, end int) (Column, error) {
281340
}
282341
data := x.Float16Vector
283342
dim := int(vectors.GetDim())
343+
bytePerRow := dim * 2
344+
345+
if len(validData) > 0 {
346+
if end < 0 {
347+
end = len(validData)
348+
}
349+
vector := make([][]byte, 0, end-begin)
350+
dataIdx := 0
351+
for i := 0; i < begin; i++ {
352+
if validData[i] {
353+
dataIdx++
354+
}
355+
}
356+
for i := begin; i < end; i++ {
357+
if validData[i] {
358+
v := make([]byte, bytePerRow)
359+
copy(v, data[dataIdx*bytePerRow:(dataIdx+1)*bytePerRow])
360+
vector = append(vector, v)
361+
dataIdx++
362+
} else {
363+
vector = append(vector, nil)
364+
}
365+
}
366+
col := NewColumnFloat16Vector(fd.GetFieldName(), dim, vector)
367+
col.withValidData(validData[begin:end])
368+
col.nullable = true
369+
col.sparseMode = true
370+
return col, nil
371+
}
372+
284373
if end < 0 {
285-
end = len(data) / dim / 2
374+
end = len(data) / bytePerRow
286375
}
287376
vector := make([][]byte, 0, end-begin)
288377
for i := begin; i < end; i++ {
289-
v := make([]byte, dim*2)
290-
copy(v, data[i*dim*2:(i+1)*dim*2])
378+
v := make([]byte, bytePerRow)
379+
copy(v, data[i*bytePerRow:(i+1)*bytePerRow])
291380
vector = append(vector, v)
292381
}
293382
return NewColumnFloat16Vector(fd.GetFieldName(), dim, vector), nil
@@ -300,13 +389,43 @@ func FieldDataColumn(fd *schemapb.FieldData, begin, end int) (Column, error) {
300389
}
301390
data := x.Bfloat16Vector
302391
dim := int(vectors.GetDim())
392+
bytePerRow := dim * 2
393+
394+
if len(validData) > 0 {
395+
if end < 0 {
396+
end = len(validData)
397+
}
398+
vector := make([][]byte, 0, end-begin)
399+
dataIdx := 0
400+
for i := 0; i < begin; i++ {
401+
if validData[i] {
402+
dataIdx++
403+
}
404+
}
405+
for i := begin; i < end; i++ {
406+
if validData[i] {
407+
v := make([]byte, bytePerRow)
408+
copy(v, data[dataIdx*bytePerRow:(dataIdx+1)*bytePerRow])
409+
vector = append(vector, v)
410+
dataIdx++
411+
} else {
412+
vector = append(vector, nil)
413+
}
414+
}
415+
col := NewColumnBFloat16Vector(fd.GetFieldName(), dim, vector)
416+
col.withValidData(validData[begin:end])
417+
col.nullable = true
418+
col.sparseMode = true
419+
return col, nil
420+
}
421+
303422
if end < 0 {
304-
end = len(data) / dim / 2
423+
end = len(data) / bytePerRow
305424
}
306-
vector := make([][]byte, 0, end-begin) // shall not have remanunt
425+
vector := make([][]byte, 0, end-begin)
307426
for i := begin; i < end; i++ {
308-
v := make([]byte, dim*2)
309-
copy(v, data[i*dim*2:(i+1)*dim*2])
427+
v := make([]byte, bytePerRow)
428+
copy(v, data[i*bytePerRow:(i+1)*bytePerRow])
310429
vector = append(vector, v)
311430
}
312431
return NewColumnBFloat16Vector(fd.GetFieldName(), dim, vector), nil
@@ -317,6 +436,37 @@ func FieldDataColumn(fd *schemapb.FieldData, begin, end int) (Column, error) {
317436
return nil, errFieldDataTypeNotMatch
318437
}
319438
data := sparseVectors.Contents
439+
440+
if len(validData) > 0 {
441+
if end < 0 {
442+
end = len(validData)
443+
}
444+
vectors := make([]entity.SparseEmbedding, 0, end-begin)
445+
dataIdx := 0
446+
for i := 0; i < begin; i++ {
447+
if validData[i] {
448+
dataIdx++
449+
}
450+
}
451+
for i := begin; i < end; i++ {
452+
if validData[i] {
453+
vector, err := entity.DeserializeSliceSparseEmbedding(data[dataIdx])
454+
if err != nil {
455+
return nil, err
456+
}
457+
vectors = append(vectors, vector)
458+
dataIdx++
459+
} else {
460+
vectors = append(vectors, nil)
461+
}
462+
}
463+
col := NewColumnSparseVectors(fd.GetFieldName(), vectors)
464+
col.withValidData(validData[begin:end])
465+
col.nullable = true
466+
col.sparseMode = true
467+
return col, nil
468+
}
469+
320470
if end < 0 {
321471
end = len(data)
322472
}
@@ -339,11 +489,41 @@ func FieldDataColumn(fd *schemapb.FieldData, begin, end int) (Column, error) {
339489
}
340490
data := x.Int8Vector
341491
dim := int(vectors.GetDim())
492+
493+
if len(validData) > 0 {
494+
if end < 0 {
495+
end = len(validData)
496+
}
497+
vector := make([][]int8, 0, end-begin)
498+
dataIdx := 0
499+
for i := 0; i < begin; i++ {
500+
if validData[i] {
501+
dataIdx++
502+
}
503+
}
504+
for i := begin; i < end; i++ {
505+
if validData[i] {
506+
v := make([]int8, dim)
507+
for j := 0; j < dim; j++ {
508+
v[j] = int8(data[dataIdx*dim+j])
509+
}
510+
vector = append(vector, v)
511+
dataIdx++
512+
} else {
513+
vector = append(vector, nil)
514+
}
515+
}
516+
col := NewColumnInt8Vector(fd.GetFieldName(), dim, vector)
517+
col.withValidData(validData[begin:end])
518+
col.nullable = true
519+
col.sparseMode = true
520+
return col, nil
521+
}
522+
342523
if end < 0 {
343524
end = len(data) / dim
344525
}
345-
vector := make([][]int8, 0, end-begin) // shall not have remanunt
346-
// TODO caiyd: has better way to convert []byte to []int8 ?
526+
vector := make([][]int8, 0, end-begin)
347527
for i := begin; i < end; i++ {
348528
v := make([]int8, dim)
349529
for j := 0; j < dim; j++ {

client/column/generic_base.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,19 @@ func (c *genericColumnBase[T]) CompactNullableValues() {
301301
c.values = c.values[0:cnt]
302302
}
303303

304+
func (c *genericColumnBase[T]) ValidCount() int {
305+
if !c.nullable || len(c.validData) == 0 {
306+
return len(c.values)
307+
}
308+
count := 0
309+
for _, v := range c.validData {
310+
if v {
311+
count++
312+
}
313+
}
314+
return count
315+
}
316+
304317
func (c *genericColumnBase[T]) withValidData(validData []bool) {
305318
if len(validData) > 0 {
306319
c.nullable = true

client/column/nullable.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616

1717
package column
1818

19+
import (
20+
"github.com/cockroachdb/errors"
21+
22+
"github.com/milvus-io/milvus/client/v2/entity"
23+
)
24+
1925
var (
2026
// scalars
2127
NewNullableColumnBool NullableColumnCreateFunc[bool, *ColumnBool] = NewNullableColumnCreator(NewColumnBool).New
@@ -41,6 +47,76 @@ var (
4147
NewNullableColumnDoubleArray NullableColumnCreateFunc[[]float64, *ColumnDoubleArray] = NewNullableColumnCreator(NewColumnDoubleArray).New
4248
)
4349

50+
func NewNullableColumnFloatVector(fieldName string, dim int, values [][]float32, validData []bool) (*ColumnFloatVector, error) {
51+
if len(values) != getValidCount(validData) {
52+
return nil, errors.Newf("values length (%d) must equal valid count (%d) in validData", len(values), getValidCount(validData))
53+
}
54+
col := NewColumnFloatVector(fieldName, dim, values)
55+
col.withValidData(validData)
56+
col.nullable = true
57+
return col, nil
58+
}
59+
60+
func NewNullableColumnBinaryVector(fieldName string, dim int, values [][]byte, validData []bool) (*ColumnBinaryVector, error) {
61+
if len(values) != getValidCount(validData) {
62+
return nil, errors.Newf("values length (%d) must equal valid count (%d) in validData", len(values), getValidCount(validData))
63+
}
64+
col := NewColumnBinaryVector(fieldName, dim, values)
65+
col.withValidData(validData)
66+
col.nullable = true
67+
return col, nil
68+
}
69+
70+
func NewNullableColumnFloat16Vector(fieldName string, dim int, values [][]byte, validData []bool) (*ColumnFloat16Vector, error) {
71+
if len(values) != getValidCount(validData) {
72+
return nil, errors.Newf("values length (%d) must equal valid count (%d) in validData", len(values), getValidCount(validData))
73+
}
74+
col := NewColumnFloat16Vector(fieldName, dim, values)
75+
col.withValidData(validData)
76+
col.nullable = true
77+
return col, nil
78+
}
79+
80+
func NewNullableColumnBFloat16Vector(fieldName string, dim int, values [][]byte, validData []bool) (*ColumnBFloat16Vector, error) {
81+
if len(values) != getValidCount(validData) {
82+
return nil, errors.Newf("values length (%d) must equal valid count (%d) in validData", len(values), getValidCount(validData))
83+
}
84+
col := NewColumnBFloat16Vector(fieldName, dim, values)
85+
col.withValidData(validData)
86+
col.nullable = true
87+
return col, nil
88+
}
89+
90+
func NewNullableColumnInt8Vector(fieldName string, dim int, values [][]int8, validData []bool) (*ColumnInt8Vector, error) {
91+
if len(values) != getValidCount(validData) {
92+
return nil, errors.Newf("values length (%d) must equal valid count (%d) in validData", len(values), getValidCount(validData))
93+
}
94+
col := NewColumnInt8Vector(fieldName, dim, values)
95+
col.withValidData(validData)
96+
col.nullable = true
97+
return col, nil
98+
}
99+
100+
func NewNullableColumnSparseFloatVector(fieldName string, values []entity.SparseEmbedding, validData []bool) (*ColumnSparseFloatVector, error) {
101+
if len(values) != getValidCount(validData) {
102+
return nil, errors.Newf("values length (%d) must equal valid count (%d) in validData", len(values), getValidCount(validData))
103+
}
104+
col := NewColumnSparseVectors(fieldName, values)
105+
col.withValidData(validData)
106+
col.nullable = true
107+
return col, nil
108+
}
109+
110+
// getValidCount returns the number of true entries in validData.
// A nil or empty slice yields 0.
func getValidCount(validData []bool) int {
	valid := 0
	for i := range validData {
		if validData[i] {
			valid++
		}
	}
	return valid
}
119+
44120
type NullableColumnCreateFunc[T any, Col interface {
45121
Column
46122
Data() []T

client/column/sparse.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,15 @@ func NewColumnSparseVectors(name string, values []entity.SparseEmbedding) *Colum
3838

3939
func (c *ColumnSparseFloatVector) FieldData() *schemapb.FieldData {
4040
fd := c.vectorBase.FieldData()
41-
max := lo.MaxBy(c.values, func(a, b entity.SparseEmbedding) bool {
42-
return a.Dim() > b.Dim()
43-
})
4441
vectors := fd.GetVectors()
45-
vectors.Dim = int64(max.Dim())
42+
if len(c.values) > 0 {
43+
max := lo.MaxBy(c.values, func(a, b entity.SparseEmbedding) bool {
44+
return a.Dim() > b.Dim()
45+
})
46+
vectors.Dim = int64(max.Dim())
47+
} else {
48+
vectors.Dim = 0
49+
}
4650
return fd
4751
}
4852

client/column/struct.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,7 @@ func (c *columnStructArray) CompactNullableValues() {
136136
field.CompactNullableValues()
137137
}
138138
}
139+
140+
func (c *columnStructArray) ValidCount() int {
141+
return c.Len()
142+
}

0 commit comments

Comments
 (0)