Skip to content

Commit

Permalink
enhance: support dataType: array & json (#30076)
Browse files Browse the repository at this point in the history
issue: #30075

Handle the array<elementType> field data correctly.

Signed-off-by: PowderLi <[email protected]>
  • Loading branch information
PowderLi authored Jan 21, 2024
1 parent f69f65f commit 4f44942
Show file tree
Hide file tree
Showing 3 changed files with 321 additions and 24 deletions.
5 changes: 2 additions & 3 deletions internal/distributed/proxy/httpserver/handler_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ func TestInsertForDataType(t *testing.T) {
"[success]kinds of data type": newCollectionSchema(generateCollectionSchema(schemapb.DataType_Int64, false)),
"[success]use binary vector": newCollectionSchema(generateCollectionSchema(schemapb.DataType_Int64, true)),
"[success]with dynamic field": withDynamicField(newCollectionSchema(generateCollectionSchema(schemapb.DataType_Int64, false))),
"[success]with array fields": withArrayField(newCollectionSchema(generateCollectionSchema(schemapb.DataType_Int64, false))),
}
for name, schema := range schemas {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -795,9 +796,7 @@ func TestInsertForDataType(t *testing.T) {
assert.Equal(t, "{\"code\":200,\"data\":{\"insertCount\":3,\"insertIds\":[1,2,3]}}", w.Body.String())
})
}
schemas = map[string]*schemapb.CollectionSchema{
"with unsupport field type": withUnsupportField(newCollectionSchema(generateCollectionSchema(schemapb.DataType_Int64, false))),
}
schemas = map[string]*schemapb.CollectionSchema{}
for name, schema := range schemas {
t.Run(name, func(t *testing.T) {
mp := mocks.NewMockProxy(t)
Expand Down
129 changes: 125 additions & 4 deletions internal/distributed/proxy/httpserver/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,113 @@ func checkAndSetData(body string, collSchema *schemapb.CollectionSchema) (error,
return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray
}
reallyData[fieldName] = result
case schemapb.DataType_Array:
switch field.ElementType {
case schemapb.DataType_Bool:
arr := make([]bool, 0)
err := json.Unmarshal([]byte(dataString), &arr)
if err != nil {
return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray
}
reallyData[fieldName] = &schemapb.ScalarField{
Data: &schemapb.ScalarField_BoolData{
BoolData: &schemapb.BoolArray{
Data: arr,
},
},
}
case schemapb.DataType_Int8:
arr := make([]int32, 0)
err := json.Unmarshal([]byte(dataString), &arr)
if err != nil {
return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray
}
reallyData[fieldName] = &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: arr,
},
},
}
case schemapb.DataType_Int16:
arr := make([]int32, 0)
err := json.Unmarshal([]byte(dataString), &arr)
if err != nil {
return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray
}
reallyData[fieldName] = &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: arr,
},
},
}
case schemapb.DataType_Int32:
arr := make([]int32, 0)
err := json.Unmarshal([]byte(dataString), &arr)
if err != nil {
return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray
}
reallyData[fieldName] = &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: arr,
},
},
}
case schemapb.DataType_Int64:
arr := make([]int64, 0)
err := json.Unmarshal([]byte(dataString), &arr)
if err != nil {
return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray
}
reallyData[fieldName] = &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: arr,
},
},
}
case schemapb.DataType_Float:
arr := make([]float32, 0)
err := json.Unmarshal([]byte(dataString), &arr)
if err != nil {
return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray
}
reallyData[fieldName] = &schemapb.ScalarField{
Data: &schemapb.ScalarField_FloatData{
FloatData: &schemapb.FloatArray{
Data: arr,
},
},
}
case schemapb.DataType_Double:
arr := make([]float64, 0)
err := json.Unmarshal([]byte(dataString), &arr)
if err != nil {
return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray
}
reallyData[fieldName] = &schemapb.ScalarField{
Data: &schemapb.ScalarField_DoubleData{
DoubleData: &schemapb.DoubleArray{
Data: arr,
},
},
}
case schemapb.DataType_VarChar:
arr := make([]string, 0)
err := json.Unmarshal([]byte(dataString), &arr)
if err != nil {
return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray
}
reallyData[fieldName] = &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{
Data: arr,
},
},
}
}
case schemapb.DataType_JSON:
reallyData[fieldName] = []byte(dataString)
case schemapb.DataType_Float:
Expand Down Expand Up @@ -429,6 +536,8 @@ func anyToColumns(rows []map[string]interface{}, sch *schemapb.CollectionSchema)
data = make([]string, 0, rowsLen)
case schemapb.DataType_VarChar:
data = make([]string, 0, rowsLen)
case schemapb.DataType_Array:
data = make([]*schemapb.ScalarField, 0, rowsLen)
case schemapb.DataType_JSON:
data = make([][]byte, 0, rowsLen)
case schemapb.DataType_FloatVector:
Expand Down Expand Up @@ -491,6 +600,8 @@ func anyToColumns(rows []map[string]interface{}, sch *schemapb.CollectionSchema)
nameColumns[field.Name] = append(nameColumns[field.Name].([]string), candi.v.Interface().(string))
case schemapb.DataType_VarChar:
nameColumns[field.Name] = append(nameColumns[field.Name].([]string), candi.v.Interface().(string))
case schemapb.DataType_Array:
nameColumns[field.Name] = append(nameColumns[field.Name].([]*schemapb.ScalarField), candi.v.Interface().(*schemapb.ScalarField))
case schemapb.DataType_JSON:
nameColumns[field.Name] = append(nameColumns[field.Name].([][]byte), candi.v.Interface().([]byte))
case schemapb.DataType_FloatVector:
Expand Down Expand Up @@ -610,11 +721,21 @@ func anyToColumns(rows []map[string]interface{}, sch *schemapb.CollectionSchema)
},
},
}
case schemapb.DataType_Array:
colData.Field = &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_ArrayData{
ArrayData: &schemapb.ArrayArray{
Data: column.([]*schemapb.ScalarField),
},
},
},
}
case schemapb.DataType_JSON:
colData.Field = &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_BytesData{
BytesData: &schemapb.BytesArray{
Data: &schemapb.ScalarField_JsonData{
JsonData: &schemapb.JSONArray{
Data: column.([][]byte),
},
},
Expand Down Expand Up @@ -747,10 +868,10 @@ func buildQueryResp(rowsNum int64, needFields []string, fieldDataList []*schemap
rowsNum = int64(len(fieldDataList[0].GetScalars().GetStringData().Data))
case schemapb.DataType_VarChar:
rowsNum = int64(len(fieldDataList[0].GetScalars().GetStringData().Data))
case schemapb.DataType_JSON:
rowsNum = int64(len(fieldDataList[0].GetScalars().GetJsonData().Data))
case schemapb.DataType_Array:
rowsNum = int64(len(fieldDataList[0].GetScalars().GetArrayData().Data))
case schemapb.DataType_JSON:
rowsNum = int64(len(fieldDataList[0].GetScalars().GetJsonData().Data))
case schemapb.DataType_BinaryVector:
rowsNum = int64(len(fieldDataList[0].GetVectors().GetBinaryVector())*8) / fieldDataList[0].GetVectors().GetDim()
case schemapb.DataType_FloatVector:
Expand Down
Loading

0 comments on commit 4f44942

Please sign in to comment.