diff --git a/A-pipeline-context-cancellation.md b/A-pipeline-context-cancellation.md index 506ef1e09..880423801 100644 --- a/A-pipeline-context-cancellation.md +++ b/A-pipeline-context-cancellation.md @@ -25,14 +25,14 @@ Ok langsung saja, pertama yang perlu dipersiapkan adalah tulis dulu kode program package main import ( - "fmt" - "io/ioutil" - "log" - "math/rand" - "os" - "path/filepath" - "sync" - "time" + "fmt" + "io/ioutil" + "log" + "math/rand" + "os" + "path/filepath" + "sync" + "time" ) const totalFile = 3000 @@ -45,10 +45,10 @@ var tempPath = filepath.Join(os.Getenv("TEMP"), "chapter-A.61-pipeline-cancellat ```go type FileInfo struct { - Index int - FileName string - WorkerIndex int - Err error + Index int + FileName string + WorkerIndex int + Err error } ``` @@ -56,13 +56,13 @@ type FileInfo struct { ```go func main() { - log.Println("start") - start := time.Now() + log.Println("start") + start := time.Now() - generateFiles() + generateFiles() - duration := time.Since(start) - log.Println("done in", duration.Seconds(), "seconds") + duration := time.Since(start) + log.Println("done in", duration.Seconds(), "seconds") } ``` @@ -70,16 +70,16 @@ func main() { ```go func randomString(length int) string { - randomizer := rand.New(rand.NewSource(time.Now().Unix())) - letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + randomizer := rand.New(rand.NewSource(time.Now().Unix())) + letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - b := make([]rune, length) - for i := range b { - s := randomizer.Intn(len(letters)) - b[i] = letters[s] - } + b := make([]rune, length) + for i := range b { + s := randomizer.Intn(len(letters)) + b[i] = letters[s] + } - return string(b) + return string(b) } ``` @@ -87,48 +87,48 @@ func randomString(length int) string { ```go func generateFiles() { - os.RemoveAll(tempPath) - os.MkdirAll(tempPath, os.ModePerm) - - // pipeline 1: job distribution - chanFileIndex := generateFileIndexes() - - // pipeline 2: the main logic (creating files) - createFilesWorker := 100 - chanFileResult := createFiles(chanFileIndex, createFilesWorker) - - // track and print output - counterTotal := 0 - counterSuccess := 0 - for fileResult := range chanFileResult { - if fileResult.Err != nil { - log.Printf("error creating file %s. stack trace: %s", fileResult.FileName, fileResult.Err) - } else { - counterSuccess++ - } - counterTotal++ - } - - log.Printf("%d/%d of total files created", counterSuccess, counterTotal) + os.RemoveAll(tempPath) + os.MkdirAll(tempPath, os.ModePerm) + + // pipeline 1: job distribution + chanFileIndex := generateFileIndexes() + + // pipeline 2: the main logic (creating files) + createFilesWorker := 100 + chanFileResult := createFiles(chanFileIndex, createFilesWorker) + + // track and print output + counterTotal := 0 + counterSuccess := 0 + for fileResult := range chanFileResult { + if fileResult.Err != nil { + log.Printf("error creating file %s. stack trace: %s", fileResult.FileName, fileResult.Err) + } else { + counterSuccess++ + } + counterTotal++ + } + + log.Printf("%d/%d of total files created", counterSuccess, counterTotal) } ``` #### • Fungsi `generateFileIndexes()` ```go func generateFileIndexes() <-chan FileInfo { - chanOut := make(chan FileInfo) - - go func() { - for i := 0; i < totalFile; i++ { - chanOut <- FileInfo{ - Index: i, - FileName: fmt.Sprintf("file-%d.txt", i), - } - } - close(chanOut) - }() - - return chanOut + chanOut := make(chan FileInfo) + + go func() { + for i := 0; i < totalFile; i++ { + chanOut <- FileInfo{ + Index: i, + FileName: fmt.Sprintf("file-%d.txt", i), + } + } + close(chanOut) + }() + + return chanOut } ``` @@ -136,39 +136,39 @@ func generateFileIndexes() <-chan FileInfo { ```go func createFiles(chanIn <-chan FileInfo, numberOfWorkers int) <-chan FileInfo { - chanOut := make(chan FileInfo) - - wg := new(sync.WaitGroup) - wg.Add(numberOfWorkers) - - go func() { - for workerIndex := 0; workerIndex < numberOfWorkers; workerIndex++ { - go func(workerIndex int) { - for job := range chanIn { - filePath := filepath.Join(tempPath, job.FileName) - content := randomString(contentLength) - err := ioutil.WriteFile(filePath, []byte(content), os.ModePerm) - - log.Println("worker", workerIndex, "working on", job.FileName, "file generation") - - chanOut <- FileInfo{ - FileName: job.FileName, - WorkerIndex: workerIndex, - Err: err, - } - } - - wg.Done() - }(workerIndex) - } - }() - - go func() { - wg.Wait() - close(chanOut) - }() - - return chanOut + chanOut := make(chan FileInfo) + + wg := new(sync.WaitGroup) + wg.Add(numberOfWorkers) + + go func() { + for workerIndex := 0; workerIndex < numberOfWorkers; workerIndex++ { + go func(workerIndex int) { + for job := range chanIn { + filePath := filepath.Join(tempPath, job.FileName) + content := randomString(contentLength) + err := ioutil.WriteFile(filePath, []byte(content), os.ModePerm) + + log.Println("worker", workerIndex, "working on", job.FileName, "file generation") + + chanOut <- FileInfo{ + FileName: job.FileName, + WorkerIndex: workerIndex, + Err: err, + } + } + + wg.Done() + }(workerIndex) + } + }() + + go func() { + wg.Wait() + close(chanOut) + }() + + return chanOut } ``` @@ -186,7 +186,7 @@ Tambahkan package `context` dalam block import packages. ```go import ( - "context" + "context" // ... ) @@ -241,9 +241,9 @@ Cara pembuatan object context sendiri sebenarnya ada 3: 2. Menggunakan fungsi `context.TODO()`. Fungsi ini menghasilkan objek context baru seperti `context.Background()`. Context buatan fungsi TODO ini biasanya digunakan dalam situasi ketika belum jelas nantinya harus menggunakan jenis context apa (apakah dengan timeout, apakah dengan cancel). 3. Menggunakan fungsi `context.With...`. Fungsi ini sebenarnya bukan digunakan untuk inisialisasi objek konteks baru, tapi digunakan untuk menambahkan informasi tertentu pada *copied* context yang disisipkan di parameter pertama pemanggilan fungsi. Ada 3 buah fungsi `context.With...` yang bisa digunakan, yaitu: - - Fungsi `context.WithCancel(ctx) (ctx, cancel)`. Fungsi ini digunakan untuk menambahkan fasilitas *cancellable* pada context yang disisipkan sebagai parameter pertama pemanggilan fungsi. Lewat nilai balik kedua, yaitu `cancel` yang tipenya `context.CancelFunc`, kita bisa secara paksa meng-*cancel* context ini. - - Fungsi `context.WithDeadline(ctx, time.Time) (ctx, cancel)`. Fungsi ini juga menambahkan fitur *cancellable* pada context, tapi selain itu juga menambahkan informasi deadline yang di mana jika waktu sekarang sudah melebihi deadline yang sudah ditentukan maka context otomatis di-cancel secara paksa. - - Fungsi `context.WithTimeout(ctx, time.Duration) (ctx, cancel)`. Fungsi ini sama seperti `context.WithDeadline()`, bedanya pada parameter kedua argument bertipe durasi (bukan objek `time.Time`). + - Fungsi `context.WithCancel(ctx) (ctx, cancel)`. Fungsi ini digunakan untuk menambahkan fasilitas *cancellable* pada context yang disisipkan sebagai parameter pertama pemanggilan fungsi. Lewat nilai balik kedua, yaitu `cancel` yang tipenya `context.CancelFunc`, kita bisa secara paksa meng-*cancel* context ini. + - Fungsi `context.WithDeadline(ctx, time.Time) (ctx, cancel)`. Fungsi ini juga menambahkan fitur *cancellable* pada context, tapi selain itu juga menambahkan informasi deadline yang di mana jika waktu sekarang sudah melebihi deadline yang sudah ditentukan maka context otomatis di-cancel secara paksa. + - Fungsi `context.WithTimeout(ctx, time.Duration) (ctx, cancel)`. Fungsi ini sama seperti `context.WithDeadline()`, bedanya pada parameter kedua argument bertipe durasi (bukan objek `time.Time`). Kesamaan dari ketiga fungsi `context.With...` adalah sama-sama menambahkan fasilitas *cancellable* yang bisa dieksekusi lewat nilai balik kedua fungsi tersebut (yang tipenya `context.CancelFunc`). @@ -260,7 +260,7 @@ Isi dari fungsi `generateFiles()` kita ubah menjadi pemanggilan fungsi `generate ```go func generateFiles() { - generateFilesWithContext(context.Background()) + generateFilesWithContext(context.Background()) } ``` @@ -268,39 +268,39 @@ Pada fungsi `generateFilesWithContext()` sendiri, isinya adalah isi `generateFil ```go func generateFilesWithContext(ctx context.Context) { - os.RemoveAll(tempPath) - os.MkdirAll(tempPath, os.ModePerm) - - done := make(chan int) - - go func() { - // pipeline 1: job distribution - chanFileIndex := generateFileIndexes(ctx) - - // pipeline 2: the main logic (creating files) - createFilesWorker := 100 - chanFileResult := createFiles(ctx, chanFileIndex, createFilesWorker) - - // track and print output - counterSuccess := 0 - for fileResult := range chanFileResult { - if fileResult.Err != nil { - log.Printf("error creating file %s. stack trace: %s", fileResult.FileName, fileResult.Err) - } else { - counterSuccess++ - } - } - - // notify that the process is complete - done <- counterSuccess - }() - - select { - case <-ctx.Done(): - log.Printf("generation process stopped. %s", ctx.Err()) - case counterSuccess := <-done: - log.Printf("%d/%d of total files created", counterSuccess, totalFile) - } + os.RemoveAll(tempPath) + os.MkdirAll(tempPath, os.ModePerm) + + done := make(chan int) + + go func() { + // pipeline 1: job distribution + chanFileIndex := generateFileIndexes(ctx) + + // pipeline 2: the main logic (creating files) + createFilesWorker := 100 + chanFileResult := createFiles(ctx, chanFileIndex, createFilesWorker) + + // track and print output + counterSuccess := 0 + for fileResult := range chanFileResult { + if fileResult.Err != nil { + log.Printf("error creating file %s. stack trace: %s", fileResult.FileName, fileResult.Err) + } else { + counterSuccess++ + } + } + + // notify that the process is complete + done <- counterSuccess + }() + + select { + case <-ctx.Done(): + log.Printf("generation process stopped. %s", ctx.Err()) + case counterSuccess := <-done: + log.Printf("%d/%d of total files created", counterSuccess, totalFile) + } } ``` @@ -330,24 +330,24 @@ Silakan tulis kode berikut pada fungsi `generateFileIndexes()`. ```go func generateFileIndexes(ctx context.Context) <-chan FileInfo { - chanOut := make(chan FileInfo) - - go func() { - for i := 0; i < totalFile; i++ { - select { - case <-ctx.Done(): - break - default: - chanOut <- FileInfo{ - Index: i, - FileName: fmt.Sprintf("file-%d.txt", i), - } - } - } - close(chanOut) - }() - - return chanOut + chanOut := make(chan FileInfo) + + go func() { + for i := 0; i < totalFile; i++ { + select { + case <-ctx.Done(): + break + default: + chanOut <- FileInfo{ + Index: i, + FileName: fmt.Sprintf("file-%d.txt", i), + } + } + } + close(chanOut) + }() + + return chanOut } ``` @@ -366,44 +366,44 @@ Silakan tulis kode berikut. ```go func createFiles(ctx context.Context, chanIn <-chan FileInfo, numberOfWorkers int) <-chan FileInfo { - chanOut := make(chan FileInfo) - - wg := new(sync.WaitGroup) - wg.Add(numberOfWorkers) - - go func() { - for workerIndex := 0; workerIndex < numberOfWorkers; workerIndex++ { - go func(workerIndex int) { - for job := range chanIn { - select { - case <-ctx.Done(): - break - default: - filePath := filepath.Join(tempPath, job.FileName) - content := randomString(contentLength) - err := ioutil.WriteFile(filePath, []byte(content), os.ModePerm) - - log.Println("worker", workerIndex, "working on", job.FileName, "file generation") - - chanOut <- FileInfo{ - FileName: job.FileName, - WorkerIndex: workerIndex, - Err: err, - } - } - } - - wg.Done() - }(workerIndex) - } - }() - - go func() { - wg.Wait() - close(chanOut) - }() - - return chanOut + chanOut := make(chan FileInfo) + + wg := new(sync.WaitGroup) + wg.Add(numberOfWorkers) + + go func() { + for workerIndex := 0; workerIndex < numberOfWorkers; workerIndex++ { + go func(workerIndex int) { + for job := range chanIn { + select { + case <-ctx.Done(): + break + default: + filePath := filepath.Join(tempPath, job.FileName) + content := randomString(contentLength) + err := ioutil.WriteFile(filePath, []byte(content), os.ModePerm) + + log.Println("worker", workerIndex, "working on", job.FileName, "file generation") + + chanOut <- FileInfo{ + FileName: job.FileName, + WorkerIndex: workerIndex, + Err: err, + } + } + } + + wg.Done() + }(workerIndex) + } + }() + + go func() { + wg.Wait() + close(chanOut) + }() + + return chanOut } ``` diff --git a/A-simplified-fan-in-fan-out-pipeline.md b/A-simplified-fan-in-fan-out-pipeline.md index ee85da0c6..58ade4211 100644 --- a/A-simplified-fan-in-fan-out-pipeline.md +++ b/A-simplified-fan-in-fan-out-pipeline.md @@ -26,13 +26,13 @@ Siapkan folder project baru, isinya satu buah file `1-generate-dummy-files-seque package main import ( - "fmt" - "io/ioutil" - "log" - "math/rand" - "os" - "path/filepath" - "time" + "fmt" + "io/ioutil" + "log" + "math/rand" + "os" + "path/filepath" + "time" ) const totalFile = 3000 @@ -45,13 +45,13 @@ var tempPath = filepath.Join(os.Getenv("TEMP"), "chapter-A.60-worker-pool") ```go func main() { - log.Println("start") - start := time.Now() + log.Println("start") + start := time.Now() - generateFiles() + generateFiles() - duration := time.Since(start) - log.Println("done in", duration.Seconds(), "seconds") + duration := time.Since(start) + log.Println("done in", duration.Seconds(), "seconds") } ``` @@ -59,15 +59,15 @@ func main() { ```go func randomString(length int) string { - randomizer := rand.New(rand.NewSource(time.Now().Unix())) - letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + randomizer := rand.New(rand.NewSource(time.Now().Unix())) + letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - b := make([]rune, length) - for i := range b { - b[i] = letters[randomizer.Intn(len(letters))] - } + b := make([]rune, length) + for i := range b { + b[i] = letters[randomizer.Intn(len(letters))] + } - return string(b) + return string(b) } ``` @@ -76,21 +76,21 @@ func randomString(length int) string { ```go func generateFiles() { - os.RemoveAll(tempPath) - os.MkdirAll(tempPath, os.ModePerm) + os.RemoveAll(tempPath) + os.MkdirAll(tempPath, os.ModePerm) - for i := 0; i < totalFile; i++ { - filename := filepath.Join(tempPath, fmt.Sprintf("file-%d.txt", i)) - content := randomString(contentLength) - err := ioutil.WriteFile(filename, []byte(content), os.ModePerm) - if err != nil { - log.Println("Error writing file", filename) - } + for i := 0; i < totalFile; i++ { + filename := filepath.Join(tempPath, fmt.Sprintf("file-%d.txt", i)) + content := randomString(contentLength) + err := ioutil.WriteFile(filename, []byte(content), os.ModePerm) + if err != nil { + log.Println("Error writing file", filename) + } - log.Println(i, "files created") - } + log.Println(i, "files created") + } - log.Printf("%d of total files created", totalFile) + log.Printf("%d of total files created", totalFile) } ``` @@ -112,14 +112,14 @@ Import beberapa hal pada file baru ini, lalu definisikan beberapa variabel juga. package main import ( - "fmt" - "io/ioutil" - "log" - "math/rand" - "os" - "path/filepath" - "sync" - "time" + "fmt" + "io/ioutil" + "log" + "math/rand" + "os" + "path/filepath" + "sync" + "time" ) const totalFile = 3000 @@ -134,10 +134,10 @@ Kita perlu siapkan struct baru bernama `FileInfo`, struct ini digunakan sebagai ```go type FileInfo struct { - Index int - FileName string - WorkerIndex int - Err error + Index int + FileName string + WorkerIndex int + Err error } ``` @@ -149,13 +149,13 @@ type FileInfo struct { ```go func main() { - log.Println("start") - start := time.Now() + log.Println("start") + start := time.Now() - generateFiles() + generateFiles() - duration := time.Since(start) - log.Println("done in", duration.Seconds(), "seconds") + duration := time.Since(start) + log.Println("done in", duration.Seconds(), "seconds") } ``` @@ -163,15 +163,15 @@ func main() { ```go func randomString(length int) string { - randomizer := rand.New(rand.NewSource(time.Now().Unix())) - letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + randomizer := rand.New(rand.NewSource(time.Now().Unix())) + letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - b := make([]rune, length) - for i := range b { - b[i] = letters[randomizer.Intn(len(letters))] - } + b := make([]rune, length) + for i := range b { + b[i] = letters[randomizer.Intn(len(letters))] + } - return string(b) + return string(b) } ``` @@ -179,29 +179,29 @@ func randomString(length int) string { ```go func generateFiles() { - os.RemoveAll(tempPath) - os.MkdirAll(tempPath, os.ModePerm) - - // pipeline 1: job distribution - chanFileIndex := generateFileIndexes() - - // pipeline 2: the main logic (creating files) - createFilesWorker := 100 - chanFileResult := createFiles(chanFileIndex, createFilesWorker) - - // track and print output - counterTotal := 0 - counterSuccess := 0 - for fileResult := range chanFileResult { - if fileResult.Err != nil { - log.Printf("error creating file %s. stack trace: %s", fileResult.FileName, fileResult.Err) - } else { - counterSuccess++ - } - counterTotal++ - } - - log.Printf("%d/%d of total files created", counterSuccess, counterTotal) + os.RemoveAll(tempPath) + os.MkdirAll(tempPath, os.ModePerm) + + // pipeline 1: job distribution + chanFileIndex := generateFileIndexes() + + // pipeline 2: the main logic (creating files) + createFilesWorker := 100 + chanFileResult := createFiles(chanFileIndex, createFilesWorker) + + // track and print output + counterTotal := 0 + counterSuccess := 0 + for fileResult := range chanFileResult { + if fileResult.Err != nil { + log.Printf("error creating file %s. stack trace: %s", fileResult.FileName, fileResult.Err) + } else { + counterSuccess++ + } + counterTotal++ + } + + log.Printf("%d/%d of total files created", counterSuccess, counterTotal) } ``` @@ -223,19 +223,19 @@ Fungsi ini merupakan fungsi Fan-out distribusi jobs. Di dalamnya dilakukan perul ```go func generateFileIndexes() <-chan FileInfo { - chanOut := make(chan FileInfo) - - go func() { - for i := 0; i < totalFile; i++ { - chanOut <- FileInfo{ - Index: i, - FileName: fmt.Sprintf("file-%d.txt", i), - } - } - close(chanOut) - }() - - return chanOut + chanOut := make(chan FileInfo) + + go func() { + for i := 0; i < totalFile; i++ { + chanOut <- FileInfo{ + Index: i, + FileName: fmt.Sprintf("file-%d.txt", i), + } + } + close(chanOut) + }() + + return chanOut } ``` @@ -249,53 +249,53 @@ Mungkin lebih enak silakan tulis dulu fungsinya, lalu kita bahas satu per satu s ```go func createFiles(chanIn <-chan FileInfo, numberOfWorkers int) <-chan FileInfo { - chanOut := make(chan FileInfo) + chanOut := make(chan FileInfo) - // wait group to control the workers - wg := new(sync.WaitGroup) + // wait group to control the workers + wg := new(sync.WaitGroup) - // allocate N of workers - wg.Add(numberOfWorkers) + // allocate N of workers + wg.Add(numberOfWorkers) - go func() { + go func() { - // dispatch N workers - for workerIndex := 0; workerIndex < numberOfWorkers; workerIndex++ { - go func(workerIndex int) { + // dispatch N workers + for workerIndex := 0; workerIndex < numberOfWorkers; workerIndex++ { + go func(workerIndex int) { - // listen to `chanIn` channel for incoming jobs - for job := range chanIn { + // listen to `chanIn` channel for incoming jobs + for job := range chanIn { - // do the jobs - filePath := filepath.Join(tempPath, job.FileName) - content := randomString(contentLength) - err := ioutil.WriteFile(filePath, []byte(content), os.ModePerm) + // do the jobs + filePath := filepath.Join(tempPath, job.FileName) + content := randomString(contentLength) + err := ioutil.WriteFile(filePath, []byte(content), os.ModePerm) - log.Println("worker", workerIndex, "working on", job.FileName, "file generation") + log.Println("worker", workerIndex, "working on", job.FileName, "file generation") - // construct the job's result, and send it to `chanOut` - chanOut <- FileInfo{ - FileName: job.FileName, - WorkerIndex: workerIndex, - Err: err, - } - } + // construct the job's result, and send it to `chanOut` + chanOut <- FileInfo{ + FileName: job.FileName, + WorkerIndex: workerIndex, + Err: err, + } + } - // if `chanIn` is closed, and the remaining jobs are finished, - // only then we mark the worker as complete. - wg.Done() - }(workerIndex) - } - }() + // if `chanIn` is closed, and the remaining jobs are finished, + // only then we mark the worker as complete. + wg.Done() + }(workerIndex) + } + }() - // wait until `chanIn` closed and then all workers are done, - // because right after that - we need to close the `chanOut` channel. - go func() { - wg.Wait() - close(chanOut) - }() + // wait until `chanIn` closed and then all workers are done, + // because right after that - we need to close the `chanOut` channel. + go func() { + wg.Wait() + close(chanOut) + }() - return chanOut + return chanOut } ```