Skip to content

Commit

Permalink
master: track data load progress (#909)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz authored Dec 30, 2024
1 parent db0c194 commit c56ca63
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions master/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -1408,7 +1408,22 @@ func (m *Master) LoadDataFromDatabase(
evaluator *OnlineEvaluator,
nonPersonalizedRecommenders []*logics.NonPersonalized,
) (rankingDataset *ranking.DataSet, clickDataset *click.Dataset, err error) {
newCtx, span := progress.Start(ctx, "LoadDataFromDatabase", 4)
// Estimate the number of users, items, and feedbacks
estimatedNumUsers, err := m.DataClient.CountUsers(context.Background())
if err != nil {
return nil, nil, errors.Trace(err)
}
estimatedNumItems, err := m.DataClient.CountItems(context.Background())
if err != nil {
return nil, nil, errors.Trace(err)
}
estimatedNumFeedbacks, err := m.DataClient.CountFeedback(context.Background())
if err != nil {
return nil, nil, errors.Trace(err)
}

newCtx, span := progress.Start(ctx, "LoadDataFromDatabase",
estimatedNumUsers+estimatedNumItems+estimatedNumFeedbacks)
defer span.End()

// setup time limit
Expand Down Expand Up @@ -1468,6 +1483,7 @@ func (m *Master) LoadDataFromDatabase(
}
}
}
span.Add(len(users))
}
if err = <-errChan; err != nil {
return nil, nil, errors.Trace(err)
Expand All @@ -1478,7 +1494,6 @@ func (m *Master) LoadDataFromDatabase(
zap.Int32("n_user_labels", userLabelIndex.Len()),
zap.Duration("used_time", time.Since(start)))
LoadDatasetStepSecondsVec.WithLabelValues("load_users").Set(time.Since(start).Seconds())
span.Add(1)

// STEP 2: pull items
var items []data.Item
Expand Down Expand Up @@ -1528,6 +1543,7 @@ func (m *Master) LoadDataFromDatabase(
rankingDataset.HiddenItems[itemIndex] = true
}
}
span.Add(len(batchItems))
}
if err = <-errChan; err != nil {
return nil, nil, errors.Trace(err)
Expand All @@ -1538,7 +1554,6 @@ func (m *Master) LoadDataFromDatabase(
zap.Int32("n_item_labels", itemLabelIndex.Len()),
zap.Duration("used_time", time.Since(start)))
LoadDatasetStepSecondsVec.WithLabelValues("load_items").Set(time.Since(start).Seconds())
span.Add(1)

// create positive set
popularCount := make([]int32, rankingDataset.ItemCount())
Expand Down Expand Up @@ -1613,6 +1628,7 @@ func (m *Master) LoadDataFromDatabase(
}
}
}
span.Add(len(feedback))
}

// add item to non-personalized recommenders
Expand Down Expand Up @@ -1641,7 +1657,6 @@ func (m *Master) LoadDataFromDatabase(
zap.Int("n_positive_feedback", posFeedbackCount),
zap.Duration("used_time", time.Since(start)))
LoadDatasetStepSecondsVec.WithLabelValues("load_positive_feedback").Set(time.Since(start).Seconds())
span.Add(1)

// create negative set
negativeSet := make([]mapset.Set[int32], rankingDataset.UserCount())
Expand Down Expand Up @@ -1678,6 +1693,7 @@ func (m *Master) LoadDataFromDatabase(
evaluator.Read(userIndex, itemIndex, f.Timestamp)
mu.Unlock()
}
span.Add(len(feedback))
}
if err = <-errChan; err != nil {
return errors.Trace(err)
Expand All @@ -1691,7 +1707,6 @@ func (m *Master) LoadDataFromDatabase(
zap.Int("n_negative_feedback", int(negativeFeedbackCount)),
zap.Duration("used_time", time.Since(start)))
LoadDatasetStepSecondsVec.WithLabelValues("load_negative_feedback").Set(time.Since(start).Seconds())
span.Add(1)

// STEP 5: create click dataset
start = time.Now()
Expand Down

0 comments on commit c56ca63

Please sign in to comment.