Skip to content

Commit

Permalink
feat: impelement batch
Browse files Browse the repository at this point in the history
  • Loading branch information
nguyenvanduocit committed Nov 2, 2024
1 parent aee876d commit 87142e9
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 116 deletions.
1 change: 0 additions & 1 deletion cmd/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ type fileInfo struct {
type packingProgress struct {
fileCount int64
totalSize int64
mu sync.Mutex
}

func (p *packingProgress) update(size int64) {
Expand Down
188 changes: 73 additions & 115 deletions cmd/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ type elementToTranslate struct {

type translationBatch struct {
elements []elementToTranslate
filePath string
}

var fileLocks = make(map[string]*sync.Mutex)
Expand Down Expand Up @@ -119,27 +118,84 @@ func runTranslate(cmd *cobra.Command, args []string) error {
return fmt.Errorf("error getting translator: %v", err)
}

// we can handle 10 elements at a time
elementChan := make(chan elementToTranslate, 10)
doneChan := make(chan struct{})

go processElements(ctx, elementChan, doneChan, anthropicTranslator, limiter, bookName)

// 1 worker and 1 job at a time, mean 1 file at a time
err = processor.ProcessEpub(ctx, unzipPath, processor.Config{
Workers: 1,
JobBuffer: 1,
ResultBuffer: 10,
}, func(ctx context.Context, filePath string) error {
return processFile(ctx, filePath, elementChan)
return processFileDirectly(ctx, filePath, anthropicTranslator, limiter, bookName)
})

close(elementChan)
<-doneChan

return err
}

func processFileDirectly(ctx context.Context, filePath string, translator translator.Translator, limiter *rate.Limiter, bookName string) error {
fmt.Printf("\nProcessing file: %s\n", path.Base(filePath))

doc, err := openAndReadFile(filePath)
if err != nil {
return err
}

ensureUTF8Charset(doc)

selector := fmt.Sprintf("[%s]:not([%s])", util.ContentIdKey, util.TranslationByIdKey)
elements := doc.Find(selector)

if elements.Length() == 0 {
fmt.Printf("No elements to translate in %s\n", path.Base(filePath))
return nil
}

fmt.Printf("Found %d elements to translate in %s\n",
elements.Length(), path.Base(filePath))

// Create batches directly
var currentBatch translationBatch
maxBatchLength := 4000

elements.Each(func(i int, contentEl *goquery.Selection) {
select {
case <-ctx.Done():
return
default:
htmlContent, err := contentEl.Html()
if err != nil || len(htmlContent) <= 1 {
return
}

element := elementToTranslate{
filePath: filePath,
contentEl: contentEl,
doc: doc,
totalElements: elements.Length(),
index: i,
content: htmlContent,
}

currentBatchLength := getBatchLength(&currentBatch)
if currentBatchLength+len(htmlContent) > maxBatchLength && len(currentBatch.elements) > 0 {
// Process current batch
processBatch(ctx, filePath, currentBatch, translator, limiter, bookName)
// Start new batch
currentBatch = translationBatch{
elements: []elementToTranslate{element},
}
} else {
currentBatch.elements = append(currentBatch.elements, element)
}
}
})

// Process final batch if not empty
if len(currentBatch.elements) > 0 {
processBatch(ctx, filePath, currentBatch, translator, limiter, bookName)
}

return nil
}

func extractBookName(unzipPath string) (string, error) {
container, err := loader.ParseContainer(unzipPath)
if err != nil {
Expand All @@ -155,64 +211,6 @@ func extractBookName(unzipPath string) (string, error) {
return pkg.Metadata.Title, nil
}

func processElements(ctx context.Context, elementChan <-chan elementToTranslate, doneChan chan<- struct{}, anthropicTranslator translator.Translator, limiter *rate.Limiter, bookName string) {
defer close(doneChan)

fmt.Println("Starting element processing...")

// Create a batch map to group elements by file
batchMap := make(map[string]*translationBatch)
maxBatchLength := 4000 // Maximum characters per batch

for element := range elementChan {
fmt.Printf("Processing element from file: %s (%d/%d)\n",
path.Base(element.filePath), element.index+1, element.totalElements)

// Get HTML content
htmlContent, err := element.contentEl.Html()
if err != nil || len(htmlContent) <= 1 {
continue
}
element.content = htmlContent

// Add to batch if it won't exceed max length, otherwise process current batch
batch := batchMap[element.filePath]
if batch == nil {
batch = &translationBatch{
elements: []elementToTranslate{},
filePath: element.filePath,
}
batchMap[element.filePath] = batch
}

currentBatchLength := getBatchLength(batch)
if currentBatchLength+len(htmlContent) > maxBatchLength && len(batch.elements) > 0 {
// Process current batch before adding new element
fmt.Printf("Processing batch for file %s (length: %d, elements: %d)\n",
path.Base(element.filePath), currentBatchLength, len(batch.elements))
processBatch(ctx, batch, anthropicTranslator, limiter, bookName)

// Start new batch with current element
batchMap[element.filePath] = &translationBatch{
elements: []elementToTranslate{element},
filePath: element.filePath,
}
} else {
batch.elements = append(batch.elements, element)
}
}

// Process remaining batches
fmt.Printf("Processing remaining batches (count: %d)\n", len(batchMap))
for filePath, batch := range batchMap {
if len(batch.elements) > 0 {
fmt.Printf("Processing final batch for file %s (elements: %d)\n",
path.Base(filePath), len(batch.elements))
processBatch(ctx, batch, anthropicTranslator, limiter, bookName)
}
}
}

func getBatchLength(batch *translationBatch) int {
var length int
for _, element := range batch.elements {
Expand All @@ -221,13 +219,13 @@ func getBatchLength(batch *translationBatch) int {
return length
}

func processBatch(ctx context.Context, batch *translationBatch, anthropicTranslator translator.Translator, limiter *rate.Limiter, bookName string) {
func processBatch(ctx context.Context, filePath string, batch translationBatch, anthropicTranslator translator.Translator, limiter *rate.Limiter, bookName string) {
if len(batch.elements) == 0 {
return
}

fmt.Printf("Translating batch from file %s (segments: %d)\n",
path.Base(batch.filePath), len(batch.elements))
filePath, len(batch.elements))

// Combine contents with more distinct markers and instructions
var combinedContent strings.Builder
Expand All @@ -248,14 +246,13 @@ func processBatch(ctx context.Context, batch *translationBatch, anthropicTransla
translations := splitTranslations(translatedContent)
if len(translations) != len(batch.elements) {
fmt.Printf("Translation segments mismatch for %s: got %d, expected %d\n",
path.Base(batch.filePath), len(translations), len(batch.elements))
path.Base(filePath), len(translations), len(batch.elements))
return
}

fmt.Printf("Successfully translated batch from %s, writing to file...\n",
path.Base(batch.filePath))
fmt.Printf("Successfully translated batch from %s, writing to file...\n", path.Base(filePath))

fileLock := getFileLock(batch.filePath)
fileLock := getFileLock(filePath)
fileLock.Lock()
defer fileLock.Unlock()

Expand All @@ -268,7 +265,7 @@ func processBatch(ctx context.Context, batch *translationBatch, anthropicTransla
}
}

if err := writeContentToFile(batch.filePath, batch.elements[0].doc); err != nil {
if err := writeContentToFile(filePath, batch.elements[0].doc); err != nil {
fmt.Printf("Error writing to file: %v\n", err)
}
}
Expand All @@ -291,45 +288,6 @@ func splitTranslations(translatedContent string) []string {
return translations
}

func processFile(ctx context.Context, filePath string, elementChan chan<- elementToTranslate) error {
fmt.Printf("\nProcessing file: %s\n", path.Base(filePath))

doc, err := openAndReadFile(filePath)
if err != nil {
return err
}

ensureUTF8Charset(doc)

selector := fmt.Sprintf("[%s]:not([%s])", util.ContentIdKey, util.TranslationByIdKey)
needToBeTranslateEls := doc.Find(selector)

if needToBeTranslateEls.Length() == 0 {
fmt.Printf("No elements to translate in %s\n", path.Base(filePath))
return nil
}

fmt.Printf("Found %d elements to translate in %s\n",
needToBeTranslateEls.Length(), path.Base(filePath))

needToBeTranslateEls.Each(func(i int, contentEl *goquery.Selection) {
select {
case <-ctx.Done():
return
default:
elementChan <- elementToTranslate{
filePath: filePath,
contentEl: contentEl,
doc: doc,
totalElements: needToBeTranslateEls.Length(),
index: i,
}
}
})

return nil
}

func openAndReadFile(filePath string) (*goquery.Document, error) {
file, err := os.Open(filePath)
if err != nil {
Expand Down

0 comments on commit 87142e9

Please sign in to comment.