Skip to content

Commit

Permalink
nested fields encoding #4 (#729)
Browse files Browse the repository at this point in the history
This PR:
- changes the signature of `BuildIngestSQLStatements` to take a `*Table`
instead of a table name; consequently, callers now pass the table pointer
- moves the body of `GenerateSqlStatements` into `processInsertQuery` to
assemble the whole workflow in one place and open the way for a new arrangement.
  • Loading branch information
pdelewski authored Sep 9, 2024
1 parent 5659da8 commit 1038e10
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 55 deletions.
12 changes: 7 additions & 5 deletions quesma/clickhouse/alter_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,19 @@ func TestAlterTable(t *testing.T) {
"{\"attributes_values\":{},\"attributes_metadata\":{},\"Test1\":1,\"Test2\":2}",
}
alters := []string{
"ALTER TABLE \"\" ADD COLUMN IF NOT EXISTS \"Test1\" Nullable(Int64)",
"ALTER TABLE \"\" ADD COLUMN IF NOT EXISTS \"Test2\" Nullable(Int64)",
"ALTER TABLE \"tableName\" ADD COLUMN IF NOT EXISTS \"Test1\" Nullable(Int64)",
"ALTER TABLE \"tableName\" ADD COLUMN IF NOT EXISTS \"Test2\" Nullable(Int64)",
}
columns := []string{"Test1", "Test2"}
table := &Table{
Name: "tableName",
Cols: map[string]*Column{},
}
fieldsMap := concurrent.NewMapWith("tableName", table)

lm := NewLogManager(fieldsMap, &config.QuesmaConfiguration{})
for i := range rowsToInsert {
insert, alter, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowsToInsert[i]), nil, chConfig)
insert, alter, err := lm.BuildIngestSQLStatements(table, types.MustJSON(rowsToInsert[i]), nil, chConfig)
assert.Equal(t, expectedInsert[i], insert)
assert.Equal(t, alters[i], alter[0])
// Table will grow with each iteration
Expand Down Expand Up @@ -93,10 +94,11 @@ func TestAlterTableHeuristic(t *testing.T) {
{1000, 1000, 1},
}
for _, tc := range testcases {
const tableName = "tableName"
table := &Table{
Name: tableName,
Cols: map[string]*Column{},
}
const tableName = "tableName"
fieldsMap := concurrent.NewMapWith(tableName, table)
lm := NewLogManager(fieldsMap, &config.QuesmaConfiguration{})

Expand All @@ -119,7 +121,7 @@ func TestAlterTableHeuristic(t *testing.T) {

assert.Equal(t, int64(0), lm.ingestCounter)
for i := range rowsToInsert {
_, _, err := lm.BuildIngestSQLStatements(tableName, types.MustJSON(rowsToInsert[i]), nil, chConfig)
_, _, err := lm.BuildIngestSQLStatements(table, types.MustJSON(rowsToInsert[i]), nil, chConfig)
assert.NoError(t, err)
}
assert.Equal(t, tc.expected, len(table.Cols))
Expand Down
89 changes: 40 additions & 49 deletions quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,18 +602,20 @@ func (lm *LogManager) shouldAlterColumns(table *Table, attrsMap map[string][]int
return false, nil
}

func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON, inValidJson types.JSON,
config *ChTableConfig) (string, []string, error) {
func (lm *LogManager) BuildIngestSQLStatements(table *Table,
data types.JSON,
inValidJson types.JSON,
config *ChTableConfig,
) (string, []string, error) {

jsonData, err := json.Marshal(data)
jsonAsBytesSlice, err := json.Marshal(data)

if err != nil {
return "", nil, err
}
jsonDataAsString := string(jsonData)

// we find all non-schema fields
jsonMap, err := types.ParseJSON(jsonDataAsString)
jsonMap, err := types.ParseJSON(string(jsonAsBytesSlice))
if err != nil {
return "", nil, err
}
Expand All @@ -624,7 +626,7 @@ func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON
// if we don't have any attributes, and it wasn't replaced,
// we don't need to modify the json
if !wasReplaced {
return jsonDataAsString, nil, nil
return string(jsonAsBytesSlice), nil, nil
}
rawBytes, err := jsonMap.Bytes()
if err != nil {
Expand All @@ -633,7 +635,6 @@ func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON
return string(rawBytes), nil, nil
}

table := lm.FindTable(tableName)
schemaFieldsJson, err := json.Marshal(jsonMap)

if err != nil {
Expand All @@ -642,8 +643,8 @@ func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON

mDiff := DifferenceMap(jsonMap, table) // TODO change to DifferenceMap(m, t)

if len(mDiff) == 0 && string(schemaFieldsJson) == jsonDataAsString && len(inValidJson) == 0 { // no need to modify, just insert 'js'
return jsonDataAsString, nil, nil
if len(mDiff) == 0 && string(schemaFieldsJson) == string(jsonAsBytesSlice) && len(inValidJson) == 0 { // no need to modify, just insert 'js'
return string(jsonAsBytesSlice), nil, nil
}

// check attributes precondition
Expand Down Expand Up @@ -675,7 +676,6 @@ func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON
}

onlySchemaFields := RemoveNonSchemaFields(jsonMap, table)

schemaFieldsJson, err = json.Marshal(onlySchemaFields)

if err != nil {
Expand Down Expand Up @@ -725,6 +725,8 @@ func (lm *LogManager) processInsertQuery(ctx context.Context,
logger.ErrorWithCtx(ctx).Msgf("error ProcessInsertQuery, can't create table: %v", err)
return nil, err
}
// Set pointer to table after creating it
table = lm.FindTable(tableName)
} else if !table.Created {
err := lm.execute(ctx, table.createTableString())
if err != nil {
Expand All @@ -734,10 +736,34 @@ func (lm *LogManager) processInsertQuery(ctx context.Context,
} else {
config = table.Config
}
// TODO this is doing nested field encoding
// ----------------------
return lm.GenerateSqlStatements(ctx, tableName, jsonData, config, transformer)
// ----------------------
var jsonsReadyForInsertion []string
var alterCmd []string
var preprocessedJsons []types.JSON
var invalidJsons []types.JSON
preprocessedJsons, invalidJsons, err := lm.preprocessJsons(ctx, table.Name, jsonData, transformer)
if err != nil {
return nil, fmt.Errorf("error preprocessJsons: %v", err)
}
for i, preprocessedJson := range preprocessedJsons {
// TODO this is doing nested field encoding
// ----------------------
insertJson, alter, err := lm.BuildIngestSQLStatements(table, preprocessedJson,
invalidJsons[i], config)
// ----------------------
alterCmd = append(alterCmd, alter...)
if err != nil {
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
}
jsonsReadyForInsertion = append(jsonsReadyForInsertion, insertJson)
}

insertValues := strings.Join(jsonsReadyForInsertion, ", ")
insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", table.Name, insertValues)

var statements []string
statements = append(statements, alterCmd...)
statements = append(statements, insert)
return statements, nil
}

func (lm *LogManager) ProcessInsertQuery(ctx context.Context, tableName string,
Expand Down Expand Up @@ -814,41 +840,6 @@ func (lm *LogManager) preprocessJsons(ctx context.Context,
return preprocessedJsons, invalidJsons, nil
}

// GenerateSqlStatements turns a batch of JSON documents into the SQL
// statements needed to ingest them into tableName.
//
// The documents are first passed through lm.preprocessJsons. Each preprocessed
// document is then handed to lm.BuildIngestSQLStatements, which yields the JSON
// row to insert together with any ALTER commands that row requires.
//
// The returned slice contains every collected ALTER command followed by one
// "INSERT INTO ... FORMAT JSONEachRow" statement carrying all rows joined by
// ", ". On failure it returns nil and an error wrapping the underlying cause,
// so callers can inspect it with errors.Is / errors.As.
func (lm *LogManager) GenerateSqlStatements(ctx context.Context,
	tableName string, jsons []types.JSON,
	config *ChTableConfig, transformer jsonprocessor.IngestTransformer,
) ([]string, error) {
	preprocessedJsons, invalidJsons, err := lm.preprocessJsons(ctx, tableName, jsons, transformer)
	if err != nil {
		return nil, fmt.Errorf("error preprocessJsons: %w", err)
	}

	// One insert row is produced per preprocessed document.
	jsonsReadyForInsertion := make([]string, 0, len(preprocessedJsons))
	var alterCmd []string
	for i, preprocessedJson := range preprocessedJsons {
		// TODO this is doing nested field encoding
		// NOTE(review): assumes invalidJsons is index-aligned with
		// preprocessedJsons — confirm against preprocessJsons.
		insertJson, alter, err := lm.BuildIngestSQLStatements(tableName, preprocessedJson,
			invalidJsons[i], config)
		if err != nil {
			return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %w", tableName, PrettyJson(insertJson), err)
		}
		// Only keep results of successful builds.
		alterCmd = append(alterCmd, alter...)
		jsonsReadyForInsertion = append(jsonsReadyForInsertion, insertJson)
	}

	insertValues := strings.Join(jsonsReadyForInsertion, ", ")
	insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", tableName, insertValues)

	// Schema changes must come before the insert that depends on them.
	statements := make([]string, 0, len(alterCmd)+1)
	statements = append(statements, alterCmd...)
	statements = append(statements, insert)
	return statements, nil
}

func (lm *LogManager) FindTable(tableName string) (result *Table) {
tableNamePattern := index.TableNamePatternRegexp(tableName)
lm.tableDiscovery.TableDefinitions().
Expand Down
4 changes: 3 additions & 1 deletion quesma/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) {
},
})

tableName, exists := fieldsMap.Load("tableName")
assert.True(t, exists)
f := func(t1, t2 TableMap) {
lm := NewLogManager(fieldsMap, &config.QuesmaConfiguration{})
j, alter, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowToInsert), nil, hasOthersConfig)
j, alter, err := lm.BuildIngestSQLStatements(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig)
assert.NoError(t, err)
assert.Equal(t, 0, len(alter))
m := make(SchemaMap)
Expand Down

0 comments on commit 1038e10

Please sign in to comment.