diff --git a/examples/bulk_upsert/apache_arrow/main.go b/examples/bulk_upsert/apache_arrow/main.go index 21bbcaeca..039347f04 100644 --- a/examples/bulk_upsert/apache_arrow/main.go +++ b/examples/bulk_upsert/apache_arrow/main.go @@ -49,14 +49,14 @@ func main() { createSchema(ctx, db) - //fillTableWeather(ctx, db) + fillTableWeather(ctx, db) - //minTemperature, avgTemperature, maxTemperature, err := getWeatherStatsFromTable(ctx, db, 2014) - //if err != nil { - // panic(err) - //} + minTemperature, avgTemperature, maxTemperature, err := getWeatherStatsFromTable(ctx, db, 2014) + if err != nil { + panic(err) + } - //fmt.Println(minTemperature, avgTemperature, maxTemperature) + fmt.Println(minTemperature, avgTemperature, maxTemperature) fillTopicCommits(ctx, db) @@ -89,6 +89,7 @@ func fillTopicCommits(ctx context.Context, db *ydb.Driver) { scanner := bufio.NewScanner(bytes.NewReader(commitsJSON)) messages := make([]topicwriter.Message, 0, 1000) n := 0 + commits := 0 for scanner.Scan() { content := scanner.Bytes() @@ -97,6 +98,16 @@ func fillTopicCommits(ctx context.Context, db *ydb.Driver) { err = json.Unmarshal(content, &commit) if err == nil { messages = append(messages, topicwriter.Message{Data: bytes.NewReader(scanner.Bytes())}) + + date, err := time.Parse("2006-01-02 15:04:05", commit.Date) + if err != nil { + panic(err) + } + + if date.Year() == 2022 { + commits++ + } + n++ if n%1000 == 0 { err = writer.Write(ctx, messages...) @@ -110,6 +121,8 @@ func fillTopicCommits(ctx context.Context, db *ydb.Driver) { if err := scanner.Err(); err != nil { panic(err) } + + fmt.Printf("commits: %d\n", commits) } func fillTableWeather(ctx context.Context, db *ydb.Driver) {