Commit fc25c8e: Add ContinuousBatching PerfMetrics
pavel-esir committed Jan 10, 2025 (1 parent: 3b0ecb4)
Showing 11 changed files with 67 additions and 33 deletions.
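For orientation before the diff: the commit threads a PerfMetrics object from the continuous-batching implementation up through the LLMPipeline adapter, so decoded and encoded results on that path now carry timing data. Below is a minimal consumer sketch, not part of this commit, assuming the standard ov::genai PerfMetrics accessors and the scheduler_config pipeline property; "model_dir" is a placeholder path.

    #include "openvino/genai/llm_pipeline.hpp"
    #include "openvino/genai/scheduler_config.hpp"
    #include <iostream>

    int main() {
        // Passing a SchedulerConfig property is what routes LLMPipeline
        // through ContinuousBatchingAdapter (assumed here).
        ov::genai::SchedulerConfig scheduler_config;
        ov::genai::LLMPipeline pipe("model_dir", "CPU", ov::genai::scheduler_config(scheduler_config));
        ov::genai::DecodedResults res = pipe.generate("The Sun is yellow because");
        auto metrics = res.perf_metrics;  // attached by the adapter after this commit
        std::cout << "TTFT: " << metrics.get_ttft().mean << " ms\n"
                  << "TPOT: " << metrics.get_tpot().mean << " ms/token\n"
                  << "Throughput: " << metrics.get_throughput().mean << " tokens/s\n";
        return 0;
    }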
src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
@@ -64,6 +64,7 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {
     friend class ContinuousBatchingForPromptLookupImpl;
     friend class SpeculativeDecodingImpl;
     friend class PromptLookupImpl;
+    friend class ContinuousBatchingAdapter;
 
     std::shared_ptr<IContinuousBatchingPipeline> m_impl;
 
9 changes: 5 additions & 4 deletions src/cpp/src/continuous_batching_adapter.hpp
@@ -4,6 +4,7 @@
 
 #include "llm_pipeline_base.hpp"
 
+#include "icontinuous_batching.hpp"
 #include "openvino/genai/continuous_batching_pipeline.hpp"
 
 namespace ov::genai {
@@ -87,7 +88,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
         }, inputs);
         const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config;
         // -1 == config.eos_token_id and config.validate() are handled in m_impl.
-        std::vector<GenerationResult> generated = m_impl.generate(
+        auto [generated, perf_metrics] = m_impl.m_impl->generate(
             prompts,
             std::vector<GenerationConfig>{prompts.size(), config},
             streamer
@@ -99,7 +100,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
             std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_replies));
             std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores));
         }
-        return {std::move(plain_replies), std::move(plain_scores)};
+        return {std::move(plain_replies), std::move(plain_scores), std::move(perf_metrics)};
     }

     EncodedResults generate(
@@ -148,15 +149,15 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
 
         const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config;
         // -1 == config.eos_token_id and config.validate() are handled in m_impl.
-        std::vector<EncodedGenerationResult> generated = m_impl.generate(input_ids, std::vector<GenerationConfig>{input_ids.size(), config}, streamer);
+        auto [generated, perf_metrics] = m_impl.m_impl->generate(input_ids, std::vector<GenerationConfig>{input_ids.size(), config}, streamer);
         std::vector<std::vector<int64_t>> plain_tokens;
         std::vector<float> plain_scores;
         for (EncodedGenerationResult& res : generated) {
             OPENVINO_ASSERT(res.m_status == GenerationStatus::FINISHED || res.m_status == GenerationStatus::DROPPED_BY_HANDLE, "Got unfinished GenerationStatus");
             std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_tokens));
             std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores));
         }
-        return {std::move(plain_tokens), std::move(plain_scores)};
+        return {std::move(plain_tokens), std::move(plain_scores), std::move(perf_metrics)};
     }
 
     void start_chat(const std::string& system_message) override {
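Both adapter overloads now destructure the pair returned by the underlying IContinuousBatchingPipeline and forward the metrics as a third member of DecodedResults/EncodedResults. A minimal, self-contained sketch of that return-shape pattern, using stand-in types rather than the real ov::genai ones:

    #include <string>
    #include <utility>
    #include <vector>

    struct PerfMetricsStub { double ttft_ms = 0.0; };  // stand-in for ov::genai::PerfMetrics

    // Returns results together with metrics, mirroring the new impl signature.
    std::pair<std::vector<std::string>, PerfMetricsStub> generate_stub() {
        return {{"generated text"}, PerfMetricsStub{12.3}};
    }

    int main() {
        auto [results, metrics] = generate_stub();  // C++17 structured bindings, as in the adapter
        return results.empty() ? 1 : 0;
    }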
27 changes: 19 additions & 8 deletions src/cpp/src/continuous_batching_impl.cpp
@@ -133,7 +133,7 @@ bool ContinuousBatchingPipeline::ContinuousBatchingImpl::has_non_finished_requests() {
     return !m_awaiting_requests.empty() || !m_requests.empty();
 }
 
-void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
+size_t ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
     static ManualTimer step_timer("step()");
     step_timer.start();
 
@@ -157,7 +157,6 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
         m_cache_manager->copy_blocks(scheduler_output.m_block_copy_map);
         copy_blocks_timer.end();
     }
-
     // if no tokens were scheduled, we are out of memory => free all requests and return
     if (scheduler_output.m_total_num_scheduled_tokens == 0) {
         for (size_t i = 0; i < m_requests.size(); ++i) {
@@ -168,9 +167,10 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
             }
         }
         _free_non_running_requests();
-        return;
+        return 0;
     }
 
+    size_t num_generated_tokens = scheduler_output.m_total_num_scheduled_tokens;
 
     ov::Tensor logits;
     {
         static ManualTimer timer("forward");
@@ -243,6 +243,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
     }
 
     step_timer.end();
+    return num_generated_tokens;
 }
 
 void ContinuousBatchingPipeline::ContinuousBatchingImpl::set_adapters(const std::optional<AdapterConfig>& adapters) {
@@ -251,13 +252,16 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::set_adapters(const std::optional<AdapterConfig>& adapters) {
     }
 }
 
-std::vector<EncodedGenerationResult>
+std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
 ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<ov::Tensor>& input_ids,
                                                              const std::vector<GenerationConfig>& sampling_params,
                                                              const StreamerVariant& streamer) {
     OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
     OPENVINO_ASSERT(input_ids.size() == sampling_params.size());
 
+    PerfMetrics perf_metrics;
+    auto& raw_perf_counters = perf_metrics.raw_metrics;
+
     // checks that all requests has the same LoRA adapters property value
     for (size_t i = 1; i < sampling_params.size(); ++i) {
         OPENVINO_ASSERT(sampling_params[i - 1].adapters == sampling_params[i].adapters,
@@ -303,13 +307,20 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<ov::Tensor>& input_ids,
     bool continue_generation = true;
     while (has_non_finished_requests() && continue_generation) {
         try {
-            step();
+            const auto infer_start = std::chrono::steady_clock::now();
+            auto num_generated_tokens = step();
+            const auto infer_end = std::chrono::steady_clock::now();
+            const auto infer_ms = PerfMetrics::get_microsec(infer_end - infer_start);
+            raw_perf_counters.m_token_infer_durations.emplace_back(infer_ms);
+            // raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_ms);
+            raw_perf_counters.m_new_token_times.emplace_back(infer_end);
+            raw_perf_counters.m_batch_sizes.emplace_back(num_generated_tokens);
         } catch (...) {
             drop_requests(); // remove all requests from pipeline state in case of exception
             throw;
         }
 
-        auto & generation = generations.at(0);
+        GenerationHandle & generation = generations.at(0);
         if (streamer_ptr && generation->can_read()) {
             std::unordered_map<uint64_t, GenerationOutput> token = generation->back();
             for (const auto& gen_token : token.begin()->second.generated_ids) {
@@ -362,7 +373,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<ov::Tensor>& input_ids,
     }
 
     OPENVINO_ASSERT(results.size() == input_ids.size());
-    return results;
+    return {results, perf_metrics};
 }
 
 void ContinuousBatchingPipeline::ContinuousBatchingImpl::_free_non_running_requests() {
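The counters recorded around step() above yield one infer duration and one batch size (tokens scheduled) per iteration, which is enough to recover an aggregate time-per-output-token. A reduction sketch under that assumption, independent of the PerfMetrics internals (hypothetical helper, not part of this commit):

    #include <cstddef>
    #include <numeric>
    #include <vector>

    // durations_us[i]: wall-clock time of step i in microseconds (cf. m_token_infer_durations)
    // batch_sizes[i]:  tokens generated in step i                (cf. m_batch_sizes)
    double mean_tpot_ms(const std::vector<double>& durations_us,
                        const std::vector<std::size_t>& batch_sizes) {
        const double total_us =
            std::accumulate(durations_us.begin(), durations_us.end(), 0.0);
        const std::size_t total_tokens =
            std::accumulate(batch_sizes.begin(), batch_sizes.end(), std::size_t{0});
        return total_tokens == 0 ? 0.0 : (total_us / total_tokens) / 1000.0;
    }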
4 changes: 2 additions & 2 deletions src/cpp/src/continuous_batching_impl.hpp
@@ -94,9 +94,9 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatchingPipeline::IContinuousBatchingPipeline {
 
     bool has_non_finished_requests() override;
 
-    void step() override;
+    size_t step() override;
 
-    std::vector<EncodedGenerationResult>
+    std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
     generate(const std::vector<ov::Tensor>& input_ids,
              const std::vector<GenerationConfig>& sampling_params,
              const StreamerVariant& streamer) override;
4 changes: 2 additions & 2 deletions src/cpp/src/continuous_batching_pipeline.cpp
@@ -140,11 +140,11 @@ bool ContinuousBatchingPipeline::has_non_finished_requests() {
 }
 
 std::vector<EncodedGenerationResult> ContinuousBatchingPipeline::generate(const std::vector<ov::Tensor>& input_ids, const std::vector<ov::genai::GenerationConfig>& sampling_params, const StreamerVariant& streamer) {
-    return m_impl->generate(input_ids, sampling_params, streamer);
+    return m_impl->generate(input_ids, sampling_params, streamer).first;
 }
 
 std::vector<GenerationResult> ContinuousBatchingPipeline::generate(const std::vector<std::string>& prompts, const std::vector<ov::genai::GenerationConfig>& sampling_params, const StreamerVariant& streamer) {
-    return m_impl->generate(prompts, sampling_params, streamer);
+    return m_impl->generate(prompts, sampling_params, streamer).first;
 }
 
 void ContinuousBatchingPipeline::start_chat(const std::string& system_message) {
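Both public overloads unwrap the pair with .first, so direct ContinuousBatchingPipeline users keep the old return types and the metrics stay internal on this path. A usage sketch under that assumption, with "models_path" as a placeholder and assuming the stock ov::genai::greedy() config helper and the usual default streamer argument:

    #include "openvino/genai/continuous_batching_pipeline.hpp"
    #include <iostream>
    #include <vector>

    int main() {
        ov::genai::SchedulerConfig scheduler_config;  // library defaults
        ov::genai::ContinuousBatchingPipeline pipe("models_path", scheduler_config, "CPU");
        // Return type is unchanged by this commit: results only, metrics dropped via .first.
        std::vector<ov::genai::GenerationResult> results =
            pipe.generate({"Hello"}, {ov::genai::greedy()});
        std::cout << results[0].m_generation_ids[0] << '\n';
        return 0;
    }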
19 changes: 15 additions & 4 deletions src/cpp/src/icontinuous_batching.cpp
@@ -29,12 +29,13 @@ void ContinuousBatchingPipeline::IContinuousBatchingPipeline::finish_chat() {
     m_history.clear();
 };
 
-std::vector<GenerationResult>
+std::pair<std::vector<GenerationResult>, PerfMetrics>
 ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate(
     const std::vector<std::string>& prompts,
     std::vector<ov::genai::GenerationConfig> sampling_params,
     const StreamerVariant& streamer) {
     std::vector<ov::Tensor> input_ids;
+    auto start_time = std::chrono::steady_clock::now();
 
     static ManualTimer timer("tokenize");
     if (m_is_chat_conversation) {
@@ -55,9 +56,10 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate(
         timer.end();
     }
 
-    std::vector<EncodedGenerationResult> encoded = generate(input_ids, sampling_params, streamer);
-
+    // std::vector<EncodedGenerationResult> encoded = generate(input_ids, sampling_params, streamer);
+    auto [encoded, perf_metrics] = generate(input_ids, sampling_params, streamer);
     std::vector<GenerationResult> decoded;
+    auto decode_start_time = std::chrono::steady_clock::now();
     decoded.reserve(encoded.size());
     for (EncodedGenerationResult& res : encoded) {
         std::vector<std::string> generated;
@@ -76,7 +78,16 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate(
             res.m_status
         });
     }
+    auto stop_time = std::chrono::steady_clock::now();
 
+    auto& raw_counters = perf_metrics.raw_metrics;
+    raw_counters.generate_durations = std::vector<MicroSeconds>();
+    raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time));
+    raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_start_time - start_time));
+    raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(stop_time - decode_start_time));
+    perf_metrics.m_evaluated = false;
+    perf_metrics.evaluate_statistics(start_time);
+
-    return decoded;
+    return {decoded, perf_metrics};
 }
 }
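The hunk above brackets each phase with std::chrono::steady_clock timestamps and stores microsecond deltas in the raw counters before evaluate_statistics() aggregates them. The same idiom in isolation, using only the standard library:

    #include <chrono>
    #include <iostream>
    #include <vector>

    int main() {
        std::vector<float> durations_us;  // analogous to raw_counters.*_durations
        const auto start = std::chrono::steady_clock::now();
        // ... phase being measured (tokenization, generation, detokenization) ...
        const auto stop = std::chrono::steady_clock::now();
        durations_us.push_back(static_cast<float>(
            std::chrono::duration_cast<std::chrono::microseconds>(stop - start).count()));
        std::cout << "phase took " << durations_us.back() << " us\n";
        return 0;
    }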
6 changes: 3 additions & 3 deletions src/cpp/src/icontinuous_batching.hpp
@@ -70,20 +70,20 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline {
     /**
      * Performs a single inference step of all running (and pulls awaiting) requests
      */
-    virtual void step() = 0;
+    virtual size_t step() = 0;
 
     /**
      * Performs monolitic generation based on encoded prompts
      */
-    virtual std::vector<EncodedGenerationResult>
+    virtual std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
     generate(const std::vector<ov::Tensor>& input_ids,
              const std::vector<GenerationConfig>& sampling_params,
              const StreamerVariant& streamer) = 0;
 
     /**
      * Performs monolitic generation based on text prompts
      */
-    std::vector<GenerationResult>
+    std::pair<std::vector<GenerationResult>, PerfMetrics>
     generate(const std::vector<std::string>& prompts,
              std::vector<ov::genai::GenerationConfig> sampling_params,
              const StreamerVariant& streamer);
11 changes: 8 additions & 3 deletions src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
@@ -28,7 +28,7 @@ bool ContinuousBatchingPipeline::PromptLookupImpl::has_non_finished_requests() {
     return m_pipeline->has_non_finished_requests();
 }
 
-void ContinuousBatchingPipeline::PromptLookupImpl::step() {
+size_t ContinuousBatchingPipeline::PromptLookupImpl::step() {
     ManualTimer candidates_timer("prompt_lookup_decoding: generate_candidates()");
     candidates_timer.start();
     m_pipeline->generate_candidates();
@@ -67,9 +67,12 @@ void ContinuousBatchingPipeline::PromptLookupImpl::step() {
         m_sd_metrics.print(true);
         m_sd_metrics.clean_up();
     }
+
+    // TODO: add valid number of output tokens
+    return 0;
 }
 
-std::vector<EncodedGenerationResult>
+std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
 ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Tensor>& input_ids,
                                                        const std::vector<GenerationConfig>& sampling_params,
                                                        const StreamerVariant& streamer) {
@@ -110,6 +113,8 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Tensor>& input_ids,
 
     std::vector<EncodedGenerationResult> results;
     results.reserve(input_ids.size());
+    PerfMetrics perf_metrics;
+    // TODO: add collecting statistics
 
     bool continue_generation = true;
     while (has_non_finished_requests() && continue_generation) {
@@ -158,7 +163,7 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Tensor>& input_ids,
     generate_timer.end();
     m_sd_metrics.total_duration = generate_timer.get_duration();
 
-    return results;
+    return {results, perf_metrics};
 }
 
 SpeculativeDecodingMetrics
4 changes: 2 additions & 2 deletions src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp
@@ -36,9 +36,9 @@ class ContinuousBatchingPipeline::PromptLookupImpl : public ContinuousBatchingPipeline::IContinuousBatchingPipeline {
 
     bool has_non_finished_requests() override;
 
-    void step() override;
+    size_t step() override;
 
-    std::vector<EncodedGenerationResult>
+    std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
     generate(const std::vector<ov::Tensor>& input_ids,
              const std::vector<GenerationConfig>& sampling_params,
              const StreamerVariant& streamer) override;
11 changes: 8 additions & 3 deletions src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
@@ -128,7 +128,7 @@ void print_generated_request(const ov::genai::GeneratedRequests& requests) {
     }
 }
 
-void ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() {
+size_t ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() {
     // this blocks adding new requests during step as it may break coherence between main and draft models
     std::lock_guard<std::mutex> lock{m_draft_generations_mutex};
     m_draft_pipeline->pull_awaiting_requests(true);
@@ -186,9 +186,12 @@ void ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() {
         m_sd_metrics.print(true);
        m_sd_metrics.clean_up();
     }
+
+    // TODO: return valid number of generated tokens
+    return 0;
 }
 
-std::vector<EncodedGenerationResult>
+std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
 ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<ov::Tensor>& input_ids,
                                                               const std::vector<GenerationConfig>& sampling_params,
                                                               const StreamerVariant& streamer) {
@@ -236,6 +239,8 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<ov::Tensor>& input_ids,
 
     std::vector<EncodedGenerationResult> results;
     results.reserve(input_ids.size());
+    PerfMetrics perf_metrics;
+    // TODO: add collecting perf statistics
 
     bool continue_generation = true;
     while (has_non_finished_requests() && continue_generation) {
@@ -280,7 +285,7 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<ov::Tensor>& input_ids,
 
     OPENVINO_ASSERT(results.size() == input_ids.size());
     generate_timer.end();
-    return results;
+    return {results, perf_metrics};
 }
 
 SpeculativeDecodingMetrics
src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp
@@ -54,9 +54,9 @@ class ContinuousBatchingPipeline::SpeculativeDecodingImpl : public ContinuousBatchingPipeline::IContinuousBatchingPipeline {
 
     bool has_non_finished_requests() override;
 
-    void step() override;
+    size_t step() override;
 
-    std::vector<EncodedGenerationResult>
+    std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
     generate(const std::vector<ov::Tensor>& input_ids,
              const std::vector<GenerationConfig>& sampling_params,
              const StreamerVariant& streamer) override;
