From 653b2aeb92885eb44f664ad221418ac72eb0d9ab Mon Sep 17 00:00:00 2001 From: Ilya Lavrenov Date: Tue, 31 Dec 2024 08:11:27 +0400 Subject: [PATCH] [CB] Simplify SequenceGroup API (#1456) - Removed `enable_prefix_caching` parameter from `SequenceGroup` ctor - Removed necessity to call `set_sequence_group_ptr` after creation of sequence group - Renamed `get_cumulative_log_probs` to `get_cumulative_log_prob` as it returns a floating point value --- src/cpp/src/continuous_batching_impl.cpp | 6 +- src/cpp/src/llm_pipeline_stateful.cpp | 6 +- src/cpp/src/lm_encoding.cpp | 11 +- src/cpp/src/sequence_group.hpp | 89 +++++++------- ...batching_for_speculative_decoding_impl.cpp | 2 +- src/cpp/src/visual_language/pipeline.cpp | 4 +- tests/cpp/block_manager.cpp | 17 +-- tests/cpp/cache_manager.cpp | 15 ++- tests/cpp/sampler.cpp | 12 +- tests/cpp/scheduler.cpp | 113 ++++++++---------- tests/cpp/speculative_decoding.cpp | 4 +- 11 files changed, 129 insertions(+), 150 deletions(-) diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp index 9e20171dcb..3ab242418e 100644 --- a/src/cpp/src/continuous_batching_impl.cpp +++ b/src/cpp/src/continuous_batching_impl.cpp @@ -105,9 +105,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::add_request(uint64_t request SequenceGroup::Ptr sequence_group = std::make_shared(request_id, input_ids, sampling_params, - m_scheduler->get_block_size(), - m_scheduler->get_config().enable_prefix_caching); - sequence_group->set_sequence_group_ptr(sequence_group); + m_scheduler->get_block_size()); if (m_scheduler->get_config().enable_prefix_caching) { m_scheduler->restore_cached_blocks(sequence_group); @@ -353,7 +351,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vectorget_beam_search_score(sampling_params) : sequence->get_cumulative_log_probs(); + const float score = sampling_params.is_beam_search() ? 
sequence->get_beam_search_score(sampling_params) : sequence->get_cumulative_log_prob(); const auto & generated_ids = sequence->get_generated_ids(); if (sampling_params.echo) diff --git a/src/cpp/src/llm_pipeline_stateful.cpp b/src/cpp/src/llm_pipeline_stateful.cpp index bdaae50b04..cbcca62978 100644 --- a/src/cpp/src/llm_pipeline_stateful.cpp +++ b/src/cpp/src/llm_pipeline_stateful.cpp @@ -300,23 +300,21 @@ EncodedResults StatefulLLMPipeline::generate( std::vector requests; size_t block_size = 1; - bool enable_prefix_caching = false; for (size_t request_id = 0; request_id < batch_size; request_id++) { SequenceGroup::Ptr sequence_group; if (is_chat_conversation) { ov::Tensor tokenized_chat_history = ov::Tensor(ov::element::i64, {1, m_tokenized_chat_history.size()}, m_tokenized_chat_history.data()); - sequence_group = std::make_shared(request_id, tokenized_chat_history, config, block_size, enable_prefix_caching); + sequence_group = std::make_shared(request_id, tokenized_chat_history, config, block_size); } else { size_t seq_len = input_ids.get_shape().at(1); size_t batch_offset = request_id * seq_len; const int64_t* prompt_start = input_ids.data() + batch_offset; std::vector tokenized_prompt(prompt_start, prompt_start + seq_len); - sequence_group = std::make_shared(request_id, tokenized_prompt, config, block_size, enable_prefix_caching); + sequence_group = std::make_shared(request_id, tokenized_prompt, config, block_size); } - sequence_group->set_sequence_group_ptr(sequence_group); requests.push_back(sequence_group); } diff --git a/src/cpp/src/lm_encoding.cpp b/src/cpp/src/lm_encoding.cpp index 17a20dd961..083c591927 100644 --- a/src/cpp/src/lm_encoding.cpp +++ b/src/cpp/src/lm_encoding.cpp @@ -119,10 +119,13 @@ std::pair> get_lm_encoded_results( auto logits = m_llm.get_tensor("logits"); - int64_t sequence_len = logits.get_shape().at(1); + // since we have applied the `Slice` operation to the last MatMul, the model output sequence length is 1 + // so we need to update the sequence groups so that they treat all prompt tokens except the last ones as already processed + // and schedule only the `output_sequence_len` remaining ones + int64_t output_sequence_len = logits.get_shape().at(1); for (auto& sequence_group : sequence_groups) { - sequence_group->update_processed_tokens_num(sequence_group->get_prompt_len() - sequence_len); - sequence_group->schedule_tokens(sequence_len); + sequence_group->update_processed_tokens_num(sequence_group->get_prompt_len() - output_sequence_len); + sequence_group->schedule_tokens(output_sequence_len); } std::map beam_offets; @@ -217,7 +220,7 @@ std::pair> get_lm_encoded_results( for (size_t seq_id = 0; seq_id < num_outputs; ++seq_id) { const auto & sequence = sequences[seq_id]; - const float score = sampling_params.is_beam_search() ? sequence->get_beam_search_score(sampling_params) : sequence->get_cumulative_log_probs(); + const float score = sampling_params.is_beam_search() ? 
sequence->get_beam_search_score(sampling_params) : sequence->get_cumulative_log_prob(); results.tokens.push_back(sequence->get_generated_ids()); results.scores.push_back(score); diff --git a/src/cpp/src/sequence_group.hpp b/src/cpp/src/sequence_group.hpp index 220e93c032..8f8d5f899e 100644 --- a/src/cpp/src/sequence_group.hpp +++ b/src/cpp/src/sequence_group.hpp @@ -4,9 +4,11 @@ #pragma once #include +#include #include #include #include +#include #include "openvino/genai/generation_handle.hpp" #include "openvino/genai/generation_config.hpp" @@ -40,32 +42,32 @@ class Sequence { GenerationFinishReason m_finish_reason = GenerationFinishReason::NONE; float m_cumulative_log_prob = 0.0f; std::vector m_prefix_hashes; - std::weak_ptr m_sequence_group; + SequenceGroup* m_sequence_group = nullptr; static std::mutex m_counter_mutex; size_t _make_hash(size_t content_length); -public: - using Ptr = std::shared_ptr; - using CPtr = std::shared_ptr; - // don't use directly - Sequence(const uint64_t id) : m_grouped_id(id) {}; + explicit Sequence(const uint64_t id) : m_grouped_id(id) {} - // don't use directly Sequence(const Sequence& seq, const uint64_t id) : m_generated_ids(seq.m_generated_ids), m_grouped_id(id), m_status(seq.m_status), - m_cumulative_log_prob(seq.m_cumulative_log_prob){ + m_cumulative_log_prob(seq.m_cumulative_log_prob), + m_sequence_group(seq.m_sequence_group) { OPENVINO_ASSERT(seq.m_id != m_id); } +public: + using Ptr = std::shared_ptr; + using CPtr = std::shared_ptr; + static Sequence::Ptr create(const uint64_t id) { - return std::make_shared(id); + return Sequence::Ptr(new Sequence(id)); } static Sequence::Ptr fork(Sequence::CPtr sequence, const uint64_t id) { - return std::make_shared(*sequence, id); + return Sequence::Ptr(new Sequence(*sequence, id)); } bool operator ==(const Sequence& other) const { @@ -130,7 +132,7 @@ class Sequence { GenerationOutput output; if (token_cnt > 0) { OPENVINO_ASSERT(m_generated_ids.size()); - output.score = get_cumulative_log_probs(); + output.score = get_cumulative_log_prob(); auto generated_token_id = get_generated_ids(); auto generated_log_probs = get_generated_log_probs(); @@ -163,7 +165,7 @@ class Sequence { return m_generated_log_probs; } - float get_cumulative_log_probs() const { + float get_cumulative_log_prob() const { return m_cumulative_log_prob; } @@ -173,20 +175,18 @@ class Sequence { } float get_beam_search_score(const ov::genai::GenerationConfig& sampling_params) const { - float cumulative_log_prob = get_cumulative_log_probs(), current_length = get_generated_len(); + float cumulative_log_prob = get_cumulative_log_prob(), current_length = get_generated_len(); float score = cumulative_log_prob / std::pow(current_length, sampling_params.length_penalty); return score; } // Each KV block can be uniquely identified by - void set_sequence_group_ptr(std::shared_ptr sequence_group) { + void set_sequence_group_ptr(SequenceGroup* sequence_group) { + assert(sequence_group != nullptr); m_sequence_group = sequence_group; } - std::shared_ptr get_sequence_group_ptr() const { - OPENVINO_ASSERT(!m_sequence_group.expired()); - return m_sequence_group.lock(); - } + std::shared_ptr get_sequence_group_ptr() const; // Each KV block can be uniquely identified by // the tokens within the block and the tokens in the prefix before the block. 
@@ -198,7 +198,7 @@ class Sequence { // - each sequence shares the same prompt and KV-caches for promp // - in case of beam search each sequence also shares specific part of generic phase // via reference counter mechanism on BlockManager level -class SequenceGroup { +class SequenceGroup : public std::enable_shared_from_this { uint64_t m_request_id; std::vector m_sequences; ov::genai::GenerationConfig m_sampling_params; @@ -206,7 +206,6 @@ class SequenceGroup { TokenIds m_prompt_ids; std::vector m_prompt_log_probs; GenerationStream::Ptr m_generation_stream; - bool m_enable_prefix_caching; size_t m_num_evicted_tokens = 0; bool m_has_echoed = false; @@ -226,33 +225,32 @@ class SequenceGroup { size_t m_num_streamed_tokens = 0, m_stream_window_size = 0; - - SequenceGroup(uint64_t request_id, const ov::genai::GenerationConfig& sampling_params, std::size_t block_size, bool enable_prefix_caching) + SequenceGroup(uint64_t request_id, const ov::genai::GenerationConfig& sampling_params, std::size_t block_size) : m_request_id(request_id), m_sampling_params(sampling_params), m_block_size(block_size), - m_enable_prefix_caching(enable_prefix_caching) { - m_generation_stream = GenerationStream::create(); - } + m_generation_stream(GenerationStream::create()) { } public: using Ptr = std::shared_ptr; using CPtr = std::shared_ptr; - SequenceGroup(uint64_t request_id, const TokenIds& input_ids, const ov::genai::GenerationConfig& sampling_params, std::size_t block_size, bool enable_prefix_caching) - : SequenceGroup(request_id, ov::Tensor(ov::element::i64, ov::Shape{input_ids.size()}, (void *)input_ids.data()), sampling_params, block_size, enable_prefix_caching) { + SequenceGroup(uint64_t request_id, const TokenIds& input_ids, const ov::genai::GenerationConfig& sampling_params, std::size_t block_size) + : SequenceGroup(request_id, ov::Tensor(ov::element::i64, ov::Shape{input_ids.size()}, (void *)input_ids.data()), sampling_params, block_size) { } - SequenceGroup(uint64_t request_id, const ov::Tensor input_ids, const ov::genai::GenerationConfig& sampling_params, std::size_t block_size, bool enable_prefix_caching) - : SequenceGroup(request_id, sampling_params, block_size, enable_prefix_caching) { - add_sequence(Sequence::create(m_next_sequence_id++)); - + SequenceGroup(uint64_t request_id, const ov::Tensor input_ids, const ov::genai::GenerationConfig& sampling_params, std::size_t block_size) + : SequenceGroup(request_id, sampling_params, block_size) { m_prompt_ids.resize(input_ids.get_size()); std::copy_n(input_ids.data(), input_ids.get_size(), m_prompt_ids.begin()); m_prompt_log_probs.reserve(m_prompt_ids.size()); + + // create a single sequence + add_sequence(Sequence::create(m_next_sequence_id++)); } void add_sequence(const Sequence::Ptr & sequence) { + sequence->set_sequence_group_ptr(this); m_sequences.emplace_back(sequence); } @@ -322,7 +320,6 @@ class SequenceGroup { return it != m_sequences.end(); } - /** * @param seq_id Sequence identifier * @return Pointer to the sequence with this ID. @@ -344,8 +341,8 @@ class SequenceGroup { std::sort(finished_seqs.begin(), finished_seqs.end(), [=] (Sequence::CPtr s1, Sequence::CPtr s2) -> bool { bool is_beam_search = m_sampling_params.is_beam_search(); - const float score_1 = is_beam_search ? s1->get_beam_search_score(m_sampling_params) : s1->get_cumulative_log_probs(); - const float score_2 = is_beam_search ? s2->get_beam_search_score(m_sampling_params) : s2->get_cumulative_log_probs(); + const float score_1 = is_beam_search ? 
s1->get_beam_search_score(m_sampling_params) : s1->get_cumulative_log_prob(); + const float score_2 = is_beam_search ? s2->get_beam_search_score(m_sampling_params) : s2->get_cumulative_log_prob(); return score_1 > score_2; }); @@ -409,7 +406,6 @@ class SequenceGroup { m_num_evicted_tokens += num_evicted_tokens; } - /** * Resets the eviction tracking on this sequence to the state prior to any eviction taking place. */ @@ -434,7 +430,6 @@ class SequenceGroup { return get_num_processed_tokens() + get_num_scheduled_tokens(); } - bool requires_sampling() const { return get_context_len() >= get_prompt_len() && get_context_len() > m_max_content_len && m_sampling_params.max_new_tokens > 0; } @@ -513,7 +508,6 @@ class SequenceGroup { return (get_context_len() - get_num_evicted_tokens() + m_block_size - 1) / m_block_size; } - // requires number of physical blocks for next generation size_t get_num_blocks() const { return get_num_logical_blocks(); @@ -524,10 +518,9 @@ class SequenceGroup { } Sequence::Ptr fork_sequence(Sequence::CPtr sequence) { - auto ptr = sequence->get_sequence_group_ptr(); - m_sequences.emplace_back(Sequence::fork(std::move(sequence), m_next_sequence_id++)); - set_sequence_group_ptr(ptr); - return m_sequences.back(); + auto forked_sequence = Sequence::fork(sequence, m_next_sequence_id++); + m_sequences.emplace_back(forked_sequence); + return forked_sequence; } const ov::genai::GenerationConfig& get_sampling_parameters() const { @@ -568,12 +561,6 @@ class SequenceGroup { return m_is_gen_paused; } - void set_sequence_group_ptr(std::shared_ptr sequence_group) { - for (auto sequence: m_sequences) { - sequence->set_sequence_group_ptr(sequence_group); - } - } - GenerationStream::Ptr get_generation_stream() { return m_generation_stream; } @@ -600,7 +587,7 @@ class SequenceGroup { output.generated_ids.insert(output.generated_ids.begin(), m_prompt_ids.begin(), m_prompt_ids.end()); output.generated_log_probs.insert(output.generated_log_probs.begin(), m_prompt_log_probs.begin(), m_prompt_log_probs.end()); } - output.score = m_sampling_params.is_beam_search() ? sequence->get_beam_search_score(m_sampling_params) : sequence->get_cumulative_log_probs(); + output.score = m_sampling_params.is_beam_search() ? 
sequence->get_beam_search_score(m_sampling_params) : sequence->get_cumulative_log_prob(); output.finish_reason = sequence->get_finish_reason(); outputs.emplace(sequence->get_grouped_id(), output); } @@ -684,4 +671,10 @@ class SequenceGroup { m_generation_stream->push(std::move(outputs)); } }; + +inline std::shared_ptr Sequence::get_sequence_group_ptr() const { + assert(m_sequence_group != nullptr); + return m_sequence_group->shared_from_this(); +} + } diff --git a/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp index 5091218ccd..a1d0e85f17 100644 --- a/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp +++ b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp @@ -159,7 +159,7 @@ init_request( for (const auto& candidate_sequence : candidates) { Sequence::Ptr sequence; if (is_init_all_sequences_in_request && candidate_sequence.first > 0) { - sequence = Sequence::Ptr(new Sequence(candidate_sequence.first)); + sequence = Sequence::create(candidate_sequence.first); sequence->set_status(ov::genai::SequenceStatus::RUNNING); request->add_sequence(sequence); } else { diff --git a/src/cpp/src/visual_language/pipeline.cpp b/src/cpp/src/visual_language/pipeline.cpp index d625485205..ebc5c3b5dd 100644 --- a/src/cpp/src/visual_language/pipeline.cpp +++ b/src/cpp/src/visual_language/pipeline.cpp @@ -175,7 +175,6 @@ class ov::genai::VLMPipeline::VLMPipelineImpl { std::vector requests; size_t request_id = 0; size_t block_size = 1; // not used - bool enable_prefix_caching = false; size_t history_size = m_language.get_tensor("attention_mask").get_shape().at(1) - to_remove_from_hist; size_t inputs_embeds_size = inputs_embeds.get_shape().at(1); @@ -185,8 +184,7 @@ class ov::genai::VLMPipeline::VLMPipelineImpl { std::fill_n(prompt_ids.data(), prompt_ids.get_size(), m_tokenizer.get_pad_token_id()); std::copy(tokenized_history.begin(), tokenized_history.end(), prompt_ids.data()); - SequenceGroup::Ptr sequence_group = std::make_shared(request_id, prompt_ids, generation_config, block_size, enable_prefix_caching); - sequence_group->set_sequence_group_ptr(sequence_group); + SequenceGroup::Ptr sequence_group = std::make_shared(request_id, prompt_ids, generation_config, block_size); requests.push_back(sequence_group); std::shared_ptr streamer_ptr = std::visit(overloaded{ diff --git a/tests/cpp/block_manager.cpp b/tests/cpp/block_manager.cpp index 466cc23864..46c2fdddd7 100644 --- a/tests/cpp/block_manager.cpp +++ b/tests/cpp/block_manager.cpp @@ -13,12 +13,11 @@ TEST(TestBlockManager, general_test) { ov::genai::TokenIds prompt_ids; ov::genai::SequenceGroup::Ptr sequence_group = std::make_shared( - 0, + 0, ov::Tensor(ov::element::i64, { prompt_ids.size()}, prompt_ids.data()), ov::genai::beam_search(), - 4, - false); + 4); auto sequence = sequence_group->get_not_finished_sequences()[0]; bm.allocate(sequence, 6); auto seq_id = sequence->get_id(); @@ -46,13 +45,11 @@ TEST(TestBlockManager, required_blocks_count) { std::vector tokens = {0,1,2,3,4}; ov::genai::SequenceGroup::Ptr sequence_group = std::make_shared( - 0, + 0, ov::Tensor(ov::element::i64, { tokens.size()}, tokens.data()), ov::genai::beam_search(), - 4, - false); - sequence_group->set_sequence_group_ptr(sequence_group); + 4); sequence_group->schedule_tokens(5); auto required_blocks = bm.required_blocks_count(sequence_group); EXPECT_EQ(required_blocks, 2); @@ -62,7 
+59,7 @@ TEST(TestBlockManager, required_blocks_count) { EXPECT_EQ(bm.get_number_of_blocks_occupied_by_sequence(sequence_group), 2); sequence_group->finish_iteration(); - auto sequence_to_fork = sequence_group->get_running_sequences()[0]; + auto sequence_to_fork = sequence_group->get_running_sequences()[0]; for (size_t i = 0; i < 4; ++i) { const auto forked_sequence = sequence_group->fork_sequence(sequence_to_fork); bm.fork_sequence(sequence_to_fork->get_id(), forked_sequence->get_id()); @@ -98,9 +95,7 @@ TEST(TestBlockManager, CanFreeBlocksFromSequence) { ov::Tensor(ov::element::i64, { tokens.size()}, tokens.data()), ov::genai::beam_search(), - BLOCK_SIZE, - false); - sequence_group->set_sequence_group_ptr(sequence_group); + BLOCK_SIZE); sequence_group->schedule_tokens(5); bm.append_slots(sequence_group); ASSERT_EQ(bm.num_free_blocks(), 5); diff --git a/tests/cpp/cache_manager.cpp b/tests/cpp/cache_manager.cpp index 5dc848aba5..095cc39f09 100644 --- a/tests/cpp/cache_manager.cpp +++ b/tests/cpp/cache_manager.cpp @@ -11,14 +11,17 @@ using namespace ov::genai; -std::shared_ptr get_dummy_model(size_t num_layers) { +std::shared_ptr get_dummy_model(ov::Core core, size_t num_layers) { ov::NodeVector keys; ov::NodeVector values; ov::ParameterVector params; + ov::element::Type inference_precision = core.get_property("CPU", ov::hint::inference_precision); + ov::element::Type kv_cache_type = inference_precision == ov::element::bf16 ? ov::element::bf16 : ov::element::f16; + auto shape = ov::PartialShape({ov::Dimension::dynamic(), ov::Dimension::dynamic(), ov::Dimension::dynamic(), ov::Dimension::dynamic()}); for (size_t i = 0; i < num_layers; i++) { - auto key = std::make_shared(ov::element::f16, shape); - auto value = std::make_shared(ov::element::f16, shape); + auto key = std::make_shared(kv_cache_type, shape); + auto value = std::make_shared(kv_cache_type, shape); key->get_output_tensor(0).set_names({"key_cache." + std::to_string(i)}); value->get_output_tensor(0).set_names({"value_cache." 
+ std::to_string(i)}); keys.push_back(key); @@ -57,7 +60,7 @@ TEST(TestCacheManager, test_cache_size_param) { std::vector num_kv_heads(12, 12); device_config.set_model_params(num_kv_heads, 64, num_decoder_layers); - ov::InferRequest request = core.compile_model(get_dummy_model(num_decoder_layers)).create_infer_request(); + ov::InferRequest request = core.compile_model(get_dummy_model(core, num_decoder_layers)).create_infer_request(); auto cache_manager = std::make_shared(device_config, request, core); auto block_manager = BlockManager(device_config.get_num_kv_blocks(), false, device_config.get_block_size(), device_config.get_num_layers()); cache_manager->allocate_cache_if_needed(block_manager.get_total_number_of_kv_blocks()); @@ -80,7 +83,7 @@ TEST(TestCacheManager, test_kv_blocks_param) { std::vector num_kv_heads(12, 12); device_config.set_model_params(num_kv_heads, 64, num_decoder_layers); - ov::InferRequest request = core.compile_model(get_dummy_model(num_decoder_layers)).create_infer_request(); + ov::InferRequest request = core.compile_model(get_dummy_model(core, num_decoder_layers)).create_infer_request(); auto cache_manager = std::make_shared(device_config, request, core); auto block_manager = BlockManager(device_config.get_num_kv_blocks(), false, device_config.get_block_size(), device_config.get_num_layers()); OPENVINO_ASSERT(block_manager.get_total_number_of_kv_blocks(), scheduler_config.num_kv_blocks); @@ -107,7 +110,7 @@ TEST(TestCacheManager, test_dynamic_cache_increase) { } - ov::InferRequest request = core.compile_model(get_dummy_model(num_decoder_layers)).create_infer_request(); + ov::InferRequest request = core.compile_model(get_dummy_model(core, num_decoder_layers)).create_infer_request(); auto cache_manager = std::make_shared(device_config, request, core); auto block_manager = BlockManager(device_config.get_num_kv_blocks(), false, device_config.get_block_size(), device_config.get_num_layers()); diff --git a/tests/cpp/sampler.cpp b/tests/cpp/sampler.cpp index f146ab7426..3741880827 100644 --- a/tests/cpp/sampler.cpp +++ b/tests/cpp/sampler.cpp @@ -38,7 +38,7 @@ TEST(SamplerValidationMode, gen_phase_to_cut_whole_seq) { std::vector input_vector{0, 1, 2, 3, 4}; ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); std::vector sequence_groups{ - SequenceGroup::Ptr(new SequenceGroup(0, input_tensor, sampling_config, 32, false)), + SequenceGroup::Ptr(new SequenceGroup(0, input_tensor, sampling_config, 32)), }; // to emulate processed prompt and add next token [ 0 ] @@ -82,7 +82,7 @@ TEST(SamplerValidationMode, gen_phase_to_cut_part_seq) { std::vector input_vector{0, 1, 2, 3, 4}; ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); std::vector sequence_groups{ - SequenceGroup::Ptr(new SequenceGroup(0, input_tensor, sampling_config, 32, false)), + SequenceGroup::Ptr(new SequenceGroup(0, input_tensor, sampling_config, 32)), }; // to emulate processed prompt and add next token [ 0 ] @@ -127,7 +127,7 @@ TEST(SamplerValidationMode, gen_phase) { std::vector input_vector{0, 1, 2, 3, 4}; ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); std::vector sequence_groups{ - SequenceGroup::Ptr(new SequenceGroup(0, input_tensor, sampling_config, 32, false)), + SequenceGroup::Ptr(new SequenceGroup(0, input_tensor, sampling_config, 32)), }; // to emulate processed prompt and add next token [ 0 ] @@ -171,7 +171,7 @@ TEST(SamplerValidationMode, prompt_phase_to_cut_part_seq) { std::vector input_vector{0, 1, 2, 3, 4}; 
ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); std::vector sequence_groups{ - SequenceGroup::Ptr(new SequenceGroup(0, input_tensor, sampling_config, 32, false)), + SequenceGroup::Ptr(new SequenceGroup(0, input_tensor, sampling_config, 32)), }; // append candidates [ 0, 1, 1 ] @@ -217,7 +217,7 @@ TEST(SamplerValidationMode, prompt_phase_to_cut_whole_seq) { std::vector input_vector{0, 1, 2, 3, 4}; ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); std::vector sequence_groups{ - SequenceGroup::Ptr(new SequenceGroup(0, input_tensor, sampling_config, 32, false)), + SequenceGroup::Ptr(new SequenceGroup(0, input_tensor, sampling_config, 32)), }; // append candidates [ 1, 2, 3 ] @@ -262,7 +262,7 @@ TEST(SamplerValidationMode, prompt_phase) { std::vector input_vector{0, 1, 2, 3, 4}; ov::Tensor input_tensor(ov::element::i64, ov::Shape{1, 5}, input_vector.data()); std::vector sequence_groups{ - SequenceGroup::Ptr(new SequenceGroup(0, input_tensor, sampling_config, 32, false)), + SequenceGroup::Ptr(new SequenceGroup(0, input_tensor, sampling_config, 32)), }; // append candidates [ 0, 1, 2 ] diff --git a/tests/cpp/scheduler.cpp b/tests/cpp/scheduler.cpp index cc0b53a433..23594adf50 100644 --- a/tests/cpp/scheduler.cpp +++ b/tests/cpp/scheduler.cpp @@ -18,14 +18,17 @@ void clear_finished_sequences(std::vector& requests) { }); requests.erase(new_end, requests.end()); } -std::shared_ptr get_model(size_t num_layers) { +std::shared_ptr get_model(ov::Core core, size_t num_layers) { ov::NodeVector keys; ov::NodeVector values; ov::ParameterVector params; + ov::element::Type inference_precision = core.get_property("CPU", ov::hint::inference_precision); + ov::element::Type kv_cache_type = inference_precision == ov::element::bf16 ? ov::element::bf16 : ov::element::f16; + auto shape = ov::PartialShape({ov::Dimension::dynamic(), ov::Dimension::dynamic(), ov::Dimension::dynamic(), ov::Dimension::dynamic()}); for (size_t i = 0; i < num_layers; i++) { - auto key = std::make_shared(ov::element::f16, shape); - auto value = std::make_shared(ov::element::f16, shape); + auto key = std::make_shared(kv_cache_type, shape); + auto value = std::make_shared(kv_cache_type, shape); key->get_output_tensor(0).set_names({"key_cache." + std::to_string(i)}); value->get_output_tensor(0).set_names({"value_cache." 
+ std::to_string(i)}); keys.push_back(key); @@ -42,12 +45,12 @@ std::shared_ptr get_model(size_t num_layers) { std::shared_ptr init_cache_manager(SchedulerConfig scheduler_config) { ov::Core core = ov::Core(); size_t num_decoder_layers = 12; - ov::InferRequest request = core.compile_model(get_model(num_decoder_layers)).create_infer_request(); + ov::InferRequest request = core.compile_model(get_model(core, num_decoder_layers)).create_infer_request(); size_t head_size = 64, head_size_u8 = head_size + 8; std::vector num_kv_heads(12, 12); ov::genai::DeviceConfig device_config(core, scheduler_config, "CPU"); device_config.set_model_params(num_kv_heads, head_size_u8, num_decoder_layers); - return std::make_shared(device_config, request, core); + return std::make_shared(device_config, request, core); } TEST(TestScheduler, general_test) { @@ -63,17 +66,17 @@ TEST(TestScheduler, general_test) { for (auto scheduler_config: configs) { std::vector tokens = {0,1,2,3,4,5,6,7}; SequenceGroup::Ptr sequence_group1 = std::make_shared(0, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); auto idx0 = (*sequence_group1)[0]->get_id(); SequenceGroup::Ptr sequence_group2 = std::make_shared(1, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); auto idx1 = (*sequence_group2)[0]->get_id(); SequenceGroup::Ptr sequence_group3 = std::make_shared(1, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); auto idx2 = (*sequence_group3)[0]->get_id(); std::vector requests = {sequence_group1, sequence_group2, sequence_group3}; - - // schedule 3 sequence groups that use 6 kv blocks + + // schedule 3 sequence groups that use 6 kv blocks Scheduler scheduler = Scheduler(4, init_cache_manager(scheduler_config), scheduler_config); auto out1 = scheduler.schedule(requests); @@ -82,7 +85,7 @@ TEST(TestScheduler, general_test) { EXPECT_EQ(out1.m_block_tables[idx0][0].size(), 2); EXPECT_EQ(out1.m_block_tables[idx1][0].size(), 2); EXPECT_EQ(out1.m_block_tables[idx2][0].size(), 2); - // tokens.size() * 2 tokens should be scheduled on prompt phase, corresponding to first three sequences + // tokens.size() * 2 tokens should be scheduled on prompt phase, corresponding to first three sequences EXPECT_EQ(out1.m_total_num_scheduled_tokens, tokens.size() * 3); EXPECT_EQ(out1.is_prompt, !scheduler_config.dynamic_split_fuse); @@ -109,7 +112,7 @@ TEST(TestScheduler, general_test) { EXPECT_EQ(out3.m_block_tables[idx0][0].size(), 3); EXPECT_EQ(out3.m_block_tables[idx1][0].size(), 3); // 2 tokens should be scheduled on generate phase for "0" and "1" sequence, "2" sequence should be preempted - EXPECT_EQ(out3.m_total_num_scheduled_tokens, 2); + EXPECT_EQ(out3.m_total_num_scheduled_tokens, 2); EXPECT_FALSE(out3.is_prompt); // check that scheduler has no block table for sequence_group3 @@ -124,7 +127,7 @@ TEST(TestScheduler, general_test) { auto out4 = scheduler.schedule(requests); - // check that sequence_group3 is fully scehuled + // check that sequence_group3 is fully scehuled EXPECT_EQ(out4.m_block_tables[idx2][0].size(), 2); EXPECT_FALSE(out4.m_block_tables[idx2][0][0]->is_free()); EXPECT_EQ(out4.m_block_tables[idx2][0][0]->get_index(), 0); @@ -168,10 +171,10 @@ TEST_P(AppendSlotsSchedulerTest, test_append_slots_considers_all_sequences) { auto 
scheduler_config = GetParam(); std::vector tokens = {0,1,2,3,4,5,6,7}; SequenceGroup::Ptr sequence_group1 = std::make_shared(0, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); auto idx0 = (*sequence_group1)[0]->get_id(); SequenceGroup::Ptr sequence_group2 = std::make_shared(1, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); auto idx1 = (*sequence_group2)[0]->get_id(); std::vector requests = {sequence_group1, sequence_group2}; @@ -233,11 +236,11 @@ TEST_P(PartialPreemptionSchedulerTest, test_partial_preemption) { auto scheduler_config = GetParam(); std::vector tokens1 = {0,1,2,3,4,5,6,7,8,9,10}; SequenceGroup::Ptr sequence_group1 = std::make_shared(0, ov::Tensor(ov::element::i64, {tokens1.size()}, tokens1.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); std::vector tokens2 = {0,1,2,3,4,5,6,7}; auto idx0 = (*sequence_group1)[0]->get_id(); SequenceGroup::Ptr sequence_group2 = std::make_shared(1, ov::Tensor(ov::element::i64, {tokens2.size()}, tokens2.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); auto idx1 = (*sequence_group2)[0]->get_id(); std::vector requests = {sequence_group1, sequence_group2}; @@ -324,9 +327,9 @@ TEST(TestScheduler, test_partial_preemption_beam_search) { // create beam search group SequenceGroup::Ptr sequence_group = std::make_shared(0, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::beam_search(), 4, scheduler_config.enable_prefix_caching); - sequence_group->set_sequence_group_ptr(sequence_group); + ov::genai::beam_search(), 4); std::vector requests = {sequence_group}; + EXPECT_NO_THROW(requests[0]->get_running_sequences()[0]->get_sequence_group_ptr()); Scheduler scheduler = Scheduler(4, init_cache_manager(scheduler_config), scheduler_config); auto out = scheduler.schedule(requests); @@ -336,7 +339,7 @@ TEST(TestScheduler, test_partial_preemption_beam_search) { sequence_group->finish_iteration(); // make 2 forked sequence - auto sequence_to_fork = sequence_group->get_running_sequences()[0]; + auto sequence_to_fork = sequence_group->get_running_sequences()[0]; for (size_t i = 0; i < 2; ++i) { const auto forked_sequence = sequence_group->fork_sequence(sequence_to_fork); scheduler.fork_sequence(sequence_to_fork->get_id(), forked_sequence->get_id()); @@ -352,7 +355,7 @@ TEST(TestScheduler, test_partial_preemption_beam_search) { } sequence_group->finish_iteration(); } - // currently sequence occupies 4 blocks (1 shared, 3 not shared) + // currently sequence occupies 4 blocks (1 shared, 3 not shared) // make another 2 forked sequence for (size_t i = 0; i < 2; ++i) { @@ -373,8 +376,7 @@ TEST(TestScheduler, test_partial_preemption_beam_search) { // create group, which requires 1 block SequenceGroup::Ptr sequence_group_greedy = std::make_shared(0, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); - sequence_group_greedy->set_sequence_group_ptr(sequence_group_greedy); + ov::genai::greedy(), 4); // set greedy group at the beginning of list to make it higher priority std::vector new_requests = {sequence_group_greedy, sequence_group}; @@ -386,8 +388,8 @@ TEST(TestScheduler, test_partial_preemption_beam_search) { 
EXPECT_EQ(sequence_group->get_num_processed_tokens(), 12); EXPECT_EQ(sequence_group->get_context_len(), 12); - - // beam search group should be partially preempted and 5 blocks should be released + + // beam search group should be partially preempted and 5 blocks should be released out = scheduler.schedule(new_requests); sequence_group_greedy->get_sequences()[0]->append_token(token, 0.5); sequence_group_greedy->finish_iteration(); @@ -399,8 +401,8 @@ TEST(TestScheduler, test_partial_preemption_beam_search) { EXPECT_EQ(scheduler.get_block_tables(*seqs[2])[0].size(), 2); EXPECT_EQ(scheduler.get_block_tables(*seqs[3])[0].size(), 2); EXPECT_EQ(scheduler.get_block_tables(*seqs[4])[0].size(), 2); - - // append another 20 tokens to greedy group, this should result in usage of all free blocks and + + // append another 20 tokens to greedy group, this should result in usage of all free blocks and // another partial preemption of beam search group for (size_t i = 0; i < 20; i++) { out = scheduler.schedule(new_requests); @@ -431,13 +433,13 @@ TEST(TestScheduler, test_partially_preempted_prompt) { for (auto scheduler_config: configs) { std::vector tokens = {0,1,2,3,4,5,6,7,8,9,10,11}; SequenceGroup::Ptr sequence_group1 = std::make_shared(0, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); auto idx0 = (*sequence_group1)[0]->get_id(); SequenceGroup::Ptr sequence_group2 = std::make_shared(1, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); auto idx1 = (*sequence_group2)[0]->get_id(); - std::vector requests = {sequence_group1, sequence_group2}; - + std::vector requests = {sequence_group1, sequence_group2}; + // schedule 2 sequence groups that use all available 2*3 kv blocks, we used all available kv-blocks. 
Scheduler scheduler = Scheduler(4, init_cache_manager(scheduler_config), scheduler_config); auto out1 = scheduler.schedule(requests); @@ -450,7 +452,7 @@ TEST(TestScheduler, test_partially_preempted_prompt) { // sequence_group2 should be fully preempted auto out2 = scheduler.schedule(requests); - + // check that sequence_group1 has one more allocated block auto block_tables_for_all_layers = scheduler.get_block_tables(*(*sequence_group1)[0]); auto block_table1 = block_tables_for_all_layers[0]; @@ -467,7 +469,7 @@ TEST(TestScheduler, test_partially_preempted_prompt) { std::vector ref_ids = {0}; EXPECT_EQ(out2.m_scheduled_sequence_groups_ids, ref_ids); - EXPECT_EQ(out2.m_total_num_scheduled_tokens, 1); + EXPECT_EQ(out2.m_total_num_scheduled_tokens, 1); if (scheduler_config.dynamic_split_fuse) { // for dynamic_split_fuse sequence_group2 is preemted partially, part of prompt is left @@ -479,12 +481,12 @@ TEST(TestScheduler, test_partially_preempted_prompt) { // for vllm case sequence_group2 is fully preempted EXPECT_FALSE(scheduler.has_block_table(idx1)); } - + for (auto seq: requests) { std::vector running_sequences = seq->get_running_sequences(); seq->finish_iteration(); } - + // finish first sequence requests[0]->get_running_sequences()[0]->set_status(SequenceStatus::FINISHED); scheduler.free_sequence(idx0); @@ -496,11 +498,11 @@ TEST(TestScheduler, test_partially_preempted_prompt) { if (scheduler_config.dynamic_split_fuse) { // remaining part of prompt should be scheduled - EXPECT_EQ(out3.m_total_num_scheduled_tokens, 4); + EXPECT_EQ(out3.m_total_num_scheduled_tokens, 4); } else { // prompt should be fully scheduled - EXPECT_EQ(out3.m_total_num_scheduled_tokens, 12); + EXPECT_EQ(out3.m_total_num_scheduled_tokens, 12); } EXPECT_EQ(out3.m_block_tables[idx1][0][0]->get_index(), 3); @@ -541,16 +543,14 @@ TEST(TestScheduler, prefix_caching_test) { std::vector tokens = histrory_tokens; tokens.insert(tokens.end(), prompt_tokens.begin(), prompt_tokens.end()); SequenceGroup::Ptr sequence_group = std::make_shared(0, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, - scheduler_config.enable_prefix_caching); - sequence_group->set_sequence_group_ptr(sequence_group); + ov::genai::greedy(), 4); scheduler.restore_cached_blocks(sequence_group); std::vector requests = {sequence_group}; auto out1 = scheduler.schedule(requests); if (chat_iteration == 0) EXPECT_EQ(out1.m_total_num_scheduled_tokens, prompt_tokens.size()); - else + else EXPECT_EQ(out1.m_total_num_scheduled_tokens, prompt_tokens.size() + 1); for (auto seq: requests) { std::vector running_sequences = seq->get_running_sequences(); @@ -604,14 +604,10 @@ TEST(TestScheduler, prefix_caching_test_two_identical_sequences) { std::vector tokens = histrory_tokens; tokens.insert(tokens.end(), prompt_tokens.begin(), prompt_tokens.end()); SequenceGroup::Ptr sequence_group1 = std::make_shared(0, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, - scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); SequenceGroup::Ptr sequence_group2 = std::make_shared(0, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, - scheduler_config.enable_prefix_caching); - sequence_group1->set_sequence_group_ptr(sequence_group1); - sequence_group2->set_sequence_group_ptr(sequence_group2); + ov::genai::greedy(), 4); std::vector requests = {sequence_group1, sequence_group2}; // restore cached blocks for (auto request: requests) { @@ -622,7 +618,7 @@ 
TEST(TestScheduler, prefix_caching_test_two_identical_sequences) { auto out1 = scheduler.schedule(requests); if (chat_iteration == 0) EXPECT_EQ(out1.m_total_num_scheduled_tokens, prompt_tokens.size() * 2); - else + else EXPECT_EQ(out1.m_total_num_scheduled_tokens, (prompt_tokens.size() + 1) * 2); for (auto seq: requests) { std::vector running_sequences = seq->get_running_sequences(); @@ -650,7 +646,7 @@ TEST(TestScheduler, prefix_caching_test_two_identical_sequences) { scheduler.free_sequence(idx0); } auto generated_ids = requests[0]->get_sequences()[0]->get_generated_ids(); - + histrory_tokens.insert(histrory_tokens.end(), prompt_tokens.begin(), prompt_tokens.end()); histrory_tokens.insert(histrory_tokens.end(), generated_ids.begin(), generated_ids.end()); } @@ -676,10 +672,8 @@ TEST(TestScheduler, prefix_caching_with_max_new_tokens_equal_1) { for (size_t chat_iteration = 0; chat_iteration < chat_iterations; chat_iteration++) { SequenceGroup::Ptr sequence_group = std::make_shared(0, ov::Tensor(ov::element::i64, {prompt_tokens.size()}, prompt_tokens.data()), - ov::genai::greedy(), 32, - scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 32); - sequence_group->set_sequence_group_ptr(sequence_group); std::vector requests = {sequence_group}; // restore cached blocks for (auto request: requests) { @@ -690,7 +684,7 @@ TEST(TestScheduler, prefix_caching_with_max_new_tokens_equal_1) { auto out1 = scheduler.schedule(requests); if (chat_iteration == 0) EXPECT_EQ(out1.m_total_num_scheduled_tokens, prompt_tokens.size()); - else + else EXPECT_EQ(out1.m_total_num_scheduled_tokens, 1); for (auto seq: requests) { std::vector running_sequences = seq->get_running_sequences(); @@ -721,10 +715,10 @@ TEST(TestScheduler, test_partially_preempted_prompt_not_allowed) { std::vector tokens = {0,1,2,3,4,5,6,7,8,9,10,11}; SequenceGroup::Ptr sequence_group1 = std::make_shared(0, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); auto idx0 = (*sequence_group1)[0]->get_id(); SequenceGroup::Ptr sequence_group2 = std::make_shared(1, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); auto idx1 = (*sequence_group2)[0]->get_id(); std::vector requests = {sequence_group1, sequence_group2}; @@ -796,10 +790,10 @@ TEST(TestScheduler, test_partially_preempted_prompt_not_allowed2) { std::vector tokens = {0,1,2,3,4,5,6,7,8,9}; SequenceGroup::Ptr sequence_group1 = std::make_shared(0, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); auto idx0 = (*sequence_group1)[0]->get_id(); SequenceGroup::Ptr sequence_group2 = std::make_shared(1, ov::Tensor(ov::element::i64, {tokens.size()}, tokens.data()), - ov::genai::greedy(), 4, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 4); auto idx1 = (*sequence_group2)[0]->get_id(); std::vector requests = {sequence_group1, sequence_group2}; @@ -909,12 +903,11 @@ TEST(TestScheduler, FullyPreemptsCacheEvictedSequences) { ov::Tensor(ov::element::i64, {tokens1.size()}, tokens1.data()), ov::genai::greedy(), - 2, - scheduler_config.enable_prefix_caching); + 2); std::vector tokens2 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; // 5 full blocks, larger than eviction arena size (3 blocks) - will start evicting already at prompt stage auto idx1 = (*sequence_group1)[0]->get_id(); 
SequenceGroup::Ptr sequence_group2 = std::make_shared(1, ov::Tensor(ov::element::i64, {tokens2.size()}, tokens2.data()), - ov::genai::greedy(), 2, scheduler_config.enable_prefix_caching); + ov::genai::greedy(), 2); auto idx2 = (*sequence_group2)[0]->get_id(); std::vector requests = {sequence_group1, sequence_group2}; diff --git a/tests/cpp/speculative_decoding.cpp b/tests/cpp/speculative_decoding.cpp index bb10c2cc8f..1cf8db0fab 100644 --- a/tests/cpp/speculative_decoding.cpp +++ b/tests/cpp/speculative_decoding.cpp @@ -20,9 +20,7 @@ class CBForSDTest : public testing::Test, public ov::genai::ContinuousBatchingPi ov::genai::SequenceGroup::Ptr sequence_group = std::make_shared(request_id, input_ids, sampling_params, - 32, - true); - sequence_group->set_sequence_group_ptr(sequence_group); + 32); { std::lock_guard lock{m_awaiting_requests_mutex};
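
To summarize the API change for reviewers, a minimal call-site sketch (it assumes the internal `sequence_group.hpp` header is visible, as in the sources above; the helper name `make_request` is illustrative, not part of the patch):

#include "sequence_group.hpp"   // internal header: ov::genai::SequenceGroup / Sequence

// Before this patch a caller wrote:
//   auto group = std::make_shared<SequenceGroup>(request_id, input_ids,
//                                                sampling_params, block_size,
//                                                enable_prefix_caching);
//   group->set_sequence_group_ptr(group);
// After this patch the prefix-caching flag is gone and add_sequence() links each
// Sequence back to its group, so construction is a single call.
ov::genai::SequenceGroup::Ptr make_request(uint64_t request_id,
                                           const ov::genai::TokenIds& input_ids,
                                           const ov::genai::GenerationConfig& sampling_params,
                                           size_t block_size) {
    return std::make_shared<ov::genai::SequenceGroup>(
        request_id, input_ids, sampling_params, block_size);
}

// Scoring call sites use the renamed getter, which returns a single float:
//   float score = sampling_params.is_beam_search()
//       ? sequence->get_beam_search_score(sampling_params)
//       : sequence->get_cumulative_log_prob();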