From 1038e10915f68345eb100eb3832c52d9614ec9a9 Mon Sep 17 00:00:00 2001 From: Przemyslaw Delewski <102958445+pdelewski@users.noreply.github.com> Date: Mon, 9 Sep 2024 09:36:14 +0200 Subject: [PATCH] nested fields encoding #4 (#729) This PR: - changes the signature of `BuildIngestSQLStatements` to take a `*Table` instead of a table name, so the table is passed consistently - moves the body of `GenerateSqlStatements` into `processInsertQuery` to assemble the whole workflow in one place and open the way for a new arrangement. --- quesma/clickhouse/alter_table_test.go | 12 ++-- quesma/clickhouse/clickhouse.go | 89 ++++++++++++--------------- quesma/clickhouse/clickhouse_test.go | 4 +- 3 files changed, 50 insertions(+), 55 deletions(-) diff --git a/quesma/clickhouse/alter_table_test.go b/quesma/clickhouse/alter_table_test.go index 73d01d52c..5ea52f563 100644 --- a/quesma/clickhouse/alter_table_test.go +++ b/quesma/clickhouse/alter_table_test.go @@ -35,18 +35,19 @@ func TestAlterTable(t *testing.T) { "{\"attributes_values\":{},\"attributes_metadata\":{},\"Test1\":1,\"Test2\":2}", } alters := []string{ - "ALTER TABLE \"\" ADD COLUMN IF NOT EXISTS \"Test1\" Nullable(Int64)", - "ALTER TABLE \"\" ADD COLUMN IF NOT EXISTS \"Test2\" Nullable(Int64)", + "ALTER TABLE \"tableName\" ADD COLUMN IF NOT EXISTS \"Test1\" Nullable(Int64)", + "ALTER TABLE \"tableName\" ADD COLUMN IF NOT EXISTS \"Test2\" Nullable(Int64)", } columns := []string{"Test1", "Test2"} table := &Table{ + Name: "tableName", Cols: map[string]*Column{}, } fieldsMap := concurrent.NewMapWith("tableName", table) lm := NewLogManager(fieldsMap, &config.QuesmaConfiguration{}) for i := range rowsToInsert { - insert, alter, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowsToInsert[i]), nil, chConfig) + insert, alter, err := lm.BuildIngestSQLStatements(table, types.MustJSON(rowsToInsert[i]), nil, chConfig) assert.Equal(t, expectedInsert[i], insert) assert.Equal(t, alters[i], alter[0]) // Table will grow with each iteration @@ -93,10 +94,11 @@ func TestAlterTableHeuristic(t 
*testing.T) { {1000, 1000, 1}, } for _, tc := range testcases { + const tableName = "tableName" table := &Table{ + Name: tableName, Cols: map[string]*Column{}, } - const tableName = "tableName" fieldsMap := concurrent.NewMapWith(tableName, table) lm := NewLogManager(fieldsMap, &config.QuesmaConfiguration{}) @@ -119,7 +121,7 @@ func TestAlterTableHeuristic(t *testing.T) { assert.Equal(t, int64(0), lm.ingestCounter) for i := range rowsToInsert { - _, _, err := lm.BuildIngestSQLStatements(tableName, types.MustJSON(rowsToInsert[i]), nil, chConfig) + _, _, err := lm.BuildIngestSQLStatements(table, types.MustJSON(rowsToInsert[i]), nil, chConfig) assert.NoError(t, err) } assert.Equal(t, tc.expected, len(table.Cols)) diff --git a/quesma/clickhouse/clickhouse.go b/quesma/clickhouse/clickhouse.go index 8d6953c28..6d4fad13b 100644 --- a/quesma/clickhouse/clickhouse.go +++ b/quesma/clickhouse/clickhouse.go @@ -602,18 +602,20 @@ func (lm *LogManager) shouldAlterColumns(table *Table, attrsMap map[string][]int return false, nil } -func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON, inValidJson types.JSON, - config *ChTableConfig) (string, []string, error) { +func (lm *LogManager) BuildIngestSQLStatements(table *Table, + data types.JSON, + inValidJson types.JSON, + config *ChTableConfig, +) (string, []string, error) { - jsonData, err := json.Marshal(data) + jsonAsBytesSlice, err := json.Marshal(data) if err != nil { return "", nil, err } - jsonDataAsString := string(jsonData) // we find all non-schema fields - jsonMap, err := types.ParseJSON(jsonDataAsString) + jsonMap, err := types.ParseJSON(string(jsonAsBytesSlice)) if err != nil { return "", nil, err } @@ -624,7 +626,7 @@ func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON // if we don't have any attributes, and it wasn't replaced, // we don't need to modify the json if !wasReplaced { - return jsonDataAsString, nil, nil + return string(jsonAsBytesSlice), nil, nil } 
rawBytes, err := jsonMap.Bytes() if err != nil { @@ -633,7 +635,6 @@ func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON return string(rawBytes), nil, nil } - table := lm.FindTable(tableName) schemaFieldsJson, err := json.Marshal(jsonMap) if err != nil { @@ -642,8 +643,8 @@ func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON mDiff := DifferenceMap(jsonMap, table) // TODO change to DifferenceMap(m, t) - if len(mDiff) == 0 && string(schemaFieldsJson) == jsonDataAsString && len(inValidJson) == 0 { // no need to modify, just insert 'js' - return jsonDataAsString, nil, nil + if len(mDiff) == 0 && string(schemaFieldsJson) == string(jsonAsBytesSlice) && len(inValidJson) == 0 { // no need to modify, just insert 'js' + return string(jsonAsBytesSlice), nil, nil } // check attributes precondition @@ -675,7 +676,6 @@ func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON } onlySchemaFields := RemoveNonSchemaFields(jsonMap, table) - schemaFieldsJson, err = json.Marshal(onlySchemaFields) if err != nil { @@ -725,6 +725,8 @@ func (lm *LogManager) processInsertQuery(ctx context.Context, logger.ErrorWithCtx(ctx).Msgf("error ProcessInsertQuery, can't create table: %v", err) return nil, err } + // Set pointer to table after creating it + table = lm.FindTable(tableName) } else if !table.Created { err := lm.execute(ctx, table.createTableString()) if err != nil { @@ -734,10 +736,34 @@ func (lm *LogManager) processInsertQuery(ctx context.Context, } else { config = table.Config } - // TODO this is doing nested field encoding - // ---------------------- - return lm.GenerateSqlStatements(ctx, tableName, jsonData, config, transformer) - // ---------------------- + var jsonsReadyForInsertion []string + var alterCmd []string + var preprocessedJsons []types.JSON + var invalidJsons []types.JSON + preprocessedJsons, invalidJsons, err := lm.preprocessJsons(ctx, table.Name, jsonData, transformer) + if err != nil { + 
return nil, fmt.Errorf("error preprocessJsons: %v", err) + } + for i, preprocessedJson := range preprocessedJsons { + // TODO this is doing nested field encoding + // ---------------------- + insertJson, alter, err := lm.BuildIngestSQLStatements(table, preprocessedJson, + invalidJsons[i], config) + // ---------------------- + alterCmd = append(alterCmd, alter...) + if err != nil { + return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err) + } + jsonsReadyForInsertion = append(jsonsReadyForInsertion, insertJson) + } + + insertValues := strings.Join(jsonsReadyForInsertion, ", ") + insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", table.Name, insertValues) + + var statements []string + statements = append(statements, alterCmd...) + statements = append(statements, insert) + return statements, nil } func (lm *LogManager) ProcessInsertQuery(ctx context.Context, tableName string, @@ -814,41 +840,6 @@ func (lm *LogManager) preprocessJsons(ctx context.Context, return preprocessedJsons, invalidJsons, nil } -func (lm *LogManager) GenerateSqlStatements(ctx context.Context, - tableName string, jsons []types.JSON, - config *ChTableConfig, transformer jsonprocessor.IngestTransformer, -) ([]string, error) { - - var jsonsReadyForInsertion []string - var alterCmd []string - var preprocessedJsons []types.JSON - var invalidJsons []types.JSON - preprocessedJsons, invalidJsons, err := lm.preprocessJsons(ctx, tableName, jsons, transformer) - if err != nil { - return nil, fmt.Errorf("error preprocessJsons: %v", err) - } - for i, preprocessedJson := range preprocessedJsons { - // TODO this is doing nested field encoding - // ---------------------- - insertJson, alter, err := lm.BuildIngestSQLStatements(tableName, preprocessedJson, - invalidJsons[i], config) - // ---------------------- - alterCmd = append(alterCmd, alter...) 
- if err != nil { - return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", tableName, PrettyJson(insertJson), err) - } - jsonsReadyForInsertion = append(jsonsReadyForInsertion, insertJson) - } - - insertValues := strings.Join(jsonsReadyForInsertion, ", ") - insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", tableName, insertValues) - - var statements []string - statements = append(statements, alterCmd...) - statements = append(statements, insert) - return statements, nil -} - func (lm *LogManager) FindTable(tableName string) (result *Table) { tableNamePattern := index.TableNamePatternRegexp(tableName) lm.tableDiscovery.TableDefinitions(). diff --git a/quesma/clickhouse/clickhouse_test.go b/quesma/clickhouse/clickhouse_test.go index d81a264d0..c733bee31 100644 --- a/quesma/clickhouse/clickhouse_test.go +++ b/quesma/clickhouse/clickhouse_test.go @@ -46,9 +46,11 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) { }, }) + tableName, exists := fieldsMap.Load("tableName") + assert.True(t, exists) f := func(t1, t2 TableMap) { lm := NewLogManager(fieldsMap, &config.QuesmaConfiguration{}) - j, alter, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowToInsert), nil, hasOthersConfig) + j, alter, err := lm.BuildIngestSQLStatements(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig) assert.NoError(t, err) assert.Equal(t, 0, len(alter)) m := make(SchemaMap)