Skip to content

Commit

Permalink
attempt 5: read file in chunks instead of line by line
Browse files Browse the repository at this point in the history
  • Loading branch information
shraddhaag committed Jan 23, 2024
1 parent b7b1781 commit c26fea4
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 46 deletions.
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

| Attempt Number | Approach | Execution Time | Diff | Commit |
|-----------------|---|---|---|--|
|1| Naive Implementation: Read temperatures into a map of cities. Iterate serially over each key (city) in map to find min, max and average temperatures.| 6:13.15 | ||
|2| Evaluate each city in map concurrently using goroutines.|4:32.80|-100.35| [8bd5f43](https://github.com/shraddhaag/1brc/commit/8bd5f437e8cc231e3ee18348b83f4dc694137546)|
|3|Remove sorting float64 slices. Calculate min, max and average by iterating.|4:25.59|-7.21|[830e5df](https://github.com/shraddhaag/1brc/commit/830e5dfacff9fb7a41d12027e21399736bc34701)|
|4|Decouple reading and processing of file content. A buffered channel is used to communicate between the two goroutines.|5:22.83|+57.24|[2babf7d](https://github.com/shraddhaag/1brc/commit/2babf7dda72d92c72722b220b8b663e747075bd7)|
|5|Instead of sending each line to the channel, now sending 100 lines chunked together. Also, to minimise garbage collection, not freeing up memory when resetting a slice. |3:41.76|-161.07||
|0| Naive Implementation: Read temperatures into a map of cities. Iterate serially over each key (city) in map to find min, max and average temperatures.| 6:13.15 | ||
|1| Evaluate each city in map concurrently using goroutines.|4:32.80|-100.35| [8bd5f43](https://github.com/shraddhaag/1brc/commit/8bd5f437e8cc231e3ee18348b83f4dc694137546)|
|2|Remove sorting float64 slices. Calculate min, max and average by iterating.|4:25.59|-7.21|[830e5df](https://github.com/shraddhaag/1brc/commit/830e5dfacff9fb7a41d12027e21399736bc34701)|
|3|Decouple reading and processing of file content. A buffered channel is used to communicate between the two goroutines.|5:22.83|+57.24|[2babf7d](https://github.com/shraddhaag/1brc/commit/2babf7dda72d92c72722b220b8b663e747075bd7)|
|4|Instead of sending each line to the channel, now sending 100 lines chunked together. Also, to minimise garbage collection, not freeing up memory when resetting a slice. |3:41.76|-161.07|[b7b1781](https://github.com/shraddhaag/1brc/commit/b7b1781f58fd258a06940bd6c05eb404c8a14af6)|
|5|Read file in chunks of 100 MB instead of reading line by line. |3:32.62|-9.14||
123 changes: 82 additions & 41 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package main

import (
"bufio"
"errors"
"flag"
"fmt"
"io"
"log"
"math"
"os"
"runtime"
"runtime/pprof"
"runtime/trace"
"sort"
"strconv"
"strings"
Expand All @@ -14,39 +19,47 @@ import (

// Command-line flags that select which profiles to record. Each value is a
// file name; an empty value (the default) leaves that profile disabled.
var (
	cpuprofile       = flag.String("cpuprofile", "", "write cpu profile to `file`")
	memprofile       = flag.String("memprofile", "", "write memory profile to `file`")
	executionprofile = flag.String("execprofile", "", "write execution trace to `file`")
)

func main() {

// trace.Start(os.Stderr)
// defer trace.Stop()

// flag.Parse()
// if *cpuprofile != "" {
// f, err := os.Create("./profiles/" + *cpuprofile)
// if err != nil {
// log.Fatal("could not create CPU profile: ", err)
// }
// defer f.Close() // error handling omitted for example
// if err := pprof.StartCPUProfile(f); err != nil {
// log.Fatal("could not start CPU profile: ", err)
// }
// defer pprof.StopCPUProfile()
// }

evaluate()
// fmt.Println(evaluate())

// if *memprofile != "" {
// f, err := os.Create("./profiles/" + *memprofile)
// if err != nil {
// log.Fatal("could not create memory profile: ", err)
// }
// defer f.Close() // error handling omitted for example
// runtime.GC() // get up-to-date statistics
// if err := pprof.WriteHeapProfile(f); err != nil {
// log.Fatal("could not write memory profile: ", err)
// }
// }
flag.Parse()

if *executionprofile != "" {
f, err := os.Create("./profiles/" + *executionprofile)
if err != nil {
log.Fatal("could not create trace execution profile: ", err)
}
defer f.Close()
trace.Start(f)
defer trace.Stop()
}

if *cpuprofile != "" {
f, err := os.Create("./profiles/" + *cpuprofile)
if err != nil {
log.Fatal("could not create CPU profile: ", err)
}
defer f.Close()
if err := pprof.StartCPUProfile(f); err != nil {
log.Fatal("could not start CPU profile: ", err)
}
defer pprof.StopCPUProfile()
}

fmt.Println(evaluate())

if *memprofile != "" {
f, err := os.Create("./profiles/" + *memprofile)
if err != nil {
log.Fatal("could not create memory profile: ", err)
}
defer f.Close()
runtime.GC()
if err := pprof.WriteHeapProfile(f); err != nil {
log.Fatal("could not write memory profile: ", err)
}
}
}

func evaluate() string {
Expand Down Expand Up @@ -109,19 +122,24 @@ func readFileLineByLineIntoAMap(filepath string) (map[string][]float64, error) {
chanOwner := func() <-chan []string {
resultStream := make(chan []string, 100)
toSend := make([]string, 100)
// reading 100MB per request
chunkSize := 100 * 1024 * 1024
buf := make([]byte, chunkSize)
var stringsBuilder strings.Builder
stringsBuilder.Grow(500)
var count int
go func() {
defer close(resultStream)
scanner := bufio.NewScanner(file)
var count int
for scanner.Scan() {
if count == 100 {
localCopy := make([]string, 100)
copy(localCopy, toSend)
resultStream <- localCopy
count = 0
for {
readTotal, err := file.Read(buf)
if err != nil {
if errors.Is(err, io.EOF) {
count = processReadChunk(buf, readTotal, count, &stringsBuilder, toSend, resultStream)
break
}
panic(err)
}
toSend[count] = scanner.Text()
count++
count = processReadChunk(buf, readTotal, count, &stringsBuilder, toSend, resultStream)
}
if count != 0 {
resultStream <- toSend[:count]
Expand Down Expand Up @@ -158,3 +176,26 @@ func convertStringToFloat(input string) float64 {
output, _ := strconv.ParseFloat(input, 64)
return output
}

// processReadChunk splits the first readTotal bytes of buf into
// newline-terminated lines and batches them into toSend.
//
// Bytes of a line that is still unterminated at the end of the chunk stay in
// stringsBuilder, so the next call continues that line. Empty lines are
// skipped. count is the number of lines already pending in toSend on entry.
// Each time toSend fills up, a copy of it is sent on resultStream and filling
// restarts at index 0. The return value is the number of lines pending in
// toSend on exit; sending those is left to the caller.
func processReadChunk(buf []byte, readTotal, count int, stringsBuilder *strings.Builder, toSend []string, resultStream chan<- []string) int {
	for _, char := range buf[:readTotal] {
		if char != '\n' {
			stringsBuilder.WriteByte(char)
			continue
		}
		if stringsBuilder.Len() == 0 {
			continue
		}

		toSend[count] = stringsBuilder.String()
		stringsBuilder.Reset()
		count++

		// Flush on the batch slice's actual length rather than a hard-coded
		// 100, so a differently sized toSend cannot be indexed out of range.
		if count == len(toSend) {
			// The receiver gets its own copy because toSend is overwritten by
			// the lines that follow.
			localCopy := make([]string, len(toSend))
			copy(localCopy, toSend)
			resultStream <- localCopy
			count = 0
		}
	}

	return count
}
Binary file added profiles/cpu-read-chunk-2.prof
Binary file not shown.
Binary file added profiles/cpu-read-chunk.prof
Binary file not shown.

0 comments on commit c26fea4

Please sign in to comment.