Skip to content

Commit

Permalink
attempt 9: process each chunk read into a map
Browse files Browse the repository at this point in the history
  • Loading branch information
shraddhaag committed Jan 29, 2024
1 parent 067f2a4 commit d4153ac
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 36 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
|5|Read file in chunks of 100 MB instead of reading line by line. |3:32.62|-9.14|[c26fea4](https://github.com/shraddhaag/1brc/commit/c26fea40019552a7e4fc1c864236f433b1b686f0)|
|6|Convert temperature from `string` to `int64`, process in `int64` and convert to `float64` at the end. |2:51.50|-41.14|[7812da4](https://github.com/shraddhaag/1brc/commit/7812da4d0be07dd4686d5f9b9df1e93b08cd0dd1)|
|7|In the city <> temperatures map, replaced the value for each key (city) to preprocessed min, max, count and sum of all temperatures instead of storing all recorded temperatures for the city.|1:39.81|-71.79|[e5213a8](https://github.com/shraddhaag/1brc/commit/e5213a836b17bec0a858474a11f07c902e724bba)|
|8|Use producer consumer pattern to read file in chunks and process the chunks in parallel.|1:43.82|+14.01||
|8|Use producer consumer pattern to read file in chunks and process the chunks in parallel.|1:43.82|+14.01|[067f2a4](https://github.com/shraddhaag/1brc/commit/067f2a44c0d6b3bb7cc073639364f733bce09e3e)|
|9|Reduce memory allocation by processing each read chunk into a map. Result channel now can collate the smaller processed chunk maps.|0:28.544|-75.286||
72 changes: 37 additions & 35 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func readFileLineByLineIntoAMap(filepath string) (map[string]cityTemperatureInfo
defer file.Close()

mapOfTemp := make(map[string]cityTemperatureInfo)
resultStream := make(chan []string, 100)
resultStream := make(chan map[string]cityTemperatureInfo, 10)
chunkStream := make(chan []byte, 15)
chunkSize := 64 * 1024 * 1024
var wg sync.WaitGroup
Expand Down Expand Up @@ -155,31 +155,20 @@ func readFileLineByLineIntoAMap(filepath string) (map[string]cityTemperatureInfo

// process all city temperatures derived after processing the file chunks
for t := range resultStream {
for _, text := range t {
index := strings.Index(text, ";")
if index == -1 {
continue
}
city := text[:index]
temp := convertStringToInt64(text[index+1:])
for city, tempInfo := range t {
if val, ok := mapOfTemp[city]; ok {
val.count++
val.sum += temp
if temp < val.min {
val.min = temp
val.count += tempInfo.count
val.sum += tempInfo.sum
if tempInfo.min < val.min {
val.min = tempInfo.min
}

if temp > val.max {
val.max = temp
if tempInfo.max > val.max {
val.max = tempInfo.max
}
mapOfTemp[city] = val
} else {
mapOfTemp[city] = cityTemperatureInfo{
count: 1,
min: temp,
max: temp,
sum: temp,
}
mapOfTemp[city] = tempInfo
}
}
}
Expand All @@ -193,32 +182,45 @@ func convertStringToInt64(input string) int64 {
return output
}

func processReadChunk(buf []byte, resultStream chan<- []string) {
var count int
func processReadChunk(buf []byte, resultStream chan<- map[string]cityTemperatureInfo) {
var stringsBuilder strings.Builder
toSend := make([]string, 100)
toSend := make(map[string]cityTemperatureInfo)
var city string

for _, char := range buf {
if char == '\n' {
if stringsBuilder.Len() != 0 {
toSend[count] = stringsBuilder.String()
if char == ';' {
city = stringsBuilder.String()
stringsBuilder.Reset()
} else if char == '\n' {
if stringsBuilder.Len() != 0 && len(city) != 0 {
temp := convertStringToInt64(stringsBuilder.String())
stringsBuilder.Reset()
count++

if count == 100 {
count = 0
localCopy := make([]string, 100)
copy(localCopy, toSend)
resultStream <- localCopy
if val, ok := toSend[city]; ok {
val.count++
val.sum += temp
if temp < val.min {
val.min = temp
}

if temp > val.max {
val.max = temp
}
toSend[city] = val
} else {
toSend[city] = cityTemperatureInfo{
count: 1,
min: temp,
max: temp,
sum: temp,
}
}
}
} else {
stringsBuilder.WriteByte(char)
}
}
if count != 0 {
resultStream <- toSend[:count]
}
resultStream <- toSend
}

func round(x float64) float64 {
Expand Down
Binary file added profiles/cpu-map.prof
Binary file not shown.

0 comments on commit d4153ac

Please sign in to comment.